summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-09-08 13:25:22 -0400
committerbd <bdunahu@operationnull.com>2025-09-08 13:25:22 -0400
commitba1c72cedb56512f52c48ee947a2b11fa8a90c4d (patch)
treee92010316883e18f094ba68f222b57ca53232a5c
parent187ce23b369bf2e2156f4c2bcb1077799013e634 (diff)
Perform speedups at coroutine granularity
-rw-r--r--nemesis/causal_event_loop.py94
-rw-r--r--nemesis/experiment.py14
-rwxr-xr-xnemesis/nemesis.py64
3 files changed, 124 insertions, 48 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 4320655..35aca9d 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -1,3 +1,39 @@
+'''
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+
+ This event loop facilitates testing virtual speedups for specific coroutines by providing functions
+ to insert delays into I/O operations and synchronous callback logic.
+
+ A `_time_dilation' less than 1.0 will cause the event loop to slow down I/O and scheduled
+ callbacks proportionately. For this, some inspiration taken from aiodebug's 'time_dilated_loop'.
+
+ `_time_dilation' also slows down the execution of synchronous logic at the same proportion,
+ by modifying the `_run_once' to insert a calculated delay at the end of each callback.
+
+ The amount of delay to insert is dependent on how long the callback ran, and how long it spent running
+ inside the target coroutine. It relies on an external profiler (Nemesis) to signal when and how long
+ the target coroutine is running through the `ping_enter_coro' and `ping_exit_coro' methods.
+
+ Finally, this event loop provides an `_update_ready' method. This allows the attached profiler to
+ have access to which tasks are currently being blocked at any given point in time.
+
+Code:
+'''
+
import asyncio
import collections
import heapq
@@ -10,11 +46,6 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
class CausalEventLoop(asyncio.SelectorEventLoop):
- '''
- Internal time in this event loop may run faster or slower than objective time.
- Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute
- (dilation == X means that loop time is running X times faster than objective time).
- '''
def __init__(self) -> None:
super().__init__()
@@ -22,27 +53,27 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._last_subjective_time = None
self._processing = collections.deque()
self._time_dilation = 1.0
- self._task_to_optimize = None
+
+ self._time_entered_coro = None
+ self._accumulated_time = 0
_select = self._selector.select
def select(timeout: float):
return _select(timeout / self._time_dilation)
self._selector.select = select
- def _set_experiment(self, task, dilation):
- self._task_to_optimize = task
+ def _set_dilation(self, dilation):
self._time_dilation = dilation
+ self._time_entered_coro = None
- def _matches_task(self, handle):
- cb = handle._callback
- if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
- task = cb.__self__
- if task.get_name() == self._task_to_optimize:
- return True
- return False
+ def _get_time_in_coro(self):
+ t = self._accumulated_time
+ if self._time_entered_coro:
+ t += super().time() - self._time_entered_coro
+ return t
def _update_ready(self, sampling=False):
- """
+ '''
Polls the IO selector, schedules resulting callbacks, and schedules
'call_later' callbacks.
@@ -50,7 +81,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
tasks may be updated more frequently than once per iteration.
If SAMPLING is true, the timeout passed to the selector will always be 0.
- """
+ '''
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
@@ -101,7 +132,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._ready.append(handle)
def _run_once(self):
- """Run one full iteration of the event loop.
+ """
+ Run one full iteration of the event loop.
This calls all currently ready callbacks.
"""
@@ -130,15 +162,31 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt * 1/self._time_dilation)
- if not self._matches_task(handle):
- t0 = super().time()
- # insert a delay
- while super().time() - t0 < dt * (1/self._time_dilation - 1):
- time.sleep(0.001)
+
+ # calculate the amount of time to slow this callback down by. We only want
+ # to add time proportionate to the amount of time spent OUTSIDE of the
+ # coroutine of interest.
+ time_outside_coro = dt - self._get_time_in_coro()
+ delay = time_outside_coro * (1 / self._time_dilation - 1)
+ t0 = super().time()
+ # do it this way so the python interpreter still receives signals.
+ while super().time() - t0 < delay:
+ time.sleep(0.001)
+
+ self._time_entered_coro = super().time() if self._time_entered_coro else None
+ self._accumulated_time = 0
finally:
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
+ def ping_enter_coro(self):
+ self._time_entered_coro = super().time()
+
+ def ping_exit_coro(self):
+ assert isinstance(self._time_entered_coro, float), f"Tried to exit coro before recorded entry!"
+ self._accumulated_time += super().time() - self._time_entered_coro
+ self._time_entered_coro = None
+
def time(self):
obj = super().time()
if self._last_objective_time is None:
diff --git a/nemesis/experiment.py b/nemesis/experiment.py
index 87a0103..9d179ea 100644
--- a/nemesis/experiment.py
+++ b/nemesis/experiment.py
@@ -10,7 +10,7 @@ class BadLoopTypeException(Exception):
class Experiment:
# the selected task for this experiment
- _task = None
+ _coro = None
# the selected speedup for this experiment
_speedup = None
# event loops participating in this this experiment
@@ -19,21 +19,21 @@ class Experiment:
# represent number of times sampled.
_samples = None
- def __init__(self, task, speedup):
- self._task = task
+ def __init__(self, coro, speedup):
+ self._coro = coro
self._speedup = speedup
self._samples = defaultdict(lambda: 0)
self._set_loops()
- def get_task(self):
- return self._task
+ def get_coro(self):
+ return self._coro
def get_speedup(self):
return self._speedup
def get_results(self):
- ret = f"Results for {self._task} at {self._speedup} times speedup:\n"
+ ret = f"Results for {self._coro} at {self._speedup} times speedup:\n"
if len(self._samples) > 0:
ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}")
for key in self._sort_samples(self._samples):
@@ -54,7 +54,7 @@ class Experiment:
for l in self._loops:
if not isinstance(l, CausalEventLoop):
raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
- l._set_experiment(self._task, 1.0 / self._speedup)
+ l._set_dilation(1.0 / self._speedup)
def _get_event_loops(self):
'''Returns each thread's event loop, if it exists.'''
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index eaf41f3..69d0c2b 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -25,16 +25,17 @@ Commentary:
Code:
'''
+from asyncio.base_events import _format_handle
+from collections import defaultdict
+from experiment import Experiment
import argparse
import asyncio
+import os
import signal
import sys
-import traceback
import time
+import traceback
import types
-import os
-from experiment import Experiment
-from asyncio.base_events import _format_handle
class Nemesis(object):
@@ -51,20 +52,21 @@ class Nemesis(object):
e_duration = None
# The number of seconds remaining in this performance experiment.
r_duration = None
+ # A mapping of event loops to the previous running coroutine.
+ prev_coro = defaultdict(lambda: None)
# temp
- task = None
+ coro = None
dilation = 1.0
@staticmethod
- def __init__(task, speedup, e_duration, w_time, signal_interval=0.01):
- os.environ["PYTHONASYNCIODEBUG"] = "1"
+ def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01):
Nemesis.signal_interval = signal_interval
Nemesis.e_duration = e_duration
- Nemesis.r_duration = w_time
+ Nemesis.r_duration = 0
# temporary
- Nemesis.task = task
- Nemesis.speedup = speedup
+ Nemesis.coro = coro
+ Nemesis.speedup = max(speedup, 1.0)
@staticmethod
def start():
@@ -86,12 +88,13 @@ class Nemesis(object):
@staticmethod
def _start_experiment():
Nemesis.r_duration = Nemesis.e_duration
- Nemesis.curr_experiment = Experiment(Nemesis.task, Nemesis.speedup)
+ Nemesis.prev_coro = defaultdict(lambda: None)
+ Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup)
@staticmethod
def _stop_experiment():
if Nemesis.curr_experiment is not None:
- print(f'finished running {Nemesis.curr_experiment.get_task()} with speedup {Nemesis.curr_experiment.get_speedup()}')
+ print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}')
Nemesis.results.append(Nemesis.curr_experiment.get_results())
del Nemesis.curr_experiment
@@ -102,8 +105,17 @@ class Nemesis(object):
Nemesis.last_sample = curr_sample
if Nemesis.curr_experiment:
loops = Nemesis.curr_experiment.get_loops()
- # print(loops)
+ exp_coro = Nemesis.curr_experiment.get_coro()
for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ prev_coro = Nemesis.prev_coro[loop]
+ if not prev_coro == coro:
+ if prev_coro == exp_coro:
+ loop.ping_exit_coro()
+ elif coro == exp_coro:
+ loop.ping_enter_coro()
+ Nemesis.prev_coro[loop] = coro
+
loop._update_ready(True)
handles = Nemesis._get_waiting_handles(loop)
Nemesis.curr_experiment.add_handles(handles, loop, passed_time)
@@ -122,11 +134,27 @@ class Nemesis(object):
handles.append(fmt_handle)
return handles
+ def _get_current_coro(loop):
+ tid = loop._thread_id
+ assert tid, f"{loop} is not running, yet we attempted to sample it!"
+
+ frame = sys._current_frames().get(tid)
+ fname = frame.f_code.co_filename
+ while not Nemesis._should_trace(fname):
+ if frame:
+ frame = frame.f_back
+ else:
+ break
+ if frame:
+ fname = frame.f_code.co_filename
+ if frame and frame.f_generator:
+ return frame.f_generator.cr_code.co_name
+ return None
+
@staticmethod
def _should_trace(filename):
'''Returns FALSE if filename is uninteresting to the user.
- Don't depend on this. It's good enough for testing.'''
- # FIXME Assume GuixSD. Makes filtering easy
+ Don't depend on this. It kind of sucks.'''
if not filename:
return False
if '/gnu/store' in filename:
@@ -172,10 +200,10 @@ if __name__ == "__main__":
type=float,
default=0.01)
parser.add_argument('-s', '--speedup',
- help='The amount of virtual speedup.',
+ help='The amount of virtual speedup. Cannot go below one. Default is 2.0.',
metavar='',
type=float,
- default=0.5)
+ default=2.0)
parser.add_argument('-c', '--task',
help='The task to virtually speedup.',
metavar='',
@@ -187,7 +215,7 @@ if __name__ == "__main__":
type=float,
default=4)
parser.add_argument('-w', '--warmup-time',
- help='Amount of time to wait until the first performance experiment. Default is the minimum time of 100 milliseconds',
+ help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds',
metavar='',
type=float,
default=0.1)