From 4fae663bfa9f228148f3fbd6b55196ce3492e5ad Mon Sep 17 00:00:00 2001
From: bd
Date: Tue, 2 Dec 2025 20:33:28 -0500
Subject: attempt to fix incorrect select timeout

---
 nemesis/causal_event_loop.py | 154 ++++++++++++++++++++-----------------------
 1 file changed, 70 insertions(+), 84 deletions(-)

(limited to 'nemesis')

diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 1b04135..35792fd 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -54,10 +54,8 @@ class TimeAwareMixin:
     # the timestamp this callback was registered
     register_time = None
-    # the timestamp this callback completed i/o
+    # the timestamp this callback ideally runs
     _when = None
-    # the timestamp this callback entered the pause buffer
-    time_entered_pause_buffer = None
     def __init__(self: Self) -> None:
         self.register_time = time.monotonic()
@@ -107,8 +105,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
     # a value between 0 and 1. 0 means no optimization,
     # 1 means the target coroutine is optimized away entirely
     _speedup = 0.0
-    # a list of callbacks which have recently completed
-    _pause_buffer = []
     # a list of intervals in which the target coroutine has been active
     _coro_intervals = SortedList()
     # a list of completed callbacks, and their associated queue time
@@ -203,42 +199,29 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
             # Remove delayed calls that were cancelled if their number
             # is too high
             new_scheduled = []
-            for handle in self._scheduled:
+            for eff_guess, handle in self._scheduled:
                 if handle._cancelled:
                     handle._scheduled = False
                     # self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
                 else:
-                    new_scheduled.append(handle)
+                    new_scheduled.append((eff_guess, handle))
             heapq.heapify(new_scheduled)
             self._scheduled = new_scheduled
             self._timer_cancelled_count = 0
         else:
             # Remove delayed calls that were cancelled from head of queue.
-            while self._scheduled and self._scheduled[0]._cancelled:
+            while self._scheduled and self._scheduled[0][1]._cancelled:
                 self._timer_cancelled_count -= 1
-                handle = heapq.heappop(self._scheduled)
+                _, handle = heapq.heappop(self._scheduled)
                 # self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
                 handle._scheduled = False
         timeout = None
         if self._ready or self._stopping:
             timeout = 0
-        else:
-            curr_time = self.time()
-            if self._scheduled:
-                # Compute the desired timeout.
-                # requires computing our-best guess arrival time
-                handle = self._scheduled[0]
-                timeout = (handle._when + self._get_pause_for_io(handle, curr_time)) \
-                    - curr_time
-                timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
-            if self._pause_buffer:
-                # pause buffer has an exact arrival time
-                pause_timeout = self._pause_buffer[0]._when - curr_time
-                timeout = min(pause_timeout, timeout) if timeout else pause_timeout
-                timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
-        timeout = 0
+        elif self._scheduled:
+            timeout = self._get_next_effective_time()  # method call: a bare name would raise NameError
         # self._logger.debug(f"HANDLING I/O.")
         # self._logger.info("-" * 20)
@@ -251,56 +234,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
         # self._logger.debug(f"HANDLING SCHEDULED.")
         # self._logger.info("-" * 20)
-        # Handle 'later' callbacks that are ready.
-        curr_time = self.time()
-        end_time = curr_time + self._clock_resolution
-        while self._scheduled:
-            handle = self._scheduled[0]
-            when = handle._when
-            if when >= end_time:
-                break
-            handle = heapq.heappop(self._scheduled)
-            handle._scheduled = False
-
-            delay = self._get_pause_for_io(handle, curr_time)
-            time_to_buffer = when + delay
-            handle._when = time_to_buffer
-            handle.time_entered_pause_buffer = curr_time
-            # do not allow duplicates (FIX?)
-            if handle not in self._pause_buffer:
-                heapq.heappush(self._pause_buffer, handle)
-                # self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
-
-
-        # self._logger.debug(f"HANDLING PAUSED.")
-        # self._logger.info("-" * 20)
-        # handle callbacks which can leave pause timeout
-        while self._pause_buffer:
-            # required when handle's _when is modified in place
-            # heapq.heapify(self._pause_buffer)
-            handle = self._pause_buffer[0]
-            for other in self._pause_buffer[1:]:
-                if handle._when > other._when:
-                    # self._logger.critical(f"{other} was modified in place: first: {handle._when} modified: {other._when}")
-                    assert handle._when <= other._when, f"Heap root {handle} is not smallest"
-            if handle._when >= end_time:
-                # self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
-                break
-            # pop the first item in the list
-            handle = heapq.heappop(self._pause_buffer)
-
-            # if we paused during buffering, we need to delay again
-            # TODO clean this up
-            # this whole file has 'rounding' timing errors :(
-            delay = self._get_pause_for_pause_time(handle, curr_time)
-            handle._when = handle._when + delay
-            if handle._when >= end_time:
-                handle.time_entered_pause_buffer = curr_time
-                heapq.heappush(self._pause_buffer, handle)
-                # self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
-            else:
-                self._ready.append(handle)
-                # self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
+        # Handle 'scheduled' callbacks that are ready.
+        # note 'scheduled' callbacks include both I/O bound or call_later
+        self._add_ready_handles()
     @line_profiler.profile
     def _run_once(self: Any) -> None:
