1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
from collections import defaultdict, namedtuple
import asyncio
import sys
import threading
from causal_event_loop import CausalEventLoop
class BadLoopTypeException(Exception):
    """Signals that a discovered event loop is not of the required loop type."""
class Experiment:
    """One profiling experiment: a selected coroutine at a selected speedup.

    Constructing an experiment discovers the event loops found on the call
    stacks of the live threads, requires each of them to be a
    ``CausalEventLoop`` and sets its dilation to ``1 / speedup``.  Waiting
    time is then accumulated per handle with :meth:`add_handles` and rendered
    as a text table with :meth:`get_results`.
    """

    # the selected task for this experiment
    _coro = None

    # the selected speedup for this experiment
    _speedup = None

    # Public name kept for backward compatibility; this class itself only
    # reads and writes ``_loops``.
    loops = []

    # event loops participating in this experiment (set by _set_loops)
    _loops = None

    # a key-value pair where keys represent a handle (with the owning loop's
    # thread id appended) and values represent the period of time waiting
    _samples = None

    # the amount of (speedup-adjusted) time required for a handle to be
    # included in a report; presumably seconds, going by the 'SEC' column
    _wait_time_threshold = 0.001

    def __init__(self, coro, speedup):
        """Store the experiment parameters and configure the event loops.

        Raises:
            BadLoopTypeException: if a discovered loop is not a
                ``CausalEventLoop``.
        """
        self._coro = coro
        self._speedup = speedup
        self._samples = defaultdict(int)
        self._set_loops()

    def get_coro(self):
        """Return the coroutine selected for this experiment."""
        return self._coro

    def get_speedup(self):
        """Return the speedup selected for this experiment."""
        return self._speedup

    def get_results(self):
        """Return a human-readable report of the accumulated waiting time.

        Each sample is divided by the speedup and rounded to four places.
        Rows are listed in descending order of accumulated time; rows below
        ``_wait_time_threshold`` are omitted from the table but still count
        towards the printed total.
        """
        header = f"Results for {self._coro} at {self._speedup} times speedup:\n"
        if not self._samples:
            return header + " No samples were gathered. (This is odd!)"

        rows = [
            f" {'NAME':<15} {'FUNC/CORO':<45} {'TIDENT':<16} {'SEC':<10}",
            f" {'---':<15} {'---':<45} {'---':<16} {'---':<10}",
        ]
        total = 0
        for key, waited in self._sort_samples(self._samples).items():
            value = round(waited / self._speedup, 4)
            total += value
            if value < self._wait_time_threshold:
                continue
            name = self._trim_to_last_x_chars(key[0], 15)
            ctxt = self._trim_to_last_x_chars(key[1], 45)
            # str() so that a thread id of None does not break the '<16'
            # alignment spec; integers render exactly as before.
            tid = str(key[2])
            rows.append(f" {name:<15} {ctxt:<45} {tid:<16} {value:<10}")
        rows.append(f" {'':<79}---")
        rows.append(f" {'':<79}{round(total, 4)}")
        return header + "\n".join(rows)

    def get_loops(self):
        """Return the participating event loops that are currently running."""
        return [loop for loop in self._loops if loop.is_running()]

    def add_handles(self, handles, loop, time):
        """Add ``time`` to the sample of every handle in ``handles``.

        The sample key is the handle's fields followed by ``loop._thread_id``.
        The handle sequences themselves are left untouched, so the same
        handle may be submitted repeatedly and always maps to the same key.
        """
        for handle in handles:
            key = tuple(handle) + (loop._thread_id,)
            self._samples[key] += time

    def _set_loops(self):
        """Discover the event loops and apply this experiment's dilation.

        Raises:
            BadLoopTypeException: if a discovered loop is not a
                ``CausalEventLoop``.
        """
        self._loops = self._get_event_loops()
        for loop in self._loops:
            if not isinstance(loop, CausalEventLoop):
                raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
            loop._set_dilation(1.0 / self._speedup)

    def _get_event_loops(self):
        '''Returns each thread's event loop, if it exists.'''
        # Take one snapshot of every thread's current frame instead of
        # rebuilding the mapping for each thread.
        frames = sys._current_frames()
        loops = []
        for thread in threading.enumerate():
            frame = frames.get(thread.ident)
            if frame is None:
                continue
            loop = self._walk_back_until_loop(frame)
            if loop is not None and loop not in loops:
                loops.append(loop)
        return loops

    def _walk_back_until_loop(self, frame):
        '''Walks back the callstack until we are in a method named '_run_once'.
        If this is ever true, we assume we are in an Asyncio event loop method,
        and check to see if the 'self' variable is indeed an instance of
        AbstractEventLoop. Return this variable if true, otherwise keep
        walking; return None when the stack is exhausted.'''
        while frame:
            if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
                candidate = frame.f_locals['self']
                if isinstance(candidate, asyncio.AbstractEventLoop):
                    return candidate
            # Always step to the caller, including after a '_run_once' frame
            # whose 'self' is not an event loop; otherwise the walk never
            # terminates.
            frame = frame.f_back
        return None

    def _sort_samples(self, sample_dict):
        '''Returns SAMPLE_DICT in descending order by accumulated time.'''
        ordered = sorted(sample_dict.items(),
                         key=lambda item: item[1],
                         reverse=True)
        return dict(ordered)

    def _trim_to_last_x_chars(self, string, x):
        '''Returns the last X characters of STRING (all of it if shorter).'''
        if x <= 0:
            # string[-0:] would be the whole string, not an empty one.
            return string[:0]
        return string[-x:]
|