summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-09-08 13:25:22 -0400
committerbd <bdunahu@operationnull.com>2025-09-08 13:25:22 -0400
commitba1c72cedb56512f52c48ee947a2b11fa8a90c4d (patch)
treee92010316883e18f094ba68f222b57ca53232a5c /nemesis/causal_event_loop.py
parent187ce23b369bf2e2156f4c2bcb1077799013e634 (diff)
Perform speedups at coroutine granularity
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py94
1 files changed, 71 insertions, 23 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 4320655..35aca9d 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -1,3 +1,39 @@
+'''
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+
+ This event loop facilitates testing virtual speedups for specific coroutines by providing functions
+ to insert delays into I/O operations and synchronous callback logic.
+
+ A `_time_dilation' less than 1.0 will cause the event loop to slow down I/O and scheduled
+ callbacks proportionately. For this, some inspiration taken from aiodebug's 'time_dilated_loop'.
+
+ `_time_dilation' also slows down the execution of synchronous logic at the same proportion,
+ by modifying the `_run_once' to insert a calculated delay at the end of each callback.
+
+ The amount of delay to insert is dependent on how long the callback ran, and how long it spent running
+ inside the target coroutine. It relies on an external profiler (Nemesis) to signal when and how long
+ the target coroutine is running through the `ping_enter_coro' and `ping_exit_coro' methods.
+
+ Finally, this event loop provides an `_update_ready' method. This allows the attached profiler to
+ have access to which tasks are currently being blocked at any given point in time.
+
+Code:
+'''
+
import asyncio
import collections
import heapq
@@ -10,11 +46,6 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
class CausalEventLoop(asyncio.SelectorEventLoop):
- '''
- Internal time in this event loop may run faster or slower than objective time.
- Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute
- (dilation == X means that loop time is running X times faster than objective time).
- '''
def __init__(self) -> None:
super().__init__()
@@ -22,27 +53,27 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._last_subjective_time = None
self._processing = collections.deque()
self._time_dilation = 1.0
- self._task_to_optimize = None
+
+ self._time_entered_coro = None
+ self._accumulated_time = 0
_select = self._selector.select
def select(timeout: float):
return _select(timeout / self._time_dilation)
self._selector.select = select
- def _set_experiment(self, task, dilation):
- self._task_to_optimize = task
+ def _set_dilation(self, dilation):
self._time_dilation = dilation
+ self._time_entered_coro = None
- def _matches_task(self, handle):
- cb = handle._callback
- if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
- task = cb.__self__
- if task.get_name() == self._task_to_optimize:
- return True
- return False
+ def _get_time_in_coro(self):
+ t = self._accumulated_time
+ if self._time_entered_coro:
+ t += super().time() - self._time_entered_coro
+ return t
def _update_ready(self, sampling=False):
- """
+ '''
Polls the IO selector, schedules resulting callbacks, and schedules
'call_later' callbacks.
@@ -50,7 +81,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
tasks may be updated more frequently than once per iteration.
If SAMPLING is true, the timeout passed to the selector will always be 0.
- """
+ '''
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
@@ -101,7 +132,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._ready.append(handle)
def _run_once(self):
- """Run one full iteration of the event loop.
+ """
+ Run one full iteration of the event loop.
This calls all currently ready callbacks.
"""
@@ -130,15 +162,31 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt * 1/self._time_dilation)
- if not self._matches_task(handle):
- t0 = super().time()
- # insert a delay
- while super().time() - t0 < dt * (1/self._time_dilation - 1):
- time.sleep(0.001)
+
+ # calculate the amount of time to slow this callback down by. We only want
+ # to add time proportionate to the amount of time spent OUTSIDE of the
+ # coroutine of interest.
+ time_outside_coro = dt - self._get_time_in_coro()
+ delay = time_outside_coro * (1 / self._time_dilation - 1)
+ t0 = super().time()
+ # do it this way so the python interpreter still receives signals.
+ while super().time() - t0 < delay:
+ time.sleep(0.001)
+
+ self._time_entered_coro = super().time() if self._time_entered_coro else None
+ self._accumulated_time = 0
finally:
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
+ def ping_enter_coro(self):
+ self._time_entered_coro = super().time()
+
+ def ping_exit_coro(self):
+ assert isinstance(self._time_entered_coro, float), f"Tried to exit coro before recorded entry!"
+ self._accumulated_time += super().time() - self._time_entered_coro
+ self._time_entered_coro = None
+
def time(self):
obj = super().time()
if self._last_objective_time is None: