summaryrefslogtreecommitdiff
path: root/nemesis/experiment.py
blob: 9d179ea3de1c290867a81e27de3cd5dbbfae83bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from collections import defaultdict, namedtuple
import asyncio
import sys
import threading
from causal_event_loop import CausalEventLoop

class BadLoopTypeException(Exception):
    '''Raised when a discovered event loop is not a CausalEventLoop.'''

class Experiment:
    '''One profiling run: a selected coroutine paired with a speedup factor.

    On construction the experiment locates the event loop running in each
    live thread, checks that every one of them is a CausalEventLoop, and
    sets each loop's dilation to ``1.0 / speedup``.  Timing samples are
    then accumulated per handle through add_handles() and rendered as a
    text table by get_results().
    '''

    # the selected task for this experiment
    _coro = None
    # the selected speedup for this experiment
    _speedup = None
    # Not used anywhere in this class (instances keep their event loops in
    # ``_loops``, assigned by _set_loops).  Retained only so that existing
    # references to ``Experiment.loops`` keep working.
    loops = []
    # a key-value pair where keys are the string form of a handle and
    # values are the total time accumulated for it by add_handles().
    _samples = None

    def __init__(self, coro, speedup):
        '''Store COORDINATES of the run and configure the event loops.

        coro    -- the task selected for this experiment.
        speedup -- the speedup factor; it is used as a divisor, so zero
                   raises ZeroDivisionError.

        Raises BadLoopTypeException if any discovered event loop is not a
        CausalEventLoop.
        '''
        self._coro = coro
        self._speedup = speedup
        self._samples = defaultdict(int)

        self._set_loops()

    def get_coro(self):
        '''Returns the task selected for this experiment.'''
        return self._coro

    def get_speedup(self):
        '''Returns the speedup factor selected for this experiment.'''
        return self._speedup

    def get_results(self):
        '''Returns a multi-line report of the gathered samples, largest
        accumulated time first, or a notice if nothing was sampled.'''
        lines = [f"Results for {self._coro} at {self._speedup} times speedup:"]
        if self._samples:
            # NOTE(review): the header names a LOOP column, but rows built by
            # _get_sample_str only contain the handle and its time.
            lines.append(f"\t{'HANDLE':<30} {'LOOP':<10} {'SEC':<10}")
            lines.extend(f"\t{self._get_sample_str(key)}"
                         for key in self._sort_samples(self._samples))
        else:
            lines.append("\tNo samples were gathered. (This is odd!)")
        return "\n".join(lines)

    def get_loops(self):
        '''Returns the loops found at construction that are still running.'''
        return [l for l in self._loops if l.is_running()]

    def add_handles(self, handles, loop, time):
        '''Adds TIME to the running total of every handle in HANDLES.

        Handles are keyed by their string form.  LOOP is accepted but not
        currently recorded.
        '''
        for h in handles:
            self._samples[str(h)] += time

    def _set_loops(self):
        '''Finds the event loops of all threads and sets their dilation to
        the inverse of the speedup.

        Raises BadLoopTypeException on the first loop that is not a
        CausalEventLoop.
        '''
        self._loops = self._get_event_loops()
        for l in self._loops:
            if not isinstance(l, CausalEventLoop):
                raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
            l._set_dilation(1.0 / self._speedup)

    def _get_event_loops(self):
        '''Returns each thread's event loop, if it exists.'''
        loops = []
        # Take one snapshot of every thread's current frame instead of
        # rebuilding the whole mapping for each thread.
        frames = sys._current_frames()
        for t in threading.enumerate():
            frame = frames.get(t.ident)
            if frame:
                loop = self._walk_back_until_loop(frame)
                if loop and loop not in loops:
                    loops.append(loop)
        return loops

    def _walk_back_until_loop(self, frame):
        '''Walks back the callstack until we are in a method named '_run_once'.
        If this is ever true, we assume we are in an Asyncio event loop method,
        and check to see if the 'self' variable is indeed an instance of
        AbstractEventLoop. Return this variable if true, otherwise keep
        walking. Returns None if the stack is exhausted.'''
        while frame:
            if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
                loop = frame.f_locals['self']
                if isinstance(loop, asyncio.AbstractEventLoop):
                    return loop
            # Always step to the caller.  Advancing only when the name test
            # fails would spin forever on a '_run_once' frame whose 'self'
            # is not an event loop.
            frame = frame.f_back
        return None

    def _get_sample_str(self, key):
        '''Returns one report row for KEY: the handle name followed by its
        accumulated time divided by the speedup.'''
        value = self._samples[key] / self._speedup
        return f"{key:29} {value:10}"

    def _sort_samples(self, sample_dict):
        '''Returns SAMPLE_DICT as a new dict ordered by descending value.'''
        return dict(sorted(sample_dict.items(),
                           key=lambda item: item[1],
                           reverse=True))