diff options
Diffstat (limited to 'nemesis/nemesis.py')
-rwxr-xr-x | nemesis/nemesis.py | 188 |
1 files changed, 131 insertions, 57 deletions
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index c20d68d..46e8094 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -25,17 +25,39 @@ Commentary: Code: ''' +from causal_event_loop import CausalEventLoop from collections import defaultdict -from experiment import Experiment +from html_gen import plot_results import argparse import asyncio import os +import random import signal import sys +import threading import time import traceback import types +class Experiment: + + # event loops participating in this this experiment + _loops = [] + # a key-value pair where keys represent a handle and values + # represent the period of time waiting + samples = None + + def __init__(self, loops): + self._loops = loops + self.samples = defaultdict(lambda: 0) + + def get_loops(self): + return [l for l in self._loops if l.is_running()] + + def add_handles(self, handles, loop, time): + for h in handles: + h.append(loop._thread_id) + self.samples[tuple(h)] += time class Nemesis(object): @@ -43,29 +65,35 @@ class Nemesis(object): signal_interval = 0.0 # the timestamp which the last sample was taken last_sample = None + # the current experiment being run - curr_experiment = None - # results from previous experiments, represented as strings - results = [] - # The duration of each performance experiment + experiment_data = None + # the coroutine the current experiment is speeding up + experiment_coro = None + # the speedup of the current experiment + experiment_spdp = None + + # results from previous experiments. Keys represent names of coroutines. + results = defaultdict(lambda: defaultdict(lambda: [])) + + # the file to write results to + filename = None + + # The base duration of each performance experiment e_duration = None + # The number of seconds remaining in this performance experiment. r_duration = None + # A mapping of event loops to the previous running coroutine. prev_coro = defaultdict(lambda: None) - # temp - coro = None - dilation = 1.0 - @staticmethod - def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01): + def __init__(e_duration, filename, signal_interval=0.01): Nemesis.signal_interval = signal_interval Nemesis.e_duration = e_duration + Nemesis.filename = filename Nemesis.r_duration = 0 - # temporary - Nemesis.coro = coro - Nemesis.speedup = max(speedup, 1.0) @staticmethod def start(): @@ -79,32 +107,42 @@ class Nemesis(object): @staticmethod def stop(): signal.setitimer(signal.ITIMER_REAL, 0) - Nemesis._stop_experiment() - for r in Nemesis.results: - print() - print(r) + plot_results(Nemesis.results, Nemesis.filename) + print(f"Wrote {Nemesis.filename}") @staticmethod - def _start_experiment(): - Nemesis.r_duration = Nemesis.e_duration + def _start_experiment(coro, speedup): + Nemesis.r_duration = Nemesis.e_duration * speedup Nemesis.prev_coro = defaultdict(lambda: None) - Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup) + Nemesis.experiment_coro = coro + Nemesis.experiment_spdp = speedup + + loops = Nemesis._get_event_loops() + for l in loops: + if not isinstance(l, CausalEventLoop): + raise RuntimeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + l._set_dilation(1.0 / speedup) + + Nemesis.experiment_data = Experiment(loops) @staticmethod def _stop_experiment(): - if Nemesis.curr_experiment is not None: - print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}') - Nemesis.results.append(Nemesis.curr_experiment.get_results()) - del Nemesis.curr_experiment + if Nemesis.experiment_data is not None: + print(f'finished running experiment on {Nemesis.experiment_coro}') + results = Nemesis.experiment_data.samples + Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results) + del Nemesis.experiment_data @staticmethod def _signal_handler(sig, frame): curr_sample = time.perf_counter() passed_time = curr_sample - Nemesis.last_sample Nemesis.last_sample = curr_sample - if Nemesis.curr_experiment: - loops = Nemesis.curr_experiment.get_loops() - exp_coro = Nemesis.curr_experiment.get_coro() + + + if getattr(Nemesis, 'experiment_data', None): + loops = Nemesis.experiment_data.get_loops() + exp_coro = Nemesis.experiment_coro for loop in loops: coro = Nemesis._get_current_coro(loop) prev_coro = Nemesis.prev_coro[loop] @@ -117,31 +155,42 @@ class Nemesis(object): loop._update_ready(True) handles = Nemesis._get_waiting_handles(loop) - Nemesis.curr_experiment.add_handles(handles, loop, passed_time) + Nemesis.experiment_data.add_handles(handles, loop, passed_time) - Nemesis.r_duration -= passed_time - if (Nemesis.r_duration <= 0): - Nemesis._stop_experiment() - Nemesis._start_experiment() + Nemesis.r_duration -= passed_time + if (Nemesis.r_duration <= 0): + Nemesis._stop_experiment() + else: + coros = [] + loops = Nemesis._get_event_loops() + for loop in loops: + coro = Nemesis._get_current_coro(loop) + if coro is not None: + coros.append(coro) + if coros: + Nemesis._start_experiment(random.choice(coros), + Nemesis._select_speedup()) + + @staticmethod def _get_waiting_handles(loop): handles = [] for handle in loop._ready: - # no duplicates handle_info = Nemesis._parse_handle(handle) - if handle_info not in handles: - handles.append(handle_info) + handles.append(handle_info) return handles + @staticmethod def _parse_handle(handle): cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): task = cb.__self__ coro = task.get_coro() - return [task.get_name(), Nemesis.get_coro_name(coro)] + return [task.get_name(), Nemesis._get_coro_name(coro)] else: return [str(type(handle).__name__), cb.__name__] + @staticmethod def _get_current_coro(loop): tid = loop._thread_id assert tid, f"{loop} is not running, yet we attempted to sample it!" @@ -160,6 +209,33 @@ class Nemesis(object): return None @staticmethod + def _get_event_loops(): + '''Returns each thread's event loop, if it exists.''' + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = Nemesis._walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) + return loops + + @staticmethod + def _walk_back_until_loop(frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' + while frame: + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + @staticmethod def _should_trace(filename): '''Returns FALSE if filename is uninteresting to the user. Don't depend on this. It kind of sucks.''' @@ -181,7 +257,17 @@ class Nemesis(object): return False return True - def get_coro_name(coro): + def _select_speedup(): + ''' + Returns a random speedup between 0% to 100%, in multiples of 5%. + Because a baseline is needed to calculate effect on program + performance, selects a speedup of 0 with 50% probability. + ''' + r1 = random.randint(-9, 20) + return 1 + max(0, r1) / 20 + + @staticmethod + def _get_coro_name(coro): ''' Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py ''' @@ -223,26 +309,16 @@ if __name__ == "__main__": metavar='', type=float, default=0.01) - parser.add_argument('-s', '--speedup', - help='The amount of virtual speedup. Cannot go below one. Default is 2.0.', - metavar='', - type=float, - default=2.0) - parser.add_argument('-c', '--task', - help='The task to virtually speedup.', - metavar='', - type=str, - required=True) parser.add_argument('-e', '--experiment-duration', - help='The performance experiment duration. Defaults to 4 seconds.', + help='The performance experiment duration. Defaults to 3 seconds.', metavar='', type=float, - default=4) - parser.add_argument('-w', '--warmup-time', - help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds', + default=3) + parser.add_argument('-f', '--filename', + help='The filename to write results to.', metavar='', - type=float, - default=0.1) + type=str, + default="results.html") parser.add_argument('prog', type=str, nargs='*', @@ -253,10 +329,8 @@ if __name__ == "__main__": try: with open(args.prog[0], 'r', encoding='utf-8') as fp: code = compile(fp.read(), args.prog[0], "exec") - Nemesis(args.task, - args.speedup, - args.experiment_duration, - args.warmup_time, + Nemesis(args.experiment_duration, + args.filename, args.interval).start() exec(code, the_globals) Nemesis.stop() |