diff options
Diffstat (limited to 'nemesis/experiment.py')
-rw-r--r-- | nemesis/experiment.py | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/nemesis/experiment.py b/nemesis/experiment.py new file mode 100644 index 0000000..87a0103 --- /dev/null +++ b/nemesis/experiment.py @@ -0,0 +1,92 @@ +from collections import defaultdict, namedtuple +import asyncio +import sys +import threading +from causal_event_loop import CausalEventLoop + +class BadLoopTypeException(Exception): + pass + +class Experiment: + + # the selected task for this experiment + _task = None + # the selected speedup for this experiment + _speedup = None + # event loops participating in this this experiment + loops = [] + # a key-value pair where keys represent a handle and values + # represent number of times sampled. + _samples = None + + def __init__(self, task, speedup): + self._task = task + self._speedup = speedup + self._samples = defaultdict(lambda: 0) + + self._set_loops() + + def get_task(self): + return self._task + + def get_speedup(self): + return self._speedup + + def get_results(self): + ret = f"Results for {self._task} at {self._speedup} times speedup:\n" + if len(self._samples) > 0: + ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}") + for key in self._sort_samples(self._samples): + ret += f"\n {self._get_sample_str(key)}" + else: + ret += " No samples were gathered. (This is odd!)" + return ret + + def get_loops(self): + return [l for l in self._loops if l.is_running()] + + def add_handles(self, handles, loop, time): + for h in handles: + self._samples[h.__str__()] += time + + def _set_loops(self): + self._loops = self._get_event_loops() + for l in self._loops: + if not isinstance(l, CausalEventLoop): + raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + l._set_experiment(self._task, 1.0 / self._speedup) + + def _get_event_loops(self): + '''Returns each thread's event loop, if it exists.''' + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = self._walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) + return loops + + def _walk_back_until_loop(self, frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' + while frame: + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + def _get_sample_str(self, key): + value = self._samples[key] / self._speedup + return f"{key:29} {value:10}" + + def _sort_samples(self, sample_dict): + '''Returns SAMPLE_DICT in descending order by number of samples.''' + return {k: v for k, v in sorted(sample_dict.items(), + key=lambda item: item[1], + reverse=True)} |