diff options
| author | bd <bdunahu@operationnull.com> | 2025-10-06 21:34:35 -0400 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-10-06 21:34:35 -0400 |
| commit | 8bdca7cf39662879882f85b8bc9771b4c834f527 (patch) | |
| tree | e3b976a174465392b7431cebc6be5aabc6f1997e /nemesis/causal_event_loop.py | |
| parent | 2edc08465723f444a1ef4108d41bac852f7be88a (diff) | |
components to implement best-guess timeout for selector
Diffstat (limited to 'nemesis/causal_event_loop.py')
| -rw-r--r-- | nemesis/causal_event_loop.py | 148 |
1 files changed, 91 insertions, 57 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index d01dfd4..75857d7 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -25,6 +25,7 @@ import heapq import selectors import time import traceback +from copy import copy from asyncio.log import logger from asyncio import Task, events from asyncio.base_events import _format_handle @@ -34,20 +35,42 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 MAXIMUM_SELECT_TIMEOUT = 24 * 3600 +orig_handle = asyncio.events.Handle + class TimeAwareMixin: # the timestamp this callback was registered register_time = None # the timestamp this callback completed i/o - io_time = None + _when = None # the timestamp this callback was called by the event loop process_start_time = None def __init__(self): self.register_time = time.monotonic() + def __hash__(self): + return hash(self._when) -orig_handle = asyncio.events.Handle + def __lt__(self, other): + if isinstance(other, orig_handle): + return self._when < other._when + return NotImplemented + + def __le__(self, other): + if isinstance(other, orig_handle): + return self._when < other._when or self.__eq__(other) + return NotImplemented + + def __gt__(self, other): + if isinstance(other, orig_handle): + return self._when > other._when + return NotImplemented + + def __ge__(self, other): + if isinstance(other, orig_handle): + return self._when > other._when or self.__eq__(other) + return NotImplemented def create_subclass(base_class): @@ -70,13 +93,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # 1 means the target coroutine is optimized away entirely _speedup = 1.0 # a list of callbacks which have recently completed - _pause_buffer = [] + _pause_buffer = SortedList() # a list of intervals in which the target coroutine has been active _coro_intervals = SortedList() # a list of completed callbacks, and their associated queue time _completed_coros = [] # the last time we entered the target coro _time_entered_coro = None + _ready_events = [] def __init__(self) -> None: super().__init__() @@ -109,17 +133,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._coro_intervals.add((self._time_entered_coro, self.time())) self._time_entered_coro = None - def update_ready(self, can_stall=True): + def collect_ready_events(self, timeout=0): + event_list = self._selector.select(timeout) + self._ready_events.append((event_list, self.time())) + + def update_ready(self): ''' Polls the IO selector, schedules resulting callbacks, and schedules 'call_later' callbacks. - - This function can be called in the middle of an event loop iteration. - - This logic was separated out of `run_once` so that the list of `ready` - tasks may be updated more frequently than once per iteration. - - If SAMPLING is true, the timeout passed to the selector will always be 0. ''' curr_time = self.time() sched_count = len(self._scheduled) @@ -146,36 +167,48 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle._scheduled = False timeout = None - # TODO this needs to be rewritten - # We can't miss things placed in timeout either - # if not can_stall or self._ready or self._stopping: - timeout = 0 - # elif self._scheduled: - # # Compute the desired timeout. - # timeout = self._scheduled[0]._when - self.time() - # if timeout > MAXIMUM_SELECT_TIMEOUT: - # timeout = MAXIMUM_SELECT_TIMEOUT - # elif timeout < 0: - # timeout = 0 - - event_list = self._selector.select(timeout) - self._process_events(event_list) - # Needed to break cycles when an exception occurs. - event_list = None + if self._ready or self._stopping: + timeout = 0 + else: + curr_time = self.time() + if self._scheduled: + # Compute the desired timeout. + # requires computing our-best guess arrival time + handle = self._scheduled[0] + timeout = (handle._when + self._get_pause_for_io(handle, curr_time)) \ + - curr_time + timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT)) + if self._pause_buffer: + # pause buffer has an exact arrival time + pause_timeout = self._pause_buffer[0]._when - curr_time + timeout = min(pause_timeout, timeout) if timeout else pause_timeout + timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT)) + + self.collect_ready_events(timeout) + for event_list in self._ready_events: + self._process_events(*event_list) # Handle 'later' callbacks that are ready. end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] - if handle._when >= end_time: + when = handle._when + if when >= end_time: break handle = heapq.heappop(self._scheduled) handle._scheduled = False - time_interval = (handle.register_time, curr_time) - time_to_buffer = curr_time + self._get_pause_time(time_interval) - handle.io_time = time_to_buffer - self._ready.append(handle) + time_to_buffer = when + self._get_pause_for_io(handle, curr_time) + handle._when = time_to_buffer + self._pause_buffer.add(handle) + + # handle callbacks which can leave pause timeout + while self._pause_buffer: + handle = self._pause_buffer[0] + if handle._when >= end_time: + break + # pop the first item in the list + self._ready.append(self._pause_buffer.pop(0)) def _run_once(self): """ @@ -185,22 +218,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): """ self.update_ready() - current_time = self.time() - to_process = collections.deque([ - handle for handle in self._ready - if handle.io_time < (current_time + self._clock_resolution) - ]) - # This is the only place where callbacks are actually *called*. # All other places just add them to ready. # Note: We run all currently scheduled callbacks, but not any # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. - ntodo = len(to_process) + ntodo = len(self._ready) for i in range(ntodo): - handle = to_process.popleft() - self._ready.remove(handle) + handle = self._ready.popleft() if handle._cancelled: continue try: @@ -217,11 +243,11 @@ class CausalEventLoop(asyncio.SelectorEventLoop): logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) - time_interval = (handle.io_time, process_start_time) + time_interval = (handle._when, process_start_time) pause_time = self._get_pause_time(time_interval) adjusted_start_time = handle.process_start_time - \ pause_time - wait_time = adjusted_start_time - handle.io_time + wait_time = adjusted_start_time - handle._when assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" self._completed_coros.append((_format_handle(handle), wait_time)) except Exception: @@ -230,35 +256,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._current_handle = None handle = None # Needed to break cycles when an exception occurs. - def _process_events(self, event_list): - curr_time = self.time() + def _process_events(self, event_list, time): for key, mask in event_list: fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: self._remove_reader(fileobj) else: - time_interval = (reader.register_time, curr_time) - time_to_buffer = curr_time + \ - self._get_pause_time(time_interval) - reader.io_time = time_to_buffer + time_to_buffer = time + \ + self._get_pause_for_io(reader, time) + reader._when = time_to_buffer self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: self._remove_writer(fileobj) else: - time_interval = (writer.register_time, curr_time) - time_to_buffer = curr_time + \ - self._get_pause_time(time_interval) - writer.io_time = time_to_buffer + time_to_buffer = time + \ + self._get_pause_for_io(writer, time) + writer._when = time_to_buffer self._add_callback(writer) def _call_soon(self, callback, args, context): handle = events.Handle(callback, args, self, context) if handle._source_traceback: del handle._source_traceback[-1] - handle.io_time = self.time() - self._ready.append(handle) + handle._when = self.time() + self._pause_buffer.add(handle) return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -267,8 +290,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug: self._check_callback(callback, 'call_soon_threadsafe') handle = events._ThreadSafeHandle(callback, args, self, context) - handle.io_time = self.time() - self._ready.append(handle) + handle._when = self.time() + self._pause_buffer.add(handle) if handle._source_traceback: del handle._source_traceback[-1] if handle._source_traceback: @@ -276,6 +299,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._write_to_self() return handle + def _add_callback(self, handle): + """Add a Handle to _pause_buffer.""" + if not handle._cancelled: + if not handle._when: + handle._when = self.time() + self._pause_buffer.add(handle) + + def _get_pause_for_io(self, handle, io_time): + time_interval = (handle.register_time, io_time) + return self._get_pause_time(time_interval) + def _get_pause_time(self, cb_interval): time = 0 start, end = cb_interval |
