summaryrefslogtreecommitdiff
path: root/nemesis
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis')
-rw-r--r--nemesis/causal_event_loop.py5
-rw-r--r--nemesis/experiment.py106
-rw-r--r--nemesis/html_gen.py38
-rwxr-xr-xnemesis/nemesis.py188
4 files changed, 172 insertions, 165 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 2044850..8514dfc 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -72,8 +72,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def _get_time_in_coro(self):
t = self._accumulated_time
- if self._time_entered_coro:
- t += super().time() - self._time_entered_coro
+ prev = self._time_entered_coro
+ if prev:
+ t += super().time() - prev
return t
def _update_ready(self, sampling=False):
diff --git a/nemesis/experiment.py b/nemesis/experiment.py
deleted file mode 100644
index 353d013..0000000
--- a/nemesis/experiment.py
+++ /dev/null
@@ -1,106 +0,0 @@
-from collections import defaultdict, namedtuple
-import asyncio
-import sys
-import threading
-from causal_event_loop import CausalEventLoop
-
-class BadLoopTypeException(Exception):
- pass
-
-class Experiment:
-
- # the selected task for this experiment
- _coro = None
- # the selected speedup for this experiment
- _speedup = None
- # event loops participating in this this experiment
- loops = []
- # a key-value pair where keys represent a handle and values
- # represent the period of time waiting
- _samples = None
-
- # the amount of time required for a handle to be included in
- # a report
- _wait_time_threshold = 0.001
-
- def __init__(self, coro, speedup):
- self._coro = coro
- self._speedup = speedup
- self._samples = defaultdict(lambda: 0)
-
- self._set_loops()
-
- def get_coro(self):
- return self._coro
-
- def get_speedup(self):
- return self._speedup
-
- def get_results(self):
- ret = f"Results for {self._coro} at {self._speedup} times speedup:\n"
- if len(self._samples) > 0:
- ret += (f" {'NAME':<15} {'FUNC/CORO':<45} {'TIDENT':<16} {'SEC':<10}")
- ret += (f"\n {'---':<15} {'---':<45} {'---':<16} {'---':<10}")
- tot = 0
- for key in self._sort_samples(self._samples):
- name = self._trim_to_last_x_chars(key[0], 15)
- ctxt = self._trim_to_last_x_chars(key[1], 45)
- tid = key[2]
- value = round(self._samples[key] / self._speedup, 4)
- tot += value
- if value >= self._wait_time_threshold:
- ret += f"\n {name:<15} {ctxt:<45} {tid:<16} {value:<10}"
- ret += f"\n {'':<79}---"
- ret += f"\n {'':<79}{round(tot, 4)}"
- else:
- ret += " No samples were gathered. (This is odd!)"
- return ret
-
- def get_loops(self):
- return [l for l in self._loops if l.is_running()]
-
- def add_handles(self, handles, loop, time):
- for h in handles:
- h.append(loop._thread_id)
- self._samples[tuple(h)] += time
-
- def _set_loops(self):
- self._loops = self._get_event_loops()
- for l in self._loops:
- if not isinstance(l, CausalEventLoop):
- raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
- l._set_dilation(1.0 / self._speedup)
-
- def _get_event_loops(self):
- '''Returns each thread's event loop, if it exists.'''
- loops = []
- for t in threading.enumerate():
- frame = sys._current_frames().get(t.ident)
- if frame:
- loop = self._walk_back_until_loop(frame)
- if loop and loop not in loops:
- loops.append(loop)
- return loops
-
- def _walk_back_until_loop(self, frame):
- '''Walks back the callstack until we are in a method named '_run_once'.
- If this is ever true, we assume we are in an Asyncio event loop method,
- and check to see if the 'self' variable is indeed and instance of
- AbstractEventLoop. Return this variable if true.'''
- while frame:
- if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
- loop = frame.f_locals['self']
- if isinstance(loop, asyncio.AbstractEventLoop):
- return loop
- else:
- frame = frame.f_back
- return None
-
- def _sort_samples(self, sample_dict):
- '''Returns SAMPLE_DICT in descending order by number of samples.'''
- return {k: v for k, v in sorted(sample_dict.items(),
- key=lambda item: item[1],
- reverse=True)}
-
- def _trim_to_last_x_chars(self, string, x):
- return string[-x:] if len(string) >= x else string
diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py
new file mode 100644
index 0000000..ea7df15
--- /dev/null
+++ b/nemesis/html_gen.py
@@ -0,0 +1,38 @@
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+
+def plot_results(results, filename):
+ fig = make_subplots(rows=len(results), cols=1, shared_xaxes=True)
+
+ for i, (coro_name, x_values) in enumerate(results.items(), start=1):
+ x_list = []
+ y_list = []
+ hover_text = []
+
+ for speedup, experiments in x_values.items():
+ for experiment in experiments:
+ y_value = sum(experiment.values())
+
+ x_list.append(speedup)
+ y_list.append(y_value)
+
+ breakdown = "<br>".join([f" {key[0]} ({key[1]}): {round(value, 4)}" for key, value in experiment.items()])
+ hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Total Wait: {round(y_value, 4)}<br>Breakdown:<br>{breakdown}")
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_list,
+ mode='markers',
+ name=coro_name,
+ hoverinfo='text',
+ hovertext=hover_text,
+ ))
+
+ fig.update_layout(
+ title="Potential Speedups for ",
+ xaxis_title="Speedup (times faster)",
+ yaxis_title="Total Wait (seconds)",
+ showlegend=True,
+ )
+
+ fig.write_html(filename)
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index c20d68d..46e8094 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -25,17 +25,39 @@ Commentary:
Code:
'''
+from causal_event_loop import CausalEventLoop
from collections import defaultdict
-from experiment import Experiment
+from html_gen import plot_results
import argparse
import asyncio
import os
+import random
import signal
import sys
+import threading
import time
import traceback
import types
+class Experiment:
+
+ # event loops participating in this this experiment
+ _loops = []
+ # a key-value pair where keys represent a handle and values
+ # represent the period of time waiting
+ samples = None
+
+ def __init__(self, loops):
+ self._loops = loops
+ self.samples = defaultdict(lambda: 0)
+
+ def get_loops(self):
+ return [l for l in self._loops if l.is_running()]
+
+ def add_handles(self, handles, loop, time):
+ for h in handles:
+ h.append(loop._thread_id)
+ self.samples[tuple(h)] += time
class Nemesis(object):
@@ -43,29 +65,35 @@ class Nemesis(object):
signal_interval = 0.0
# the timestamp which the last sample was taken
last_sample = None
+
# the current experiment being run
- curr_experiment = None
- # results from previous experiments, represented as strings
- results = []
- # The duration of each performance experiment
+ experiment_data = None
+ # the coroutine the current experiment is speeding up
+ experiment_coro = None
+ # the speedup of the current experiment
+ experiment_spdp = None
+
+ # results from previous experiments. Keys represent names of coroutines.
+ results = defaultdict(lambda: defaultdict(lambda: []))
+
+ # the file to write results to
+ filename = None
+
+ # The base duration of each performance experiment
e_duration = None
+
# The number of seconds remaining in this performance experiment.
r_duration = None
+
# A mapping of event loops to the previous running coroutine.
prev_coro = defaultdict(lambda: None)
- # temp
- coro = None
- dilation = 1.0
-
@staticmethod
- def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01):
+ def __init__(e_duration, filename, signal_interval=0.01):
Nemesis.signal_interval = signal_interval
Nemesis.e_duration = e_duration
+ Nemesis.filename = filename
Nemesis.r_duration = 0
- # temporary
- Nemesis.coro = coro
- Nemesis.speedup = max(speedup, 1.0)
@staticmethod
def start():
@@ -79,32 +107,42 @@ class Nemesis(object):
@staticmethod
def stop():
signal.setitimer(signal.ITIMER_REAL, 0)
- Nemesis._stop_experiment()
- for r in Nemesis.results:
- print()
- print(r)
+ plot_results(Nemesis.results, Nemesis.filename)
+ print(f"Wrote {Nemesis.filename}")
@staticmethod
- def _start_experiment():
- Nemesis.r_duration = Nemesis.e_duration
+ def _start_experiment(coro, speedup):
+ Nemesis.r_duration = Nemesis.e_duration * speedup
Nemesis.prev_coro = defaultdict(lambda: None)
- Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup)
+ Nemesis.experiment_coro = coro
+ Nemesis.experiment_spdp = speedup
+
+ loops = Nemesis._get_event_loops()
+ for l in loops:
+ if not isinstance(l, CausalEventLoop):
+ raise RuntimeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+ l._set_dilation(1.0 / speedup)
+
+ Nemesis.experiment_data = Experiment(loops)
@staticmethod
def _stop_experiment():
- if Nemesis.curr_experiment is not None:
- print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}')
- Nemesis.results.append(Nemesis.curr_experiment.get_results())
- del Nemesis.curr_experiment
+ if Nemesis.experiment_data is not None:
+ print(f'finished running experiment on {Nemesis.experiment_coro}')
+ results = Nemesis.experiment_data.samples
+ Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results)
+ del Nemesis.experiment_data
@staticmethod
def _signal_handler(sig, frame):
curr_sample = time.perf_counter()
passed_time = curr_sample - Nemesis.last_sample
Nemesis.last_sample = curr_sample
- if Nemesis.curr_experiment:
- loops = Nemesis.curr_experiment.get_loops()
- exp_coro = Nemesis.curr_experiment.get_coro()
+
+
+ if getattr(Nemesis, 'experiment_data', None):
+ loops = Nemesis.experiment_data.get_loops()
+ exp_coro = Nemesis.experiment_coro
for loop in loops:
coro = Nemesis._get_current_coro(loop)
prev_coro = Nemesis.prev_coro[loop]
@@ -117,31 +155,42 @@ class Nemesis(object):
loop._update_ready(True)
handles = Nemesis._get_waiting_handles(loop)
- Nemesis.curr_experiment.add_handles(handles, loop, passed_time)
+ Nemesis.experiment_data.add_handles(handles, loop, passed_time)
- Nemesis.r_duration -= passed_time
- if (Nemesis.r_duration <= 0):
- Nemesis._stop_experiment()
- Nemesis._start_experiment()
+ Nemesis.r_duration -= passed_time
+ if (Nemesis.r_duration <= 0):
+ Nemesis._stop_experiment()
+ else:
+ coros = []
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ if coro is not None:
+ coros.append(coro)
+ if coros:
+ Nemesis._start_experiment(random.choice(coros),
+ Nemesis._select_speedup())
+
+ @staticmethod
def _get_waiting_handles(loop):
handles = []
for handle in loop._ready:
- # no duplicates
handle_info = Nemesis._parse_handle(handle)
- if handle_info not in handles:
- handles.append(handle_info)
+ handles.append(handle_info)
return handles
+ @staticmethod
def _parse_handle(handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
task = cb.__self__
coro = task.get_coro()
- return [task.get_name(), Nemesis.get_coro_name(coro)]
+ return [task.get_name(), Nemesis._get_coro_name(coro)]
else:
return [str(type(handle).__name__), cb.__name__]
+ @staticmethod
def _get_current_coro(loop):
tid = loop._thread_id
assert tid, f"{loop} is not running, yet we attempted to sample it!"
@@ -160,6 +209,33 @@ class Nemesis(object):
return None
@staticmethod
+ def _get_event_loops():
+ '''Returns each thread's event loop, if it exists.'''
+ loops = []
+ for t in threading.enumerate():
+ frame = sys._current_frames().get(t.ident)
+ if frame:
+ loop = Nemesis._walk_back_until_loop(frame)
+ if loop and loop not in loops:
+ loops.append(loop)
+ return loops
+
+ @staticmethod
+ def _walk_back_until_loop(frame):
+ '''Walks back the callstack until we are in a method named '_run_once'.
+ If this is ever true, we assume we are in an Asyncio event loop method,
+ and check to see if the 'self' variable is indeed and instance of
+ AbstractEventLoop. Return this variable if true.'''
+ while frame:
+ if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+ loop = frame.f_locals['self']
+ if isinstance(loop, asyncio.AbstractEventLoop):
+ return loop
+ else:
+ frame = frame.f_back
+ return None
+
+ @staticmethod
def _should_trace(filename):
'''Returns FALSE if filename is uninteresting to the user.
Don't depend on this. It kind of sucks.'''
@@ -181,7 +257,17 @@ class Nemesis(object):
return False
return True
- def get_coro_name(coro):
+ def _select_speedup():
+ '''
+ Returns a random speedup between 0% to 100%, in multiples of 5%.
+ Because a baseline is needed to calculate effect on program
+ performance, selects a speedup of 0 with 50% probability.
+ '''
+ r1 = random.randint(-9, 20)
+ return 1 + max(0, r1) / 20
+
+ @staticmethod
+ def _get_coro_name(coro):
'''
Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py
'''
@@ -223,26 +309,16 @@ if __name__ == "__main__":
metavar='',
type=float,
default=0.01)
- parser.add_argument('-s', '--speedup',
- help='The amount of virtual speedup. Cannot go below one. Default is 2.0.',
- metavar='',
- type=float,
- default=2.0)
- parser.add_argument('-c', '--task',
- help='The task to virtually speedup.',
- metavar='',
- type=str,
- required=True)
parser.add_argument('-e', '--experiment-duration',
- help='The performance experiment duration. Defaults to 4 seconds.',
+ help='The performance experiment duration. Defaults to 3 seconds.',
metavar='',
type=float,
- default=4)
- parser.add_argument('-w', '--warmup-time',
- help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds',
+ default=3)
+ parser.add_argument('-f', '--filename',
+ help='The filename to write results to.',
metavar='',
- type=float,
- default=0.1)
+ type=str,
+ default="results.html")
parser.add_argument('prog',
type=str,
nargs='*',
@@ -253,10 +329,8 @@ if __name__ == "__main__":
try:
with open(args.prog[0], 'r', encoding='utf-8') as fp:
code = compile(fp.read(), args.prog[0], "exec")
- Nemesis(args.task,
- args.speedup,
- args.experiment_duration,
- args.warmup_time,
+ Nemesis(args.experiment_duration,
+ args.filename,
args.interval).start()
exec(code, the_globals)
Nemesis.stop()