diff options
author | bd <bdunahu@operationnull.com> | 2025-09-07 18:58:57 -0400 |
---|---|---|
committer | bd <bdunahu@operationnull.com> | 2025-09-07 18:58:57 -0400 |
commit | 187ce23b369bf2e2156f4c2bcb1077799013e634 (patch) | |
tree | 4e0071e829cef34fd236282c031e5541634f26e6 /nemesis/causal_event_loop.py |
initial commit
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r-- | nemesis/causal_event_loop.py | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py new file mode 100644 index 0000000..4320655 --- /dev/null +++ b/nemesis/causal_event_loop.py @@ -0,0 +1,159 @@ +import asyncio +import collections +import heapq +import time +from asyncio.log import logger +from asyncio.base_events import _format_handle + +_MIN_SCHEDULED_TIMER_HANDLES = 100 +_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 +MAXIMUM_SELECT_TIMEOUT = 24 * 3600 + +class CausalEventLoop(asyncio.SelectorEventLoop): + ''' + Internal time in this event loop may run faster or slower than objective time. + Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute + (dilation == X means that loop time is running X times faster than objective time). + ''' + + def __init__(self) -> None: + super().__init__() + self._last_objective_time = None + self._last_subjective_time = None + self._processing = collections.deque() + self._time_dilation = 1.0 + self._task_to_optimize = None + + _select = self._selector.select + def select(timeout: float): + return _select(timeout / self._time_dilation) + self._selector.select = select + + def _set_experiment(self, task, dilation): + self._task_to_optimize = task + self._time_dilation = dilation + + def _matches_task(self, handle): + cb = handle._callback + if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): + task = cb.__self__ + if task.get_name() == self._task_to_optimize: + return True + return False + + def _update_ready(self, sampling=False): + """ + Polls the IO selector, schedules resulting callbacks, and schedules + 'call_later' callbacks. + + This logic was separated out of `run_once` so that the list of `ready` + tasks may be updated more frequently than once per iteration. + + If SAMPLING is true, the timeout passed to the selector will always be 0. + """ + sched_count = len(self._scheduled) + if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and + self._timer_cancelled_count / sched_count > + _MIN_CANCELLED_TIMER_HANDLES_FRACTION): + # Remove delayed calls that were cancelled if their number + # is too high + new_scheduled = [] + for handle in self._scheduled: + if handle._cancelled: + handle._scheduled = False + else: + new_scheduled.append(handle) + + heapq.heapify(new_scheduled) + self._scheduled = new_scheduled + self._timer_cancelled_count = 0 + else: + # Remove delayed calls that were cancelled from head of queue. + while self._scheduled and self._scheduled[0]._cancelled: + self._timer_cancelled_count -= 1 + handle = heapq.heappop(self._scheduled) + handle._scheduled = False + + timeout = None + if sampling or self._ready or self._stopping: + timeout = 0 + elif self._scheduled: + # Compute the desired timeout. + timeout = self._scheduled[0]._when - self.time() + if timeout > MAXIMUM_SELECT_TIMEOUT: + timeout = MAXIMUM_SELECT_TIMEOUT + elif timeout < 0: + timeout = 0 + + event_list = self._selector.select(timeout) + self._process_events(event_list) + # Needed to break cycles when an exception occurs. + event_list = None + + # Handle 'later' callbacks that are ready. + end_time = self.time() + self._clock_resolution + while self._scheduled: + handle = self._scheduled[0] + if handle._when >= end_time: + break + handle = heapq.heappop(self._scheduled) + handle._scheduled = False + self._ready.append(handle) + + def _run_once(self): + """Run one full iteration of the event loop. + + This calls all currently ready callbacks. + """ + self._update_ready() + self._processing.extend(self._ready) + # This is the only place where callbacks are actually *called*. + # All other places just add them to ready. + # Note: We run all currently scheduled callbacks, but not any + # callbacks scheduled by callbacks run this time around -- + # they will be run the next time (after another I/O poll). + # Use an idiom that is thread-safe without using locks. + ntodo = len(self._processing) + for i in range(ntodo): + handle = self._processing.popleft() + try: + self._ready.remove(handle) + except ValueError: + pass + if handle._cancelled: + continue + try: + self._current_handle = handle + t0 = super().time() + handle._run() + dt = super().time() - t0 + if self._debug and dt >= self.slow_callback_duration: + logger.warning('Executing %s took %.3f seconds', + _format_handle(handle), dt * 1/self._time_dilation) + if not self._matches_task(handle): + t0 = super().time() + # insert a delay + while super().time() - t0 < dt * (1/self._time_dilation - 1): + time.sleep(0.001) + finally: + self._current_handle = None + handle = None # Needed to break cycles when an exception occurs. + + def time(self): + obj = super().time() + if self._last_objective_time is None: + self._last_objective_time = self._last_subjective_time = obj + return obj + else: + sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation + self._last_subjective_time = sub + self._last_objective_time = obj + return sub + + +class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + def new_event_loop(self): + return CausalEventLoop() + + +asyncio.set_event_loop_policy(CausalEventLoopPolicy()) |