From ba1c72cedb56512f52c48ee947a2b11fa8a90c4d Mon Sep 17 00:00:00 2001 From: bd Date: Mon, 8 Sep 2025 13:25:22 -0400 Subject: Perform speedups at coroutine granularity --- nemesis/causal_event_loop.py | 94 +++++++++++++++++++++++++++++++++----------- 1 file changed, 71 insertions(+), 23 deletions(-) (limited to 'nemesis/causal_event_loop.py') diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 4320655..35aca9d 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -1,3 +1,39 @@ +''' +Copyright 2025 bdunahu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +Commentary: + + This event loop facilitates testing virtual speedups for specific coroutines by providing functions + to insert delays into I/O operations and synchronous callback logic. + + A `_time_dilation' less than 1.0 will cause the event loop to slow down I/O and scheduled + callbacks proportionately. For this, some inspiration taken from aiodebug's 'time_dilated_loop'. + + `_time_dilation' also slows down the execution of synchronous logic at the same proportion, + by modifying the `_run_once' to insert a calculated delay at the end of each callback. + + The amount of delay to insert is dependent on how long the callback ran, and how long it spent running + inside the target coroutine. It relies on an external profiler (Nemesis) to signal when and how long + the target coroutine is running through the `ping_enter_coro' and `ping_exit_coro' methods. + + Finally, this event loop provides an `_update_ready' method. This allows the attached profiler to + have access to which tasks are currently being blocked at any given point in time. + +Code: +''' + import asyncio import collections import heapq @@ -10,11 +46,6 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 MAXIMUM_SELECT_TIMEOUT = 24 * 3600 class CausalEventLoop(asyncio.SelectorEventLoop): - ''' - Internal time in this event loop may run faster or slower than objective time. - Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute - (dilation == X means that loop time is running X times faster than objective time). - ''' def __init__(self) -> None: super().__init__() @@ -22,27 +53,27 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._last_subjective_time = None self._processing = collections.deque() self._time_dilation = 1.0 - self._task_to_optimize = None + + self._time_entered_coro = None + self._accumulated_time = 0 _select = self._selector.select def select(timeout: float): return _select(timeout / self._time_dilation) self._selector.select = select - def _set_experiment(self, task, dilation): - self._task_to_optimize = task + def _set_dilation(self, dilation): self._time_dilation = dilation + self._time_entered_coro = None - def _matches_task(self, handle): - cb = handle._callback - if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): - task = cb.__self__ - if task.get_name() == self._task_to_optimize: - return True - return False + def _get_time_in_coro(self): + t = self._accumulated_time + if self._time_entered_coro: + t += super().time() - self._time_entered_coro + return t def _update_ready(self, sampling=False): - """ + ''' Polls the IO selector, schedules resulting callbacks, and schedules 'call_later' callbacks. @@ -50,7 +81,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): tasks may be updated more frequently than once per iteration. If SAMPLING is true, the timeout passed to the selector will always be 0. - """ + ''' sched_count = len(self._scheduled) if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > @@ -101,7 +132,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._ready.append(handle) def _run_once(self): - """Run one full iteration of the event loop. + """ + Run one full iteration of the event loop. This calls all currently ready callbacks. """ @@ -130,15 +162,31 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug and dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt * 1/self._time_dilation) - if not self._matches_task(handle): - t0 = super().time() - # insert a delay - while super().time() - t0 < dt * (1/self._time_dilation - 1): - time.sleep(0.001) + + # calculate the amount of time to slow this callback down by. We only want + # to add time proportionate to the amount of time spent OUTSIDE of the + # coroutine of interest. + time_outside_coro = dt - self._get_time_in_coro() + delay = time_outside_coro * (1 / self._time_dilation - 1) + t0 = super().time() + # do it this way so the python interpreter still receives signals. + while super().time() - t0 < delay: + time.sleep(0.001) + + self._time_entered_coro = super().time() if self._time_entered_coro else None + self._accumulated_time = 0 finally: self._current_handle = None handle = None # Needed to break cycles when an exception occurs. + def ping_enter_coro(self): + self._time_entered_coro = super().time() + + def ping_exit_coro(self): + assert isinstance(self._time_entered_coro, float), f"Tried to exit coro before recorded entry!" + self._accumulated_time += super().time() - self._time_entered_coro + self._time_entered_coro = None + def time(self): obj = super().time() if self._last_objective_time is None: -- cgit v1.2.3