diff options
author | bd <bdunahu@operationnull.com> | 2025-09-12 18:19:29 -0400 |
---|---|---|
committer | bd <bdunahu@operationnull.com> | 2025-09-12 18:19:29 -0400 |
commit | 006a5441997a0bc42a8a5277ad7c8f4937782002 (patch) | |
tree | 7a2588b249ecffa0cfec22b83e99ec4697e64447 /nemesis | |
parent | 963fd1ac52c8d762f65d1cf19d766507e666a986 (diff) |
Use plotly to visualize experiments, rework experiment/record logic
Diffstat (limited to 'nemesis')
-rw-r--r-- | nemesis/causal_event_loop.py | 5 | ||||
-rw-r--r-- | nemesis/experiment.py | 106 | ||||
-rw-r--r-- | nemesis/html_gen.py | 38 | ||||
-rwxr-xr-x | nemesis/nemesis.py | 188 |
4 files changed, 172 insertions, 165 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 2044850..8514dfc 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -72,8 +72,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def _get_time_in_coro(self): t = self._accumulated_time - if self._time_entered_coro: - t += super().time() - self._time_entered_coro + prev = self._time_entered_coro + if prev: + t += super().time() - prev return t def _update_ready(self, sampling=False): diff --git a/nemesis/experiment.py b/nemesis/experiment.py deleted file mode 100644 index 353d013..0000000 --- a/nemesis/experiment.py +++ /dev/null @@ -1,106 +0,0 @@ -from collections import defaultdict, namedtuple -import asyncio -import sys -import threading -from causal_event_loop import CausalEventLoop - -class BadLoopTypeException(Exception): - pass - -class Experiment: - - # the selected task for this experiment - _coro = None - # the selected speedup for this experiment - _speedup = None - # event loops participating in this this experiment - loops = [] - # a key-value pair where keys represent a handle and values - # represent the period of time waiting - _samples = None - - # the amount of time required for a handle to be included in - # a report - _wait_time_threshold = 0.001 - - def __init__(self, coro, speedup): - self._coro = coro - self._speedup = speedup - self._samples = defaultdict(lambda: 0) - - self._set_loops() - - def get_coro(self): - return self._coro - - def get_speedup(self): - return self._speedup - - def get_results(self): - ret = f"Results for {self._coro} at {self._speedup} times speedup:\n" - if len(self._samples) > 0: - ret += (f" {'NAME':<15} {'FUNC/CORO':<45} {'TIDENT':<16} {'SEC':<10}") - ret += (f"\n {'---':<15} {'---':<45} {'---':<16} {'---':<10}") - tot = 0 - for key in self._sort_samples(self._samples): - name = self._trim_to_last_x_chars(key[0], 15) - ctxt = self._trim_to_last_x_chars(key[1], 45) - tid = key[2] - value = round(self._samples[key] / self._speedup, 4) - tot += value - if value >= self._wait_time_threshold: - ret += f"\n {name:<15} {ctxt:<45} {tid:<16} {value:<10}" - ret += f"\n {'':<79}---" - ret += f"\n {'':<79}{round(tot, 4)}" - else: - ret += " No samples were gathered. (This is odd!)" - return ret - - def get_loops(self): - return [l for l in self._loops if l.is_running()] - - def add_handles(self, handles, loop, time): - for h in handles: - h.append(loop._thread_id) - self._samples[tuple(h)] += time - - def _set_loops(self): - self._loops = self._get_event_loops() - for l in self._loops: - if not isinstance(l, CausalEventLoop): - raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") - l._set_dilation(1.0 / self._speedup) - - def _get_event_loops(self): - '''Returns each thread's event loop, if it exists.''' - loops = [] - for t in threading.enumerate(): - frame = sys._current_frames().get(t.ident) - if frame: - loop = self._walk_back_until_loop(frame) - if loop and loop not in loops: - loops.append(loop) - return loops - - def _walk_back_until_loop(self, frame): - '''Walks back the callstack until we are in a method named '_run_once'. - If this is ever true, we assume we are in an Asyncio event loop method, - and check to see if the 'self' variable is indeed and instance of - AbstractEventLoop. Return this variable if true.''' - while frame: - if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: - loop = frame.f_locals['self'] - if isinstance(loop, asyncio.AbstractEventLoop): - return loop - else: - frame = frame.f_back - return None - - def _sort_samples(self, sample_dict): - '''Returns SAMPLE_DICT in descending order by number of samples.''' - return {k: v for k, v in sorted(sample_dict.items(), - key=lambda item: item[1], - reverse=True)} - - def _trim_to_last_x_chars(self, string, x): - return string[-x:] if len(string) >= x else string diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py new file mode 100644 index 0000000..ea7df15 --- /dev/null +++ b/nemesis/html_gen.py @@ -0,0 +1,38 @@ +import plotly.graph_objects as go +from plotly.subplots import make_subplots + +def plot_results(results, filename): + fig = make_subplots(rows=len(results), cols=1, shared_xaxes=True) + + for i, (coro_name, x_values) in enumerate(results.items(), start=1): + x_list = [] + y_list = [] + hover_text = [] + + for speedup, experiments in x_values.items(): + for experiment in experiments: + y_value = sum(experiment.values()) + + x_list.append(speedup) + y_list.append(y_value) + + breakdown = "<br>".join([f" {key[0]} ({key[1]}): {round(value, 4)}" for key, value in experiment.items()]) + hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Total Wait: {round(y_value, 4)}<br>Breakdown:<br>{breakdown}") + + fig.add_trace(go.Scatter( + x=x_list, + y=y_list, + mode='markers', + name=coro_name, + hoverinfo='text', + hovertext=hover_text, + )) + + fig.update_layout( + title="Potential Speedups for ", + xaxis_title="Speedup (times faster)", + yaxis_title="Total Wait (seconds)", + showlegend=True, + ) + + fig.write_html(filename) diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index c20d68d..46e8094 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -25,17 +25,39 @@ Commentary: Code: ''' +from causal_event_loop import CausalEventLoop from collections import defaultdict -from experiment import Experiment +from html_gen import plot_results import argparse import asyncio import os +import random import signal import sys +import threading import time import traceback import types +class Experiment: + + # event loops participating in this this experiment + _loops = [] + # a key-value pair where keys represent a handle and values + # represent the period of time waiting + samples = None + + def __init__(self, loops): + self._loops = loops + self.samples = defaultdict(lambda: 0) + + def get_loops(self): + return [l for l in self._loops if l.is_running()] + + def add_handles(self, handles, loop, time): + for h in handles: + h.append(loop._thread_id) + self.samples[tuple(h)] += time class Nemesis(object): @@ -43,29 +65,35 @@ class Nemesis(object): signal_interval = 0.0 # the timestamp which the last sample was taken last_sample = None + # the current experiment being run - curr_experiment = None - # results from previous experiments, represented as strings - results = [] - # The duration of each performance experiment + experiment_data = None + # the coroutine the current experiment is speeding up + experiment_coro = None + # the speedup of the current experiment + experiment_spdp = None + + # results from previous experiments. Keys represent names of coroutines. + results = defaultdict(lambda: defaultdict(lambda: [])) + + # the file to write results to + filename = None + + # The base duration of each performance experiment e_duration = None + # The number of seconds remaining in this performance experiment. r_duration = None + # A mapping of event loops to the previous running coroutine. prev_coro = defaultdict(lambda: None) - # temp - coro = None - dilation = 1.0 - @staticmethod - def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01): + def __init__(e_duration, filename, signal_interval=0.01): Nemesis.signal_interval = signal_interval Nemesis.e_duration = e_duration + Nemesis.filename = filename Nemesis.r_duration = 0 - # temporary - Nemesis.coro = coro - Nemesis.speedup = max(speedup, 1.0) @staticmethod def start(): @@ -79,32 +107,42 @@ class Nemesis(object): @staticmethod def stop(): signal.setitimer(signal.ITIMER_REAL, 0) - Nemesis._stop_experiment() - for r in Nemesis.results: - print() - print(r) + plot_results(Nemesis.results, Nemesis.filename) + print(f"Wrote {Nemesis.filename}") @staticmethod - def _start_experiment(): - Nemesis.r_duration = Nemesis.e_duration + def _start_experiment(coro, speedup): + Nemesis.r_duration = Nemesis.e_duration * speedup Nemesis.prev_coro = defaultdict(lambda: None) - Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup) + Nemesis.experiment_coro = coro + Nemesis.experiment_spdp = speedup + + loops = Nemesis._get_event_loops() + for l in loops: + if not isinstance(l, CausalEventLoop): + raise RuntimeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + l._set_dilation(1.0 / speedup) + + Nemesis.experiment_data = Experiment(loops) @staticmethod def _stop_experiment(): - if Nemesis.curr_experiment is not None: - print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}') - Nemesis.results.append(Nemesis.curr_experiment.get_results()) - del Nemesis.curr_experiment + if Nemesis.experiment_data is not None: + print(f'finished running experiment on {Nemesis.experiment_coro}') + results = Nemesis.experiment_data.samples + Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results) + del Nemesis.experiment_data @staticmethod def _signal_handler(sig, frame): curr_sample = time.perf_counter() passed_time = curr_sample - Nemesis.last_sample Nemesis.last_sample = curr_sample - if Nemesis.curr_experiment: - loops = Nemesis.curr_experiment.get_loops() - exp_coro = Nemesis.curr_experiment.get_coro() + + + if getattr(Nemesis, 'experiment_data', None): + loops = Nemesis.experiment_data.get_loops() + exp_coro = Nemesis.experiment_coro for loop in loops: coro = Nemesis._get_current_coro(loop) prev_coro = Nemesis.prev_coro[loop] @@ -117,31 +155,42 @@ class Nemesis(object): loop._update_ready(True) handles = Nemesis._get_waiting_handles(loop) - Nemesis.curr_experiment.add_handles(handles, loop, passed_time) + Nemesis.experiment_data.add_handles(handles, loop, passed_time) - Nemesis.r_duration -= passed_time - if (Nemesis.r_duration <= 0): - Nemesis._stop_experiment() - Nemesis._start_experiment() + Nemesis.r_duration -= passed_time + if (Nemesis.r_duration <= 0): + Nemesis._stop_experiment() + else: + coros = [] + loops = Nemesis._get_event_loops() + for loop in loops: + coro = Nemesis._get_current_coro(loop) + if coro is not None: + coros.append(coro) + if coros: + Nemesis._start_experiment(random.choice(coros), + Nemesis._select_speedup()) + + @staticmethod def _get_waiting_handles(loop): handles = [] for handle in loop._ready: - # no duplicates handle_info = Nemesis._parse_handle(handle) - if handle_info not in handles: - handles.append(handle_info) + handles.append(handle_info) return handles + @staticmethod def _parse_handle(handle): cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): task = cb.__self__ coro = task.get_coro() - return [task.get_name(), Nemesis.get_coro_name(coro)] + return [task.get_name(), Nemesis._get_coro_name(coro)] else: return [str(type(handle).__name__), cb.__name__] + @staticmethod def _get_current_coro(loop): tid = loop._thread_id assert tid, f"{loop} is not running, yet we attempted to sample it!" @@ -160,6 +209,33 @@ class Nemesis(object): return None @staticmethod + def _get_event_loops(): + '''Returns each thread's event loop, if it exists.''' + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = Nemesis._walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) + return loops + + @staticmethod + def _walk_back_until_loop(frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' + while frame: + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + @staticmethod def _should_trace(filename): '''Returns FALSE if filename is uninteresting to the user. Don't depend on this. It kind of sucks.''' @@ -181,7 +257,17 @@ class Nemesis(object): return False return True - def get_coro_name(coro): + def _select_speedup(): + ''' + Returns a random speedup between 0% to 100%, in multiples of 5%. + Because a baseline is needed to calculate effect on program + performance, selects a speedup of 0 with 50% probability. + ''' + r1 = random.randint(-9, 20) + return 1 + max(0, r1) / 20 + + @staticmethod + def _get_coro_name(coro): ''' Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py ''' @@ -223,26 +309,16 @@ if __name__ == "__main__": metavar='', type=float, default=0.01) - parser.add_argument('-s', '--speedup', - help='The amount of virtual speedup. Cannot go below one. Default is 2.0.', - metavar='', - type=float, - default=2.0) - parser.add_argument('-c', '--task', - help='The task to virtually speedup.', - metavar='', - type=str, - required=True) parser.add_argument('-e', '--experiment-duration', - help='The performance experiment duration. Defaults to 4 seconds.', + help='The performance experiment duration. Defaults to 3 seconds.', metavar='', type=float, - default=4) - parser.add_argument('-w', '--warmup-time', - help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds', + default=3) + parser.add_argument('-f', '--filename', + help='The filename to write results to.', metavar='', - type=float, - default=0.1) + type=str, + default="results.html") parser.add_argument('prog', type=str, nargs='*', @@ -253,10 +329,8 @@ if __name__ == "__main__": try: with open(args.prog[0], 'r', encoding='utf-8') as fp: code = compile(fp.read(), args.prog[0], "exec") - Nemesis(args.task, - args.speedup, - args.experiment_duration, - args.warmup_time, + Nemesis(args.experiment_duration, + args.filename, args.interval).start() exec(code, the_globals) Nemesis.stop() |