summaryrefslogtreecommitdiff
path: root/nemesis/nemesis.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-10-06 18:27:09 -0400
committerbd <bdunahu@operationnull.com>2025-10-06 18:27:09 -0400
commit2edc08465723f444a1ef4108d41bac852f7be88a (patch)
tree53f5d1c4eca459c0c9784844b3d8c80bc4b03287 /nemesis/nemesis.py
initial commit
Diffstat (limited to 'nemesis/nemesis.py')
-rwxr-xr-xnemesis/nemesis.py345
1 files changed, 345 insertions, 0 deletions
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
new file mode 100755
index 0000000..27e93de
--- /dev/null
+++ b/nemesis/nemesis.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+Code:
+'''
+
+from causal_event_loop import CausalEventLoop
+from collections import defaultdict
+from html_gen import plot_results
+import argparse
+import asyncio
+import os
+import random
+import signal
+import sys
+import threading
+import time
+import traceback
+import types
+
+class Experiment:
+
+ # event loops participating in this this experiment
+ _loops = []
+
+ def __init__(self, loops):
+ self._loops = loops
+
+ def get_loops(self):
+ return [l for l in self._loops if l.is_running()]
+
+class Nemesis(object):
+
+ # the (ideal) interval between samples
+ signal_interval = 0.0
+ # the timestamp which the last sample was taken
+ last_sample = None
+
+ # the current experiment being run
+ experiment_data = None
+ # the coroutine the current experiment is speeding up
+ experiment_coro = None
+ # the speedup of the current experiment
+ experiment_spdp = None
+ # the total time this experiment has been running
+ experiment_time = None
+
+ # results from previous experiments. Keys represent names of coroutines.
+ results = defaultdict(lambda: defaultdict(lambda: []))
+
+ # the file to write results to
+ filename = None
+
+ # The base duration of each performance experiment
+ e_duration = None
+
+ # A mapping of event loops to the previous running coroutine.
+ prev_coro = defaultdict(lambda: None)
+
+ @staticmethod
+ def __init__(e_duration, filename, signal_interval=0.01):
+ Nemesis.signal_interval = signal_interval
+ Nemesis.e_duration = e_duration
+ Nemesis.filename = filename
+
+ @staticmethod
+ def start():
+ Nemesis.last_sample = time.perf_counter()
+ signal.signal(signal.SIGALRM,
+ Nemesis._signal_handler)
+ signal.setitimer(signal.ITIMER_REAL,
+ Nemesis.signal_interval,
+ Nemesis.signal_interval)
+
+ @staticmethod
+ def print_results():
+ for coro_name, x_values in Nemesis.results.items():
+ print(f'Results for {coro_name:}')
+ for speedup, experiments in x_values.items():
+ print(f' {speedup * 100}% speedup:')
+ for experiment in experiments:
+ num_callbacks = len(experiment)
+ if num_callbacks > 0:
+ total_wait = sum([cb[1] for cb in experiment])
+ latency = total_wait / num_callbacks
+ print(f' latency: {latency}')
+ print(f' callbacks processed: {num_callbacks}')
+ print(f'')
+
+ @staticmethod
+ def stop():
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ plot_results(Nemesis.results, Nemesis.filename)
+ print(f"Wrote {Nemesis.filename}")
+
+ @staticmethod
+ def _start_experiment(coro, speedup):
+ Nemesis.prev_coro = defaultdict(lambda: None)
+ Nemesis.experiment_coro = coro
+ Nemesis.experiment_spdp = speedup
+ Nemesis.experiment_time = 0
+
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ if not isinstance(loop, CausalEventLoop):
+ raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+ loop.set_speedup(speedup)
+
+ Nemesis.experiment_data = Experiment(loops)
+
+ @staticmethod
+ def _stop_experiment():
+ if Nemesis.experiment_data is not None:
+ loops = Nemesis.experiment_data.get_loops()
+
+ latency = []
+ virtual_run_time = []
+ for loop in loops:
+ latency.extend(loop.get_completed_coros())
+
+ pause_time = loop.get_pause_time()
+ virtual_run_time.append(Nemesis.experiment_time - pause_time)
+
+ results = {
+ "latency": latency,
+ "virtual_run_time": virtual_run_time,
+ }
+
+ print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed')
+ Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results)
+ del Nemesis.experiment_data
+
+ @staticmethod
+ def _signal_handler(sig, frame):
+ curr_sample = time.perf_counter()
+ passed_time = curr_sample - Nemesis.last_sample
+ Nemesis.last_sample = curr_sample
+
+ if getattr(Nemesis, 'experiment_data', None):
+ loops = Nemesis.experiment_data.get_loops()
+ exp_coro = Nemesis.experiment_coro
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ prev_coro = Nemesis.prev_coro[loop]
+ if not prev_coro == coro:
+ if prev_coro == exp_coro:
+ loop.ping_exit_coro()
+ elif coro == exp_coro:
+ loop.ping_enter_coro()
+ Nemesis.prev_coro[loop] = coro
+
+ loop.update_ready(False)
+
+ Nemesis.experiment_time += passed_time
+ if (Nemesis.e_duration <= Nemesis.experiment_time):
+ Nemesis._stop_experiment()
+
+ else:
+ coros = []
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ if coro is not None:
+ coros.append(coro)
+ if coros:
+ Nemesis._start_experiment(random.choice(coros),
+ Nemesis._select_speedup())
+
+ @staticmethod
+ def _parse_handle(handle):
+ cb = handle._callback
+ if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
+ task = cb.__self__
+ coro = task.get_coro()
+ return [task.get_name(), Nemesis._get_coro_name(coro)]
+ else:
+ return [str(type(handle).__name__), cb.__name__]
+
+ @staticmethod
+ def _get_current_coro(loop):
+ tid = loop._thread_id
+ assert tid, f"{loop} is not running, yet we attempted to sample it!"
+
+ frame = sys._current_frames().get(tid)
+ fname = frame.f_code.co_filename
+ while not Nemesis._should_trace(fname):
+ if frame:
+ frame = frame.f_back
+ else:
+ break
+ if frame:
+ fname = frame.f_code.co_filename
+ if frame and frame.f_generator:
+ return frame.f_generator.cr_code.co_name
+ return None
+
+ @staticmethod
+ def _get_event_loops():
+ '''Returns each thread's event loop, if it exists.'''
+ loops = []
+ for t in threading.enumerate():
+ frame = sys._current_frames().get(t.ident)
+ if frame:
+ loop = Nemesis._walk_back_until_loop(frame)
+ if loop and loop not in loops:
+ loops.append(loop)
+ return loops
+
+ @staticmethod
+ def _walk_back_until_loop(frame):
+ '''Walks back the callstack until we are in a method named '_run_once'.
+ If this is ever true, we assume we are in an Asyncio event loop method,
+ and check to see if the 'self' variable is indeed and instance of
+ AbstractEventLoop. Return this variable if true.'''
+ while frame:
+ if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+ loop = frame.f_locals['self']
+ if isinstance(loop, asyncio.AbstractEventLoop):
+ return loop
+ else:
+ frame = frame.f_back
+ return None
+
+ @staticmethod
+ def _should_trace(filename):
+ '''Returns FALSE if filename is uninteresting to the user.
+ Don't depend on this. It kind of sucks.'''
+ if not filename:
+ return False
+ if '/gnu/store' in filename:
+ return False
+ if '/usr/local/lib/python' in filename:
+ return False
+ if 'site-packages' in filename:
+ return False
+ if 'propcache' in filename:
+ return False
+ if '.pyx' in filename:
+ return False
+ if filename[0] == '<':
+ return False
+ if 'nemesis' in filename:
+ return False
+ return True
+
+ def _select_speedup():
+ '''
+ Returns a random speedup between 0% to 100%, in multiples of 5%.
+ Because a baseline is needed to calculate effect on program
+ performance, selects a speedup of 0 with 50% probability.
+ '''
+ r1 = random.randint(-19, 20)
+ return max(0, r1) / 20
+
+ @staticmethod
+ def _get_coro_name(coro):
+ '''
+ Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py
+ '''
+ # Coroutines compiled with Cython sometimes don't have
+ # proper __qualname__ or __name__. While that is a bug
+ # in Cython, asyncio shouldn't crash with an AttributeError
+ # in its __repr__ functions.
+ if hasattr(coro, '__qualname__') and coro.__qualname__:
+ coro_name = coro.__qualname__
+ elif hasattr(coro, '__name__') and coro.__name__:
+ coro_name = coro.__name__
+ else:
+ # Stop masking Cython bugs, expose them in a friendly way.
+ coro_name = f'<{type(coro).__name__} without __name__>'
+ return f'{coro_name}()'
+
+the_globals = {
+ '__name__': '__main__',
+ '__doc__': None,
+ '__package__': None,
+ '__loader__': globals()['__loader__'],
+ '__spec__': None,
+ '__annotations__': {},
+ '__builtins__': globals()['__builtins__'],
+ '__file__': None,
+ '__cached__': None,
+}
+
+
+if __name__ == "__main__":
+ # parses CLI arguments and facilitates profiler runtime.
+ parser = argparse.ArgumentParser(
+ usage='%(prog)s [args] -- prog'
+ )
+
+ parser.add_argument('-i', '--interval',
+ help='The minimum amount of time inbetween \
+ samples in seconds.',
+ metavar='',
+ type=float,
+ default=0.01)
+ parser.add_argument('-e', '--experiment-duration',
+ help='The performance experiment duration. Defaults to 3 seconds.',
+ metavar='',
+ type=float,
+ default=3)
+ parser.add_argument('-f', '--filename',
+ help='The filename to write results to.',
+ metavar='',
+ type=str,
+ default="results.html")
+ parser.add_argument('prog',
+ type=str,
+ nargs='*',
+ help='Path to the python script and its arguments.')
+ args = parser.parse_args()
+
+ sys.argv = args.prog
+ try:
+ with open(args.prog[0], 'r', encoding='utf-8') as fp:
+ code = compile(fp.read(), args.prog[0], "exec")
+ Nemesis(args.experiment_duration,
+ args.filename,
+ args.interval).start()
+ exec(code, the_globals)
+ Nemesis.stop()
+ except Exception:
+ traceback.print_exc()