from collections import defaultdict, namedtuple import asyncio import sys import threading from causal_event_loop import CausalEventLoop class BadLoopTypeException(Exception): pass class Experiment: # the selected task for this experiment _coro = None # the selected speedup for this experiment _speedup = None # event loops participating in this this experiment loops = [] # a key-value pair where keys represent a handle and values # represent number of times sampled. _samples = None def __init__(self, coro, speedup): self._coro = coro self._speedup = speedup self._samples = defaultdict(lambda: 0) self._set_loops() def get_coro(self): return self._coro def get_speedup(self): return self._speedup def get_results(self): ret = f"Results for {self._coro} at {self._speedup} times speedup:\n" if len(self._samples) > 0: ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}") for key in self._sort_samples(self._samples): ret += f"\n {self._get_sample_str(key)}" else: ret += " No samples were gathered. (This is odd!)" return ret def get_loops(self): return [l for l in self._loops if l.is_running()] def add_handles(self, handles, loop, time): for h in handles: self._samples[h.__str__()] += time def _set_loops(self): self._loops = self._get_event_loops() for l in self._loops: if not isinstance(l, CausalEventLoop): raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") l._set_dilation(1.0 / self._speedup) def _get_event_loops(self): '''Returns each thread's event loop, if it exists.''' loops = [] for t in threading.enumerate(): frame = sys._current_frames().get(t.ident) if frame: loop = self._walk_back_until_loop(frame) if loop and loop not in loops: loops.append(loop) return loops def _walk_back_until_loop(self, frame): '''Walks back the callstack until we are in a method named '_run_once'. If this is ever true, we assume we are in an Asyncio event loop method, and check to see if the 'self' variable is indeed and instance of AbstractEventLoop. Return this variable if true.''' while frame: if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: loop = frame.f_locals['self'] if isinstance(loop, asyncio.AbstractEventLoop): return loop else: frame = frame.f_back return None def _get_sample_str(self, key): value = self._samples[key] / self._speedup return f"{key:29} {value:10}" def _sort_samples(self, sample_dict): '''Returns SAMPLE_DICT in descending order by number of samples.''' return {k: v for k, v in sorted(sample_dict.items(), key=lambda item: item[1], reverse=True)}