diff options
| author | bd <bdunahu@operationnull.com> | 2025-10-06 23:51:19 -0400 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-10-06 23:51:19 -0400 |
| commit | a2b9381f1c93122ff25586e79275a29dbddde790 (patch) | |
| tree | b5c24b2dbecfc64c4158809564143816911b90b0 /nemesis/causal_event_loop.py | |
| parent | 8bdca7cf39662879882f85b8bc9771b4c834f527 (diff) | |
allow 'pausing' while callbacks are I/O buffered
Diffstat (limited to 'nemesis/causal_event_loop.py')
| -rw-r--r-- | nemesis/causal_event_loop.py | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 75857d7..c2b3b1d 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -43,8 +43,8 @@ class TimeAwareMixin: register_time = None # the timestamp this callback completed i/o _when = None - # the timestamp this callback was called by the event loop - process_start_time = None + # the timestamp this callback entered the pause buffer + time_entered_pause_buffer = None def __init__(self): self.register_time = time.monotonic() @@ -189,7 +189,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._process_events(*event_list) # Handle 'later' callbacks that are ready. - end_time = self.time() + self._clock_resolution + curr_time = self.time() + end_time = curr_time + self._clock_resolution while self._scheduled: handle = self._scheduled[0] when = handle._when @@ -200,6 +201,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): time_to_buffer = when + self._get_pause_for_io(handle, curr_time) handle._when = time_to_buffer + handle.time_entered_pause_buffer = curr_time self._pause_buffer.add(handle) # handle callbacks which can leave pause timeout @@ -208,7 +210,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if handle._when >= end_time: break # pop the first item in the list - self._ready.append(self._pause_buffer.pop(0)) + handle = self._pause_buffer.pop(0) + + # if we paused during buffering, we need to delay again + # TODO clean this up + # this whole file has 'rounding' timing errors :( + handle._when = handle._when + \ + self._get_pause_for_pause_time(handle, curr_time) + if handle._when >= end_time: + handle.time_entered_pause_buffer = curr_time + self._pause_buffer.add(handle) + else: + self._ready.append(handle) def _run_once(self): """ @@ -233,7 +246,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._current_handle = handle process_start_time = self.time() - handle.process_start_time = process_start_time handle._run() @@ -245,8 +257,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): time_interval = (handle._when, process_start_time) pause_time = self._get_pause_time(time_interval) - adjusted_start_time = handle.process_start_time - \ - pause_time + adjusted_start_time = process_start_time - pause_time wait_time = adjusted_start_time - handle._when assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" self._completed_coros.append((_format_handle(handle), wait_time)) @@ -280,8 +291,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle = events.Handle(callback, args, self, context) if handle._source_traceback: del handle._source_traceback[-1] - handle._when = self.time() - self._pause_buffer.add(handle) + self._add_callback(handle) return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -290,8 +300,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug: self._check_callback(callback, 'call_soon_threadsafe') handle = events._ThreadSafeHandle(callback, args, self, context) - handle._when = self.time() - self._pause_buffer.add(handle) + self._add_callback(handle) if handle._source_traceback: del handle._source_traceback[-1] if handle._source_traceback: @@ -302,14 +311,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def _add_callback(self, handle): """Add a Handle to _pause_buffer.""" if not handle._cancelled: + curr_time = self.time() if not handle._when: - handle._when = self.time() + handle._when = curr_time + handle.time_entered_pause_buffer = curr_time self._pause_buffer.add(handle) def _get_pause_for_io(self, handle, io_time): time_interval = (handle.register_time, io_time) return self._get_pause_time(time_interval) + def _get_pause_for_pause_time(self, handle, exit_time): + time_interval = (handle.time_entered_pause_buffer, exit_time) + return self._get_pause_time(time_interval) + def _get_pause_time(self, cb_interval): time = 0 start, end = cb_interval |
