diff options
Diffstat (limited to 'nemesis/nemesis.py')
| -rwxr-xr-x | nemesis/nemesis.py | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py new file mode 100755 index 0000000..27e93de --- /dev/null +++ b/nemesis/nemesis.py @@ -0,0 +1,345 @@ +#!/usr/bin/env python3 +''' + _/ _/ _/ + _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/ + _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/ + _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/ + _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/ + + +Copyright 2025 bdunahu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +Commentary: +Code: +''' + +from causal_event_loop import CausalEventLoop +from collections import defaultdict +from html_gen import plot_results +import argparse +import asyncio +import os +import random +import signal +import sys +import threading +import time +import traceback +import types + +class Experiment: + + # event loops participating in this this experiment + _loops = [] + + def __init__(self, loops): + self._loops = loops + + def get_loops(self): + return [l for l in self._loops if l.is_running()] + +class Nemesis(object): + + # the (ideal) interval between samples + signal_interval = 0.0 + # the timestamp which the last sample was taken + last_sample = None + + # the current experiment being run + experiment_data = None + # the coroutine the current experiment is speeding up + experiment_coro = None + # the speedup of the current experiment + experiment_spdp = None + # the total time this experiment has been running + experiment_time = None + + # results from previous experiments. Keys represent names of coroutines. + results = defaultdict(lambda: defaultdict(lambda: [])) + + # the file to write results to + filename = None + + # The base duration of each performance experiment + e_duration = None + + # A mapping of event loops to the previous running coroutine. + prev_coro = defaultdict(lambda: None) + + @staticmethod + def __init__(e_duration, filename, signal_interval=0.01): + Nemesis.signal_interval = signal_interval + Nemesis.e_duration = e_duration + Nemesis.filename = filename + + @staticmethod + def start(): + Nemesis.last_sample = time.perf_counter() + signal.signal(signal.SIGALRM, + Nemesis._signal_handler) + signal.setitimer(signal.ITIMER_REAL, + Nemesis.signal_interval, + Nemesis.signal_interval) + + @staticmethod + def print_results(): + for coro_name, x_values in Nemesis.results.items(): + print(f'Results for {coro_name:}') + for speedup, experiments in x_values.items(): + print(f' {speedup * 100}% speedup:') + for experiment in experiments: + num_callbacks = len(experiment) + if num_callbacks > 0: + total_wait = sum([cb[1] for cb in experiment]) + latency = total_wait / num_callbacks + print(f' latency: {latency}') + print(f' callbacks processed: {num_callbacks}') + print(f'') + + @staticmethod + def stop(): + signal.setitimer(signal.ITIMER_REAL, 0) + plot_results(Nemesis.results, Nemesis.filename) + print(f"Wrote {Nemesis.filename}") + + @staticmethod + def _start_experiment(coro, speedup): + Nemesis.prev_coro = defaultdict(lambda: None) + Nemesis.experiment_coro = coro + Nemesis.experiment_spdp = speedup + Nemesis.experiment_time = 0 + + loops = Nemesis._get_event_loops() + for loop in loops: + if not isinstance(loop, CausalEventLoop): + raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + loop.set_speedup(speedup) + + Nemesis.experiment_data = Experiment(loops) + + @staticmethod + def _stop_experiment(): + if Nemesis.experiment_data is not None: + loops = Nemesis.experiment_data.get_loops() + + latency = [] + virtual_run_time = [] + for loop in loops: + latency.extend(loop.get_completed_coros()) + + pause_time = loop.get_pause_time() + virtual_run_time.append(Nemesis.experiment_time - pause_time) + + results = { + "latency": latency, + "virtual_run_time": virtual_run_time, + } + + print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed') + Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results) + del Nemesis.experiment_data + + @staticmethod + def _signal_handler(sig, frame): + curr_sample = time.perf_counter() + passed_time = curr_sample - Nemesis.last_sample + Nemesis.last_sample = curr_sample + + if getattr(Nemesis, 'experiment_data', None): + loops = Nemesis.experiment_data.get_loops() + exp_coro = Nemesis.experiment_coro + for loop in loops: + coro = Nemesis._get_current_coro(loop) + prev_coro = Nemesis.prev_coro[loop] + if not prev_coro == coro: + if prev_coro == exp_coro: + loop.ping_exit_coro() + elif coro == exp_coro: + loop.ping_enter_coro() + Nemesis.prev_coro[loop] = coro + + loop.update_ready(False) + + Nemesis.experiment_time += passed_time + if (Nemesis.e_duration <= Nemesis.experiment_time): + Nemesis._stop_experiment() + + else: + coros = [] + loops = Nemesis._get_event_loops() + for loop in loops: + coro = Nemesis._get_current_coro(loop) + if coro is not None: + coros.append(coro) + if coros: + Nemesis._start_experiment(random.choice(coros), + Nemesis._select_speedup()) + + @staticmethod + def _parse_handle(handle): + cb = handle._callback + if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): + task = cb.__self__ + coro = task.get_coro() + return [task.get_name(), Nemesis._get_coro_name(coro)] + else: + return [str(type(handle).__name__), cb.__name__] + + @staticmethod + def _get_current_coro(loop): + tid = loop._thread_id + assert tid, f"{loop} is not running, yet we attempted to sample it!" + + frame = sys._current_frames().get(tid) + fname = frame.f_code.co_filename + while not Nemesis._should_trace(fname): + if frame: + frame = frame.f_back + else: + break + if frame: + fname = frame.f_code.co_filename + if frame and frame.f_generator: + return frame.f_generator.cr_code.co_name + return None + + @staticmethod + def _get_event_loops(): + '''Returns each thread's event loop, if it exists.''' + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = Nemesis._walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) + return loops + + @staticmethod + def _walk_back_until_loop(frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' + while frame: + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + @staticmethod + def _should_trace(filename): + '''Returns FALSE if filename is uninteresting to the user. + Don't depend on this. It kind of sucks.''' + if not filename: + return False + if '/gnu/store' in filename: + return False + if '/usr/local/lib/python' in filename: + return False + if 'site-packages' in filename: + return False + if 'propcache' in filename: + return False + if '.pyx' in filename: + return False + if filename[0] == '<': + return False + if 'nemesis' in filename: + return False + return True + + def _select_speedup(): + ''' + Returns a random speedup between 0% to 100%, in multiples of 5%. + Because a baseline is needed to calculate effect on program + performance, selects a speedup of 0 with 50% probability. + ''' + r1 = random.randint(-19, 20) + return max(0, r1) / 20 + + @staticmethod + def _get_coro_name(coro): + ''' + Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py + ''' + # Coroutines compiled with Cython sometimes don't have + # proper __qualname__ or __name__. While that is a bug + # in Cython, asyncio shouldn't crash with an AttributeError + # in its __repr__ functions. + if hasattr(coro, '__qualname__') and coro.__qualname__: + coro_name = coro.__qualname__ + elif hasattr(coro, '__name__') and coro.__name__: + coro_name = coro.__name__ + else: + # Stop masking Cython bugs, expose them in a friendly way. + coro_name = f'<{type(coro).__name__} without __name__>' + return f'{coro_name}()' + +the_globals = { + '__name__': '__main__', + '__doc__': None, + '__package__': None, + '__loader__': globals()['__loader__'], + '__spec__': None, + '__annotations__': {}, + '__builtins__': globals()['__builtins__'], + '__file__': None, + '__cached__': None, +} + + +if __name__ == "__main__": + # parses CLI arguments and facilitates profiler runtime. + parser = argparse.ArgumentParser( + usage='%(prog)s [args] -- prog' + ) + + parser.add_argument('-i', '--interval', + help='The minimum amount of time inbetween \ + samples in seconds.', + metavar='', + type=float, + default=0.01) + parser.add_argument('-e', '--experiment-duration', + help='The performance experiment duration. Defaults to 3 seconds.', + metavar='', + type=float, + default=3) + parser.add_argument('-f', '--filename', + help='The filename to write results to.', + metavar='', + type=str, + default="results.html") + parser.add_argument('prog', + type=str, + nargs='*', + help='Path to the python script and its arguments.') + args = parser.parse_args() + + sys.argv = args.prog + try: + with open(args.prog[0], 'r', encoding='utf-8') as fp: + code = compile(fp.read(), args.prog[0], "exec") + Nemesis(args.experiment_duration, + args.filename, + args.interval).start() + exec(code, the_globals) + Nemesis.stop() + except Exception: + traceback.print_exc() |
