summaryrefslogtreecommitdiff
path: root/nemesis/nemesis.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-09-12 18:19:29 -0400
committerbd <bdunahu@operationnull.com>2025-09-12 18:19:29 -0400
commit006a5441997a0bc42a8a5277ad7c8f4937782002 (patch)
tree7a2588b249ecffa0cfec22b83e99ec4697e64447 /nemesis/nemesis.py
parent963fd1ac52c8d762f65d1cf19d766507e666a986 (diff)
Use plotly to visualize experiments, rework experiment/record logic
Diffstat (limited to 'nemesis/nemesis.py')
-rwxr-xr-xnemesis/nemesis.py188
1 files changed, 131 insertions, 57 deletions
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index c20d68d..46e8094 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -25,17 +25,39 @@ Commentary:
Code:
'''
+from causal_event_loop import CausalEventLoop
from collections import defaultdict
-from experiment import Experiment
+from html_gen import plot_results
import argparse
import asyncio
import os
+import random
import signal
import sys
+import threading
import time
import traceback
import types
+class Experiment:
+
+ # event loops participating in this this experiment
+ _loops = []
+ # a key-value pair where keys represent a handle and values
+ # represent the period of time waiting
+ samples = None
+
+ def __init__(self, loops):
+ self._loops = loops
+ self.samples = defaultdict(lambda: 0)
+
+ def get_loops(self):
+ return [l for l in self._loops if l.is_running()]
+
+ def add_handles(self, handles, loop, time):
+ for h in handles:
+ h.append(loop._thread_id)
+ self.samples[tuple(h)] += time
class Nemesis(object):
@@ -43,29 +65,35 @@ class Nemesis(object):
signal_interval = 0.0
# the timestamp which the last sample was taken
last_sample = None
+
# the current experiment being run
- curr_experiment = None
- # results from previous experiments, represented as strings
- results = []
- # The duration of each performance experiment
+ experiment_data = None
+ # the coroutine the current experiment is speeding up
+ experiment_coro = None
+ # the speedup of the current experiment
+ experiment_spdp = None
+
+ # results from previous experiments. Keys represent names of coroutines.
+ results = defaultdict(lambda: defaultdict(lambda: []))
+
+ # the file to write results to
+ filename = None
+
+ # The base duration of each performance experiment
e_duration = None
+
# The number of seconds remaining in this performance experiment.
r_duration = None
+
# A mapping of event loops to the previous running coroutine.
prev_coro = defaultdict(lambda: None)
- # temp
- coro = None
- dilation = 1.0
-
@staticmethod
- def __init__(coro, speedup, e_duration, w_time, signal_interval=0.01):
+ def __init__(e_duration, filename, signal_interval=0.01):
Nemesis.signal_interval = signal_interval
Nemesis.e_duration = e_duration
+ Nemesis.filename = filename
Nemesis.r_duration = 0
- # temporary
- Nemesis.coro = coro
- Nemesis.speedup = max(speedup, 1.0)
@staticmethod
def start():
@@ -79,32 +107,42 @@ class Nemesis(object):
@staticmethod
def stop():
signal.setitimer(signal.ITIMER_REAL, 0)
- Nemesis._stop_experiment()
- for r in Nemesis.results:
- print()
- print(r)
+ plot_results(Nemesis.results, Nemesis.filename)
+ print(f"Wrote {Nemesis.filename}")
@staticmethod
- def _start_experiment():
- Nemesis.r_duration = Nemesis.e_duration
+ def _start_experiment(coro, speedup):
+ Nemesis.r_duration = Nemesis.e_duration * speedup
Nemesis.prev_coro = defaultdict(lambda: None)
- Nemesis.curr_experiment = Experiment(Nemesis.coro, Nemesis.speedup)
+ Nemesis.experiment_coro = coro
+ Nemesis.experiment_spdp = speedup
+
+ loops = Nemesis._get_event_loops()
+ for l in loops:
+ if not isinstance(l, CausalEventLoop):
+ raise RuntimeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+ l._set_dilation(1.0 / speedup)
+
+ Nemesis.experiment_data = Experiment(loops)
@staticmethod
def _stop_experiment():
- if Nemesis.curr_experiment is not None:
- print(f'finished running {Nemesis.curr_experiment.get_coro()} with speedup {Nemesis.curr_experiment.get_speedup()}')
- Nemesis.results.append(Nemesis.curr_experiment.get_results())
- del Nemesis.curr_experiment
+ if Nemesis.experiment_data is not None:
+ print(f'finished running experiment on {Nemesis.experiment_coro}')
+ results = Nemesis.experiment_data.samples
+ Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results)
+ del Nemesis.experiment_data
@staticmethod
def _signal_handler(sig, frame):
curr_sample = time.perf_counter()
passed_time = curr_sample - Nemesis.last_sample
Nemesis.last_sample = curr_sample
- if Nemesis.curr_experiment:
- loops = Nemesis.curr_experiment.get_loops()
- exp_coro = Nemesis.curr_experiment.get_coro()
+
+
+ if getattr(Nemesis, 'experiment_data', None):
+ loops = Nemesis.experiment_data.get_loops()
+ exp_coro = Nemesis.experiment_coro
for loop in loops:
coro = Nemesis._get_current_coro(loop)
prev_coro = Nemesis.prev_coro[loop]
@@ -117,31 +155,42 @@ class Nemesis(object):
loop._update_ready(True)
handles = Nemesis._get_waiting_handles(loop)
- Nemesis.curr_experiment.add_handles(handles, loop, passed_time)
+ Nemesis.experiment_data.add_handles(handles, loop, passed_time)
- Nemesis.r_duration -= passed_time
- if (Nemesis.r_duration <= 0):
- Nemesis._stop_experiment()
- Nemesis._start_experiment()
+ Nemesis.r_duration -= passed_time
+ if (Nemesis.r_duration <= 0):
+ Nemesis._stop_experiment()
+ else:
+ coros = []
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ if coro is not None:
+ coros.append(coro)
+ if coros:
+ Nemesis._start_experiment(random.choice(coros),
+ Nemesis._select_speedup())
+
+ @staticmethod
def _get_waiting_handles(loop):
handles = []
for handle in loop._ready:
- # no duplicates
handle_info = Nemesis._parse_handle(handle)
- if handle_info not in handles:
- handles.append(handle_info)
+ handles.append(handle_info)
return handles
+ @staticmethod
def _parse_handle(handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
task = cb.__self__
coro = task.get_coro()
- return [task.get_name(), Nemesis.get_coro_name(coro)]
+ return [task.get_name(), Nemesis._get_coro_name(coro)]
else:
return [str(type(handle).__name__), cb.__name__]
+ @staticmethod
def _get_current_coro(loop):
tid = loop._thread_id
assert tid, f"{loop} is not running, yet we attempted to sample it!"
@@ -160,6 +209,33 @@ class Nemesis(object):
return None
@staticmethod
+ def _get_event_loops():
+ '''Returns each thread's event loop, if it exists.'''
+ loops = []
+ for t in threading.enumerate():
+ frame = sys._current_frames().get(t.ident)
+ if frame:
+ loop = Nemesis._walk_back_until_loop(frame)
+ if loop and loop not in loops:
+ loops.append(loop)
+ return loops
+
+ @staticmethod
+ def _walk_back_until_loop(frame):
+ '''Walks back the callstack until we are in a method named '_run_once'.
+ If this is ever true, we assume we are in an Asyncio event loop method,
+ and check to see if the 'self' variable is indeed and instance of
+ AbstractEventLoop. Return this variable if true.'''
+ while frame:
+ if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+ loop = frame.f_locals['self']
+ if isinstance(loop, asyncio.AbstractEventLoop):
+ return loop
+ else:
+ frame = frame.f_back
+ return None
+
+ @staticmethod
def _should_trace(filename):
'''Returns FALSE if filename is uninteresting to the user.
Don't depend on this. It kind of sucks.'''
@@ -181,7 +257,17 @@ class Nemesis(object):
return False
return True
- def get_coro_name(coro):
+ def _select_speedup():
+ '''
+ Returns a random speedup between 0% to 100%, in multiples of 5%.
+ Because a baseline is needed to calculate effect on program
+ performance, selects a speedup of 0 with 50% probability.
+ '''
+ r1 = random.randint(-9, 20)
+ return 1 + max(0, r1) / 20
+
+ @staticmethod
+ def _get_coro_name(coro):
'''
Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py
'''
@@ -223,26 +309,16 @@ if __name__ == "__main__":
metavar='',
type=float,
default=0.01)
- parser.add_argument('-s', '--speedup',
- help='The amount of virtual speedup. Cannot go below one. Default is 2.0.',
- metavar='',
- type=float,
- default=2.0)
- parser.add_argument('-c', '--task',
- help='The task to virtually speedup.',
- metavar='',
- type=str,
- required=True)
parser.add_argument('-e', '--experiment-duration',
- help='The performance experiment duration. Defaults to 4 seconds.',
+ help='The performance experiment duration. Defaults to 3 seconds.',
metavar='',
type=float,
- default=4)
- parser.add_argument('-w', '--warmup-time',
- help='Amount of time to wait until the first performance experiment. Default is 0 milliseconds',
+ default=3)
+ parser.add_argument('-f', '--filename',
+ help='The filename to write results to.',
metavar='',
- type=float,
- default=0.1)
+ type=str,
+ default="results.html")
parser.add_argument('prog',
type=str,
nargs='*',
@@ -253,10 +329,8 @@ if __name__ == "__main__":
try:
with open(args.prog[0], 'r', encoding='utf-8') as fp:
code = compile(fp.read(), args.prog[0], "exec")
- Nemesis(args.task,
- args.speedup,
- args.experiment_duration,
- args.warmup_time,
+ Nemesis(args.experiment_duration,
+ args.filename,
args.interval).start()
exec(code, the_globals)
Nemesis.stop()