@@ -359,6 +295,44 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
             self._current_handle = None
         handle = None  # Needed to break cycles when an exception occurs.
+    def _get_next_effective_time(self):
+        """
+        Return the delta time until the next scheduled callback is expected to be
+        ready, clamped to the range [0, MAXIMUM_SELECT_TIMEOUT].
+
+        Heap keys are lazy guesses of each handle's effective run time. The head's
+        true time is recomputed with `self._get_pause_for_io`; when it differs from
+        the guess the entry is re-pushed under the true time and the new head is
+        checked. Only correct if a stored guess is never later than the true time
+        (TODO confirm). The caller must ensure `self._scheduled` is non-empty.
+        """
+        next_effective_time = None
+        curr_time = self.time()
+        while True:
+            eff_guess, handle = self._scheduled[0]
+            eff_true = handle._when + self._get_pause_for_io(handle, curr_time)
+            if eff_true == eff_guess:
+                next_effective_time = eff_true - curr_time
+                break
+            heapq.heappop(self._scheduled)
+            heapq.heappush(self._scheduled, (eff_true, handle))
+        return max(0, min(next_effective_time, MAXIMUM_SELECT_TIMEOUT))  # overdue head -> poll, never negative
+
+    def _add_ready_handles(self):
+        end_time = self.time() + self._clock_resolution
+        while self._scheduled and self._scheduled[0][0] <= end_time:  # stop once the head guess is in the future
+            _, handle = heapq.heappop(self._scheduled)
+            eff_true = handle._when + self._get_pause_for_io(handle, end_time)
+            if end_time >= eff_true:
+                # set the true 'when'; the handle is leaving the timer heap
+                handle._when = eff_true
+                handle._scheduled = False
+                self._ready.append(handle)
+                # self._logger.debug(f"\tscheduled -> _ready for {_format_handle(handle)}")
+            else:
+                heapq.heappush(self._scheduled, (eff_true, handle))  # re-key (now > end_time) and keep scanning
+
+
     def _process_events(self, event_list, time):
         for key, mask in event_list:
             fileobj, (reader, writer) = key.fileobj, key.data
@@ -381,6 +355,25 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
                     writer._when = time_to_buffer
                     self._add_callback(writer)
+    def call_at(self, when, callback, *args, context=None):
+        """Like call_later(), but uses an absolute time.
+
+        Absolute time corresponds to the event loop's time() method.
+        """
+        if when is None:
+            raise TypeError("when cannot be None")
+        self._check_closed()
+        if self._debug:
+            self._check_thread()
+            self._check_callback(callback, 'call_at')
+        timer = events.TimerHandle(when, callback, args, self, context)
+        if timer._source_traceback:
+            del timer._source_traceback[-1]
+        # _scheduled holds (effective-time guess, handle) pairs; the initial guess is `when`
+        heapq.heappush(self._scheduled, (timer._when, timer))
+        timer._scheduled = True
+        return timer
+
     def _call_soon(self: Any, callback: Any, args: tuple, context: "contextvars.Context"):
         """Do not add 'callsoon' events to the pause buffer. Add them directly to ready."""
@@ -412,7 +405,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
         return handle
     def _add_callback(self, handle):
-        """Add a Handle to _pause_buffer."""
         curr_time = self.time()
         if not handle._when:
             handle._when = curr_time
@@ -425,13 +417,11 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
                 self._ready.append(handle)
                 # self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
             else:
-                handle.time_entered_pause_buffer = curr_time
-                # do not allow duplicates (FIX?)
-                if handle not in self._pause_buffer:
-                    heapq.heappush(self._pause_buffer, handle)
-                    # self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+                heapq.heappush(self._scheduled, (handle._when, handle))
+                # self._logger.debug(f"\tio -> _scheduled for {_format_handle(handle)} (delay={handle._when - curr_time})")
         else:
             # self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
+            pass
     def _get_pause_for_io(self: Any, handle, io_time: float) -> float:
         time_interval = (handle.register_time, io_time)
@@ -445,10 +435,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
         return p_time
-    def _get_pause_for_pause_time(self: Any, handle, exit_time: float) -> float:
-        time_interval = (handle.time_entered_pause_buffer, exit_time)
-        return self._get_pause_time(time_interval)
-
     def _get_pause_time(self: Any, cb_interval: tuple[float, float]) -> float:
         time = 0
         start, end = cb_interval
--
cgit v1.2.3