diff options
author | bd <bdunahu@operationnull.com> | 2025-09-08 13:25:22 -0400 |
---|---|---|
committer | bd <bdunahu@operationnull.com> | 2025-09-08 13:25:22 -0400 |
commit | ba1c72cedb56512f52c48ee947a2b11fa8a90c4d (patch) | |
tree | e92010316883e18f094ba68f222b57ca53232a5c | |
parent | 187ce23b369bf2e2156f4c2bcb1077799013e634 (diff) |
Perform speedups at coroutine granularity
-rw-r--r-- | nemesis/causal_event_loop.py | 94 | ||||
-rw-r--r-- | nemesis/experiment.py | 14 | ||||
-rwxr-xr-x | nemesis/nemesis.py | 64 |
3 files changed, 124 insertions, 48 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 4320655..35aca9d 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -1,3 +1,39 @@ +''' +Copyright 2025 bdunahu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +Commentary: + + This event loop facilitates testing virtual speedups for specific coroutines by providing functions + to insert delays into I/O operations and synchronous callback logic. + + A `_time_dilation' less than 1.0 will cause the event loop to slow down I/O and scheduled + callbacks proportionately. For this, some inspiration taken from aiodebug's 'time_dilated_loop'. + + `_time_dilation' also slows down the execution of synchronous logic at the same proportion, + by modifying the `_run_once' to insert a calculated delay at the end of each callback. + + The amount of delay to insert is dependent on how long the callback ran, and how long it spent running + inside the target coroutine. It relies on an external profiler (Nemesis) to signal when and how long + the target coroutine is running through the `ping_enter_coro' and `ping_exit_coro' methods. + + Finally, this event loop provides an `_update_ready' method. This allows the attached profiler to + have access to which tasks are currently being blocked at any given point in time. + +Code: +''' + import asyncio import collections import heapq @@ -10,11 +46,6 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 MAXIMUM_SELECT_TIMEOUT = 24 * 3600 class CausalEventLoop(asyncio.SelectorEventLoop): - ''' - Internal time in this event loop may run faster or slower than objective time. - Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute - (dilation == X means that loop time is running X times faster than objective time). - ''' def __init__(self) -> None: super().__init__() @@ -22,27 +53,27 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._last_subjective_time = None self._processing = collections.deque() self._time_dilation = 1.0 - self._task_to_optimize = None + + self._time_entered_coro = None + self._accumulated_time = 0 _select = self._selector.select def select(timeout: float): return _select(timeout / self._time_dilation) self._selector.select = select - def _set_experiment(self, task, dilation): - self._task_to_optimize = task + def _set_dilation(self, dilation): self._time_dilation = dilation + self._time_entered_coro = None - def _matches_task(self, handle): - cb = handle._callback - if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): - task = cb.__self__ - if task.get_name() == self._task_to_optimize: - return True - return False + def _get_time_in_coro(self): + t = self._accumulated_time + if self._time_entered_coro: + t += super().time() - self._time_entered_coro + return t def _update_ready(self, sampling=False): - """ + ''' Polls the IO selector, schedules resulting callbacks, and schedules 'call_later' callbacks. @@ -50,7 +81,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): tasks may be updated more frequently than once per iteration. If SAMPLING is true, the timeout passed to the selector will always be 0. - """ + ''' sched_count = len(self._scheduled) if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > @@ -101,7 +132,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._ready.append(handle) def _run_once(self): - """Run one full iteration of the event loop. + """ + Run one full iteration of the event loop. This calls all currently ready callbacks. """ @@ -130,15 +162,31 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug and dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt * 1/self._time_dilation) - if not self._matches_task(handle): - t0 = super().time() - # insert a delay - while super().time() - t0 < dt * (1/self._time_dilation - 1): - time.sleep(0.001) + + # calculate the amount of time to slow this callback down by. We only want + # to add time proportionate to the amount of time spent OUTSIDE of the + # coroutine of interest. + time_outside_coro = dt - self._get_time_in_coro() + delay = time_outside_coro * (1 / self._time_dilation - 1) + t0 = super().time() + # do it this way so the python interpreter still receives signals. + while super().time() - t0 < delay: + time.sleep(0.001) + + self._time_entered_coro = super().time() if self._time_entered_coro else None + self._accumulated_time = 0 finally: self._current_handle = None handle = None # Needed to break cycles when an exception occurs. + def ping_enter_coro(self): + self._time_entered_coro = super().time() + + def ping_exit_coro(self): + assert isinstance(self._time_entered_coro, float), f"Tried to exit coro before recorded entry!" + self._accumulated_time += super().time() - self._time_entered_coro + self._time_entered_coro = None + def time(self): obj = super().time() if self._last_objective_time is None: diff --git a/nemesis/experiment.py b/nemesis/experiment.py index 87a0103..9d179ea 100644 --- a/nemesis/experiment.py +++ b/nemesis/experiment.py @@ -10,7 +10,7 @@ class BadLoopTypeException(Exception): class Experiment: # the selected task for this experiment - _task = None + _coro = None # the selected speedup for this experiment _speedup = None # event loops participating in this this experiment @@ -19,21 +19,21 @@ class Experiment: # represent number of times sampled. _samples = None - def __init__(self, task, speedup): - self._task = task + def __init__(self, coro, speedup): + self._coro = coro self._speedup = speedup self._samples = defaultdict(lambda: 0) self._set_loops() - def get_task(self): - return self._task + def get_coro(self): + return self._coro def get_speedup(self): return self._speedup def get_results(self): - ret = f"Results for {self._task} at {self._speedup} times speedup:\n" + ret = f"Results for {self._coro} at {self._speedup} times speedup:\n" if len(self._samples) > 0: ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}") for key in self._sort_samples(self._samples): @@ -54,7 +54,7 @@ class Experiment: for l in self._loops: if not isinstance(l, CausalEventLoop): raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") - l._set_experiment(self._task, 1.0 / self._speedup) + l._set_dilation(1.0 / self._speedup) def _get_event_loops(self): '''Returns each thread's event loop, if it exists.''' diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index eaf41f3..69d0c2b 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -25,16 +25,17 @@ Commentary: Code: ''' +from asyncio.base_events import _format_handle +from collections import defaultdict +from experiment import Experiment import argparse import asyncio +import os import signal import sys -import traceback import time +import traceback import types -import os -from experiment import Experiment -from asyncio.base_events import _format_handle class Nemesis(object): @@ -51,20 +52,21 @@ class Nemesis(object): e_duration = None # The number of seconds remaining in this performance experiment. r_duration = None + # A mapping of event loops to the previous running coroutine. + prev_coro = defaultdict(lambda: None) # temp - task = None + coro = None dilation = 1.0 @staticmethod - def __init__(task, speedup, e_duration, w_time, signal_interval=0.01): - os.environ["PYTHONASYNCIODEBUG"] = "1" + def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01): Nemesis.signal_interval = signal_interval Nemesis.e_duration = e_duration - Nemesis.r_duration = w_time + Nemesis.r_duration = 0 # temporary - Nemesis.task = task - Nemesis.speedup = speedup + Nemesis.coro = coro + Nemesis.speedup = max(speedup, 1.0) @staticmethod def start(): @@ -86,12 +88,13 @@ class Nemesis(object): @staticmethod def _start_experiment(): Nemesis.r_duration = Nemesis.e_duration - Nemesis.curr_experiment = Experiment(Nemesis.task, Nemesis.speedup) + Nemesis.prev_coro = defaultdict(lambda: None) + Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup) @staticmethod def _stop_experiment(): if Nemesis.curr_experiment is not None: - print(f'finished running {Nemesis.curr_experiment.get_task()} with speedup {Nemesis.curr_experiment.get_speedup()}') + print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}') Nemesis.results.append(Nemesis.curr_experiment.get_results()) del Nemesis.curr_experiment @@ -102,8 +105,17 @@ class Nemesis(object): Nemesis.last_sample = curr_sample if Nemesis.curr_experiment: loops = Nemesis.curr_experiment.get_loops() - # print(loops) + exp_coro = Nemesis.curr_experiment.get_coro() for loop in loops: + coro = Nemesis._get_current_coro(loop) + prev_coro = Nemesis.prev_coro[loop] + if not prev_coro == coro: + if prev_coro == exp_coro: + loop.ping_exit_coro() + elif coro == exp_coro: + loop.ping_enter_coro() + Nemesis.prev_coro[loop] = coro + loop._update_ready(True) handles = Nemesis._get_waiting_handles(loop) Nemesis.curr_experiment.add_handles(handles, loop, passed_time) @@ -122,11 +134,27 @@ class Nemesis(object): handles.append(fmt_handle) return handles + def _get_current_coro(loop): + tid = loop._thread_id + assert tid, f"{loop} is not running, yet we attempted to sample it!" + + frame = sys._current_frames().get(tid) + fname = frame.f_code.co_filename + while not Nemesis._should_trace(fname): + if frame: + frame = frame.f_back + else: + break + if frame: + fname = frame.f_code.co_filename + if frame and frame.f_generator: + return frame.f_generator.cr_code.co_name + return None + @staticmethod def _should_trace(filename): '''Returns FALSE if filename is uninteresting to the user. - Don't depend on this. It's good enough for testing.''' - # FIXME Assume GuixSD. Makes filtering easy + Don't depend on this. It kind of sucks.''' if not filename: return False if '/gnu/store' in filename: @@ -172,10 +200,10 @@ if __name__ == "__main__": type=float, default=0.01) parser.add_argument('-s', '--speedup', - help='The amount of virtual speedup.', + help='The amount of virtual speedup. Cannot go below one. Default is 2.0.', metavar='', type=float, - default=0.5) + default=2.0) parser.add_argument('-c', '--task', help='The task to virtually speedup.', metavar='', @@ -187,7 +215,7 @@ if __name__ == "__main__": type=float, default=4) parser.add_argument('-w', '--warmup-time', - help='Amount of time to wait until the first performance experiment. Default is the minimum time of 100 milliseconds', + help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds', metavar='', type=float, default=0.1) |