From 2edc08465723f444a1ef4108d41bac852f7be88a Mon Sep 17 00:00:00 2001 From: bd Date: Mon, 6 Oct 2025 18:27:09 -0400 Subject: initial commit --- nemesis/causal_event_loop.py | 309 ++++++++++++++++++++++++++++++++++++++ nemesis/html_gen.py | 77 ++++++++++ nemesis/nemesis.py | 345 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 731 insertions(+) create mode 100644 nemesis/causal_event_loop.py create mode 100644 nemesis/html_gen.py create mode 100755 nemesis/nemesis.py (limited to 'nemesis') diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py new file mode 100644 index 0000000..d01dfd4 --- /dev/null +++ b/nemesis/causal_event_loop.py @@ -0,0 +1,309 @@ +''' +Copyright 2025 bdunahu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
'''Causal-profiling event loop for asyncio (nemesis/causal_event_loop.py).

Copyright 2025 bdunahu -- Apache License, Version 2.0.
'''

import asyncio
import bisect
import collections
import heapq
import selectors
import time
import traceback
from asyncio import Task, events
from asyncio.base_events import _format_handle
from asyncio.log import logger

# Thresholds for compacting cancelled timer handles (mirrors the private
# constants in asyncio.base_events).
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600


class TimeAwareMixin:
    """Adds profiling timestamps to asyncio handle objects."""

    # the timestamp this callback was registered
    register_time = None
    # the timestamp this callback completed i/o
    io_time = None
    # the timestamp this callback was called by the event loop
    process_start_time = None

    def __init__(self):
        self.register_time = time.monotonic()


orig_handle = asyncio.events.Handle


def create_subclass(base_class):
    """Return a subclass of *base_class* that also initializes TimeAwareMixin."""

    class NewSubclass(base_class, TimeAwareMixin):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            TimeAwareMixin.__init__(self)

    return NewSubclass


# Monkey-patch asyncio.events so Handle and every existing subclass carry
# TimeAwareMixin timestamps: any handle the loop creates from here on
# records the time it was registered.
for sc in [orig_handle] + orig_handle.__subclasses__():
    setattr(asyncio.events, sc.__name__, create_subclass(sc))


class CausalEventLoop(asyncio.SelectorEventLoop):
    """Selector event loop that models a 'virtual speedup' of one coroutine.

    While an experiment runs, the time the target coroutine is active is
    recorded as intervals; callbacks that overlap those intervals have their
    timestamps shifted by a fraction (``_speedup``) of the overlap,
    simulating the target coroutine having been optimized away.
    """

    def __init__(self) -> None:
        super().__init__()
        # BUG FIX: these used to be *class* attributes, so every
        # CausalEventLoop instance (one per thread under the policy) shared
        # one set of experiment counters.  They are per-instance state.
        #
        # a value between 0 and 1.  0 means no optimization, 1 means the
        # target coroutine is optimized away entirely
        self._speedup = 1.0
        # a list of callbacks which have recently completed
        # (currently unreferenced elsewhere; kept for compatibility)
        self._pause_buffer = []
        # (start, end) intervals in which the target coroutine was active,
        # kept ordered by start time (plain list + bisect replaces the
        # third-party sortedcontainers.SortedList)
        self._coro_intervals = []
        # (name, wait_time) tuples for completed callbacks
        self._completed_coros = []
        # the last time we entered the target coro, or None while outside it
        self._time_entered_coro = None

    def set_speedup(self, speedup):
        """Set the virtual speedup and reset all experiment counters."""
        self._speedup = speedup
        self._time_entered_coro = None
        self._coro_intervals.clear()
        self._completed_coros.clear()

    def get_completed_coros(self):
        """Return the (name, wait_time) tuples recorded since the last reset."""
        return self._completed_coros

    def get_pause_time(self):
        """Return the total virtual pause accumulated over the experiment."""
        if not self._coro_intervals:
            return 0

        start_interval = self._coro_intervals[0][0]
        end_interval = (self.time() if self._time_entered_coro
                        else self._coro_intervals[-1][1])
        return self._get_pause_time((start_interval, end_interval))

    def ping_enter_coro(self):
        """Record that the target coroutine just became active."""
        self._time_entered_coro = self.time()

    def ping_exit_coro(self):
        """Record that the target coroutine just became inactive."""
        assert isinstance(self._time_entered_coro, float), \
            "Tried to exit coro before recorded entry!"
        # insort keeps the intervals ordered by start time; entries arrive
        # in chronological order, so in practice this is an append.
        bisect.insort(self._coro_intervals,
                      (self._time_entered_coro, self.time()))
        self._time_entered_coro = None

    def update_ready(self, can_stall=True):
        '''
        Polls the IO selector, schedules resulting callbacks, and schedules
        'call_later' callbacks.

        This function can be called in the middle of an event loop iteration.

        This logic was separated out of `run_once` so that the list of `ready`
        tasks may be updated more frequently than once per iteration.
        '''
        curr_time = self.time()
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # TODO this needs to be rewritten: we always poll with a zero
        # timeout instead of stalling until the next scheduled handle
        # (we cannot miss things placed in timeout either), so `can_stall`
        # is currently ignored.
        timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

            # Push the handle's apparent ready-time forward by the virtual
            # pause that accumulated while it waited.
            time_interval = (handle.register_time, curr_time)
            handle.io_time = curr_time + self._get_pause_time(time_interval)
            self._ready.append(handle)

    def _run_once(self):
        """
        Run one full iteration of the event loop.

        This calls all currently ready callbacks whose (virtually shifted)
        io_time has arrived, and records each callback's wait time.
        """
        self.update_ready()

        current_time = self.time()
        to_process = collections.deque([
            handle for handle in self._ready
            if handle.io_time < (current_time + self._clock_resolution)
        ])

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        ntodo = len(to_process)
        for _ in range(ntodo):
            handle = to_process.popleft()
            self._ready.remove(handle)
            if handle._cancelled:
                continue
            try:
                self._current_handle = handle

                process_start_time = self.time()
                handle.process_start_time = process_start_time

                handle._run()

                dt = self.time() - process_start_time
                if self._debug and dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)

                # wait time = (start of processing, minus any virtual pause
                # that elapsed while queued) - time the handle became ready
                pause_time = self._get_pause_time(
                    (handle.io_time, process_start_time))
                adjusted_start_time = process_start_time - pause_time
                wait_time = adjusted_start_time - handle.io_time
                assert wait_time >= -0.0001, (
                    f"wait time on {_format_handle(handle)} "
                    f"was found to be {wait_time:.4f}!")
                self._completed_coros.append(
                    (_format_handle(handle), wait_time))
            except Exception:
                # The profiler must never kill the host program: report the
                # failure and keep iterating.
                traceback.print_exc()
            finally:
                self._current_handle = None
                handle = None  # Needed to break cycles when an exception occurs.
+ + def _process_events(self, event_list): + curr_time = self.time() + for key, mask in event_list: + fileobj, (reader, writer) = key.fileobj, key.data + if mask & selectors.EVENT_READ and reader is not None: + if reader._cancelled: + self._remove_reader(fileobj) + else: + time_interval = (reader.register_time, curr_time) + time_to_buffer = curr_time + \ + self._get_pause_time(time_interval) + reader.io_time = time_to_buffer + self._add_callback(reader) + if mask & selectors.EVENT_WRITE and writer is not None: + if writer._cancelled: + self._remove_writer(fileobj) + else: + time_interval = (writer.register_time, curr_time) + time_to_buffer = curr_time + \ + self._get_pause_time(time_interval) + writer.io_time = time_to_buffer + self._add_callback(writer) + + def _call_soon(self, callback, args, context): + handle = events.Handle(callback, args, self, context) + if handle._source_traceback: + del handle._source_traceback[-1] + handle.io_time = self.time() + self._ready.append(handle) + return handle + + def call_soon_threadsafe(self, callback, *args, context=None): + """Like call_soon(), but thread-safe.""" + self._check_closed() + if self._debug: + self._check_callback(callback, 'call_soon_threadsafe') + handle = events._ThreadSafeHandle(callback, args, self, context) + handle.io_time = self.time() + self._ready.append(handle) + if handle._source_traceback: + del handle._source_traceback[-1] + if handle._source_traceback: + del handle._source_traceback[-1] + self._write_to_self() + return handle + + def _get_pause_time(self, cb_interval): + time = 0 + start, end = cb_interval + + for coro_start, coro_end in self._coro_intervals: + if start < coro_end and coro_start < end: + time += self._get_overlap(start, end, coro_start, coro_end) + # coro_intervals are sorted, so by this time all overlap has passed + if end < coro_start: + break + + curr_time = self.time() + if self._time_entered_coro and \ + start < curr_time and self._time_entered_coro < end: + time += 
self._get_overlap(start, end, self._time_entered_coro, + curr_time) + + return time * self._speedup + + def _get_overlap(self, a_start, a_end, b_start, b_end): + overlap_start = max(a_start, b_start) + overlap_end = min(a_end, b_end) + return overlap_end - overlap_start + + +class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy): + def new_event_loop(self): + return CausalEventLoop() + + +asyncio.set_event_loop_policy(CausalEventLoopPolicy()) diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py new file mode 100644 index 0000000..b774da9 --- /dev/null +++ b/nemesis/html_gen.py @@ -0,0 +1,77 @@ +import plotly.graph_objects as go +from plotly.subplots import make_subplots +import hashlib + +def get_color(name): + hash_object = hashlib.md5(name.encode()) + color_index = int(hash_object.hexdigest(), 16) % 360 + return f'hsl({color_index}, 100%, 50%)' + +def plot_results(results, filename): + fig = make_subplots(rows=3, cols=1) + + for i, (coro_name, x_values) in enumerate(results.items(), start=1): + x_list = [] + y_latency_list = [] + y_throughput_list = [] + y_max_latency_list = [] + + for speedup, experiments in x_values.items(): + for experiment in experiments: + + completed_callbacks = experiment["latency"] + virtual_run_time = experiment["virtual_run_time"][0] + + x_list.append(speedup * 100) + + num_callbacks = len(completed_callbacks) + + # handle average latency graph + if num_callbacks > 0: + + total_wait = sum([cb[1] for cb in completed_callbacks]) + max_wait = max([cb[1] for cb in completed_callbacks]) + latency = total_wait / num_callbacks + + y_max_latency_list.append(max_wait) + y_latency_list.append(latency) + else: + y_latency_list.append(0) + + # handle throughput graph + throughput = num_callbacks / virtual_run_time + y_throughput_list.append(throughput) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_latency_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + showlegend=True, + ), row=1, col=1) + + 
fig.add_trace(go.Scatter( + x=x_list, + y=y_throughput_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + showlegend=False, + ), row=2, col=1) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_max_latency_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + showlegend=False, + ), row=3, col=1) + + fig.update_xaxes(title_text="Speedup (% optimized away)", row=3, col=1) + fig.update_yaxes(title_text="Average Handle Latency (seconds)", row=1, col=1) + fig.update_yaxes(title_text="Throughput (callbacks per second)", row=2, col=1) + fig.update_yaxes(title_text="Maximum Handle Latency (seconds)", row=3, col=1) + + fig.write_html(filename) diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py new file mode 100755 index 0000000..27e93de --- /dev/null +++ b/nemesis/nemesis.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python3 +''' + _/ _/ _/ + _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/ + _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/ + _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/ + _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/ + + +Copyright 2025 bdunahu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +Commentary: +Code: +''' + +from causal_event_loop import CausalEventLoop +from collections import defaultdict +from html_gen import plot_results +import argparse +import asyncio +import os +import random +import signal +import sys +import threading +import time +import traceback +import types + +class Experiment: + + # event loops participating in this this experiment + _loops = [] + + def __init__(self, loops): + self._loops = loops + + def get_loops(self): + return [l for l in self._loops if l.is_running()] + +class Nemesis(object): + + # the (ideal) interval between samples + signal_interval = 0.0 + # the timestamp which the last sample was taken + last_sample = None + + # the current experiment being run + experiment_data = None + # the coroutine the current experiment is speeding up + experiment_coro = None + # the speedup of the current experiment + experiment_spdp = None + # the total time this experiment has been running + experiment_time = None + + # results from previous experiments. Keys represent names of coroutines. + results = defaultdict(lambda: defaultdict(lambda: [])) + + # the file to write results to + filename = None + + # The base duration of each performance experiment + e_duration = None + + # A mapping of event loops to the previous running coroutine. 
+ prev_coro = defaultdict(lambda: None) + + @staticmethod + def __init__(e_duration, filename, signal_interval=0.01): + Nemesis.signal_interval = signal_interval + Nemesis.e_duration = e_duration + Nemesis.filename = filename + + @staticmethod + def start(): + Nemesis.last_sample = time.perf_counter() + signal.signal(signal.SIGALRM, + Nemesis._signal_handler) + signal.setitimer(signal.ITIMER_REAL, + Nemesis.signal_interval, + Nemesis.signal_interval) + + @staticmethod + def print_results(): + for coro_name, x_values in Nemesis.results.items(): + print(f'Results for {coro_name:}') + for speedup, experiments in x_values.items(): + print(f' {speedup * 100}% speedup:') + for experiment in experiments: + num_callbacks = len(experiment) + if num_callbacks > 0: + total_wait = sum([cb[1] for cb in experiment]) + latency = total_wait / num_callbacks + print(f' latency: {latency}') + print(f' callbacks processed: {num_callbacks}') + print(f'') + + @staticmethod + def stop(): + signal.setitimer(signal.ITIMER_REAL, 0) + plot_results(Nemesis.results, Nemesis.filename) + print(f"Wrote {Nemesis.filename}") + + @staticmethod + def _start_experiment(coro, speedup): + Nemesis.prev_coro = defaultdict(lambda: None) + Nemesis.experiment_coro = coro + Nemesis.experiment_spdp = speedup + Nemesis.experiment_time = 0 + + loops = Nemesis._get_event_loops() + for loop in loops: + if not isinstance(loop, CausalEventLoop): + raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. 
It does not work on programs which change the event loop policy.") + loop.set_speedup(speedup) + + Nemesis.experiment_data = Experiment(loops) + + @staticmethod + def _stop_experiment(): + if Nemesis.experiment_data is not None: + loops = Nemesis.experiment_data.get_loops() + + latency = [] + virtual_run_time = [] + for loop in loops: + latency.extend(loop.get_completed_coros()) + + pause_time = loop.get_pause_time() + virtual_run_time.append(Nemesis.experiment_time - pause_time) + + results = { + "latency": latency, + "virtual_run_time": virtual_run_time, + } + + print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed') + Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results) + del Nemesis.experiment_data + + @staticmethod + def _signal_handler(sig, frame): + curr_sample = time.perf_counter() + passed_time = curr_sample - Nemesis.last_sample + Nemesis.last_sample = curr_sample + + if getattr(Nemesis, 'experiment_data', None): + loops = Nemesis.experiment_data.get_loops() + exp_coro = Nemesis.experiment_coro + for loop in loops: + coro = Nemesis._get_current_coro(loop) + prev_coro = Nemesis.prev_coro[loop] + if not prev_coro == coro: + if prev_coro == exp_coro: + loop.ping_exit_coro() + elif coro == exp_coro: + loop.ping_enter_coro() + Nemesis.prev_coro[loop] = coro + + loop.update_ready(False) + + Nemesis.experiment_time += passed_time + if (Nemesis.e_duration <= Nemesis.experiment_time): + Nemesis._stop_experiment() + + else: + coros = [] + loops = Nemesis._get_event_loops() + for loop in loops: + coro = Nemesis._get_current_coro(loop) + if coro is not None: + coros.append(coro) + if coros: + Nemesis._start_experiment(random.choice(coros), + Nemesis._select_speedup()) + + @staticmethod + def _parse_handle(handle): + cb = handle._callback + if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): + task = cb.__self__ + coro = task.get_coro() + return [task.get_name(), Nemesis._get_coro_name(coro)] + else: + 
return [str(type(handle).__name__), cb.__name__] + + @staticmethod + def _get_current_coro(loop): + tid = loop._thread_id + assert tid, f"{loop} is not running, yet we attempted to sample it!" + + frame = sys._current_frames().get(tid) + fname = frame.f_code.co_filename + while not Nemesis._should_trace(fname): + if frame: + frame = frame.f_back + else: + break + if frame: + fname = frame.f_code.co_filename + if frame and frame.f_generator: + return frame.f_generator.cr_code.co_name + return None + + @staticmethod + def _get_event_loops(): + '''Returns each thread's event loop, if it exists.''' + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = Nemesis._walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) + return loops + + @staticmethod + def _walk_back_until_loop(frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' + while frame: + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + @staticmethod + def _should_trace(filename): + '''Returns FALSE if filename is uninteresting to the user. + Don't depend on this. It kind of sucks.''' + if not filename: + return False + if '/gnu/store' in filename: + return False + if '/usr/local/lib/python' in filename: + return False + if 'site-packages' in filename: + return False + if 'propcache' in filename: + return False + if '.pyx' in filename: + return False + if filename[0] == '<': + return False + if 'nemesis' in filename: + return False + return True + + def _select_speedup(): + ''' + Returns a random speedup between 0% to 100%, in multiples of 5%. 
+ Because a baseline is needed to calculate effect on program + performance, selects a speedup of 0 with 50% probability. + ''' + r1 = random.randint(-19, 20) + return max(0, r1) / 20 + + @staticmethod + def _get_coro_name(coro): + ''' + Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py + ''' + # Coroutines compiled with Cython sometimes don't have + # proper __qualname__ or __name__. While that is a bug + # in Cython, asyncio shouldn't crash with an AttributeError + # in its __repr__ functions. + if hasattr(coro, '__qualname__') and coro.__qualname__: + coro_name = coro.__qualname__ + elif hasattr(coro, '__name__') and coro.__name__: + coro_name = coro.__name__ + else: + # Stop masking Cython bugs, expose them in a friendly way. + coro_name = f'<{type(coro).__name__} without __name__>' + return f'{coro_name}()' + +the_globals = { + '__name__': '__main__', + '__doc__': None, + '__package__': None, + '__loader__': globals()['__loader__'], + '__spec__': None, + '__annotations__': {}, + '__builtins__': globals()['__builtins__'], + '__file__': None, + '__cached__': None, +} + + +if __name__ == "__main__": + # parses CLI arguments and facilitates profiler runtime. + parser = argparse.ArgumentParser( + usage='%(prog)s [args] -- prog' + ) + + parser.add_argument('-i', '--interval', + help='The minimum amount of time inbetween \ + samples in seconds.', + metavar='', + type=float, + default=0.01) + parser.add_argument('-e', '--experiment-duration', + help='The performance experiment duration. 
Defaults to 3 seconds.', + metavar='', + type=float, + default=3) + parser.add_argument('-f', '--filename', + help='The filename to write results to.', + metavar='', + type=str, + default="results.html") + parser.add_argument('prog', + type=str, + nargs='*', + help='Path to the python script and its arguments.') + args = parser.parse_args() + + sys.argv = args.prog + try: + with open(args.prog[0], 'r', encoding='utf-8') as fp: + code = compile(fp.read(), args.prog[0], "exec") + Nemesis(args.experiment_duration, + args.filename, + args.interval).start() + exec(code, the_globals) + Nemesis.stop() + except Exception: + traceback.print_exc() -- cgit v1.2.3