1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
from collections import defaultdict, namedtuple
import asyncio
import sys
import threading
from causal_event_loop import CausalEventLoop
class BadLoopTypeException(Exception):
pass
class Experiment:
# the selected task for this experiment
_task = None
# the selected speedup for this experiment
_speedup = None
# event loops participating in this this experiment
loops = []
# a key-value pair where keys represent a handle and values
# represent number of times sampled.
_samples = None
def __init__(self, task, speedup):
self._task = task
self._speedup = speedup
self._samples = defaultdict(lambda: 0)
self._set_loops()
def get_task(self):
return self._task
def get_speedup(self):
return self._speedup
def get_results(self):
ret = f"Results for {self._task} at {self._speedup} times speedup:\n"
if len(self._samples) > 0:
ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}")
for key in self._sort_samples(self._samples):
ret += f"\n {self._get_sample_str(key)}"
else:
ret += " No samples were gathered. (This is odd!)"
return ret
def get_loops(self):
return [l for l in self._loops if l.is_running()]
def add_handles(self, handles, loop, time):
for h in handles:
self._samples[h.__str__()] += time
def _set_loops(self):
self._loops = self._get_event_loops()
for l in self._loops:
if not isinstance(l, CausalEventLoop):
raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
l._set_experiment(self._task, 1.0 / self._speedup)
def _get_event_loops(self):
'''Returns each thread's event loop, if it exists.'''
loops = []
for t in threading.enumerate():
frame = sys._current_frames().get(t.ident)
if frame:
loop = self._walk_back_until_loop(frame)
if loop and loop not in loops:
loops.append(loop)
return loops
def _walk_back_until_loop(self, frame):
'''Walks back the callstack until we are in a method named '_run_once'.
If this is ever true, we assume we are in an Asyncio event loop method,
and check to see if the 'self' variable is indeed and instance of
AbstractEventLoop. Return this variable if true.'''
while frame:
if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
loop = frame.f_locals['self']
if isinstance(loop, asyncio.AbstractEventLoop):
return loop
else:
frame = frame.f_back
return None
def _get_sample_str(self, key):
value = self._samples[key] / self._speedup
return f"{key:29} {value:10}"
def _sort_samples(self, sample_dict):
'''Returns SAMPLE_DICT in descending order by number of samples.'''
return {k: v for k, v in sorted(sample_dict.items(),
key=lambda item: item[1],
reverse=True)}
|