diff options
| author | bd <bdunahu@operationnull.com> | 2025-11-26 03:00:53 -0500 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-11-26 03:00:53 -0500 |
| commit | d5b874c905907528c5e0c46f9c4b5ff8827fc6db (patch) | |
| tree | 51de915a85eceb268122978864c68a4252986222 | |
| parent | df45e4380bd325c333ccdd48771b4ceaf36ff4c4 (diff) | |
Numerous bug fixes; target function is only selected in event loop
| -rw-r--r-- | nemesis/causal_event_loop.py | 140 | ||||
| -rw-r--r-- | nemesis/html_gen.py | 13 | ||||
| -rwxr-xr-x | nemesis/nemesis.py | 21 |
3 files changed, 120 insertions, 54 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 6d18dbc..6fea90d 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -24,6 +24,7 @@ from sortedcontainers import SortedList import heapq import selectors import sys +import os import time import traceback from copy import copy @@ -31,10 +32,16 @@ from asyncio.log import logger from asyncio import Task, events from asyncio.base_events import _format_handle +from pathlib import Path +from pprint import pformat +import logging + _MIN_SCHEDULED_TIMER_HANDLES = 100 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 MAXIMUM_SELECT_TIMEOUT = 24 * 3600 +TRACE_PATH = \ + Path(__file__).resolve().parent.parent / "logs" / "trace.log" orig_handle = asyncio.events.Handle @@ -90,9 +97,11 @@ for sc in [orig_handle] + orig_handle.__subclasses__(): class CausalEventLoop(asyncio.SelectorEventLoop): + _log_level = logging.DEBUG + _logger = None # a value between 0 and 1. 0 means no optimization, # 1 means the target coroutine is optimized away entirely - _speedup = 1.0 + _speedup = 0.0 # a list of callbacks which have recently completed _pause_buffer = [] # a list of intervals in which the target coroutine has been active @@ -107,12 +116,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def __init__(self) -> None: super().__init__() + self.set_logger() + + def set_logger(self): + if not os.path.exists(TRACE_PATH): + with open(TRACE_PATH, 'w') as file: + pass + self._logger = logging.getLogger(f'LOOP {self._thread_id}') + self._logger.setLevel(self._log_level) + self._logger.propagate = False + file_handler = logging.FileHandler(TRACE_PATH) + file_handler.setLevel(self._log_level) + formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s') + file_handler.setFormatter(formatter) + self._logger.addHandler(file_handler) + self._logger.info("═" * 40) + self._logger.info("STARTING LOOP") + self._logger.info("═" * 40) def set_speedup(self, speedup): # 
print(self._coro_intervals) self._start_time = self.time() self._speedup = speedup + # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}") + # self._logger.info("═" * 30) + # reset experiment counters self._coro_intervals.clear() self._completed_coros.clear() @@ -132,20 +161,23 @@ class CausalEventLoop(asyncio.SelectorEventLoop): return (curr_time - self._start_time) - pause_time def ping_enter_coro(self): + # self._logger.debug(f"Recording coro ENTER.") self._time_entered_coro = self.time() def ping_exit_coro(self): try: assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!" except AssertionError as e: - print(f"Assertion failed: {e}") + self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.") sys.exit(1) + # self._logger.debug(f"Recording coro EXIT.") self._coro_intervals.add((self._time_entered_coro, self.time())) self._time_entered_coro = None def collect_ready_events(self, timeout=0): event_list = self._selector.select(timeout) - self._ready_events.append((event_list, self.time())) + if event_list: + self._ready_events.append((event_list, self.time())) def update_ready(self): ''' @@ -154,6 +186,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): ''' curr_time = self.time() sched_count = len(self._scheduled) + # two methods to cleanup cancelled callbacks; + # avoid the expensive one whenever possible if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): @@ -163,6 +197,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): for handle in self._scheduled: if handle._cancelled: handle._scheduled = False + self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}") else: new_scheduled.append(handle) @@ -174,6 +209,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) + 
self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}") handle._scheduled = False timeout = None @@ -195,10 +231,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop): timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT)) timeout = 0 + self._logger.debug(f"HANDLING I/O.") + self._logger.info("-" * 20) self.collect_ready_events(timeout) - for event_list in self._ready_events: + self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}") + + while len(self._ready_events): + event_list = self._ready_events.pop(0) self._process_events(*event_list) + self._logger.debug(f"HANDLING SCHEDULED.") + self._logger.info("-" * 20) # Handle 'later' callbacks that are ready. curr_time = self.time() end_time = curr_time + self._clock_resolution @@ -210,11 +253,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle = heapq.heappop(self._scheduled) handle._scheduled = False - time_to_buffer = when + self._get_pause_for_io(handle, curr_time) + delay = self._get_pause_for_io(handle, curr_time) + time_to_buffer = when + delay handle._when = time_to_buffer handle.time_entered_pause_buffer = curr_time - heapq.heappush(self._pause_buffer, handle) + # self._ready.append(handle) + # do not allow duplicates (FIX?) + if handle not in self._pause_buffer: + heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})") + + self._logger.debug(f"HANDLING PAUSED.") + self._logger.info("-" * 20) # handle callbacks which can leave pause timeout while self._pause_buffer: # required when handle's _when is modified in place @@ -223,6 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): for other in self._pause_buffer[1:]: assert handle._when <= other._when, f"Heap root {handle} is not smallest" if handle._when >= end_time: + self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. 
Moving on.") break # pop the first item in the list handle = heapq.heappop(self._pause_buffer) @@ -230,13 +282,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # if we paused during buffering, we need to delay again # TODO clean this up # this whole file has 'rounding' timing errors :( - handle._when = handle._when + \ - self._get_pause_for_pause_time(handle, curr_time) + delay = self._get_pause_for_pause_time(handle, curr_time) + handle._when = handle._when + delay if handle._when >= end_time: handle.time_entered_pause_buffer = curr_time heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})") else: self._ready.append(handle) + self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}") def _run_once(self): """ @@ -244,6 +298,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): This calls all currently ready callbacks. """ + self._logger.debug("STARTING ITERATION.") + self._logger.info("-" * 40) self.update_ready() # This is the only place where callbacks are actually *called*. @@ -252,7 +308,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. + self._logger.info("-" * 40) ntodo = len(self._ready) + self._logger.debug(f"RUNNING {ntodo} CALLBACKS.") for i in range(ntodo): handle = self._ready.popleft() if handle._cancelled: @@ -269,21 +327,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug and dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) - - # do not record coroutines which left I/O during the previous experiment - # the time held in the pause buffer would have also been incorrect for - # this experiment, but there is nothing we can do about it. 
- if handle._when > self._start_time: - time_interval = (handle._when, process_start_time) - pause_time = self._get_pause_time(time_interval) - adjusted_start_time = process_start_time - pause_time - wait_time = adjusted_start_time - handle._when - try: - assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" - except AssertionError as e: - print(f"Assertion failed: {e}") - sys.exit(1) - self._completed_coros.append((_format_handle(handle), wait_time)) + self._logger.warning('Executing %s took %.3f seconds', + _format_handle(handle), dt) + + time_interval = (handle._when, process_start_time) + pause_time = self._get_pause_time(time_interval) + adjusted_start_time = process_start_time - pause_time + wait_time = adjusted_start_time - handle._when + try: + assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" + except AssertionError as e: + self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). 
Aborting.') + sys.exit(1) + self._completed_coros.append((_format_handle(handle), wait_time)) + self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}') except Exception: traceback.print_exc() @@ -297,6 +354,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: self._remove_reader(fileobj) + self._logger.info(f"\treader {reader} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(reader, time) @@ -305,6 +363,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: self._remove_writer(fileobj) + self._logger.info(f"\twriter {writer} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(writer, time) @@ -321,6 +380,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if not handle._when: handle._when = curr_time self._ready.append(handle) + self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -329,7 +389,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug: self._check_callback(callback, 'call_soon_threadsafe') handle = events._ThreadSafeHandle(callback, args, self, context) - self._add_callback(handle) + if not handle._when: + handle._when = self.time() + self._ready.append(handle) + self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") if handle._source_traceback: del handle._source_traceback[-1] if handle._source_traceback: @@ -345,15 +408,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # required in cases where the event loop reuses the same handle if handle._when < curr_time: handle._when = curr_time - if not handle._cancelled and handle not in self._pause_buffer: + if not handle._cancelled: if self._is_vital(handle): handle._when = curr_time - # print(f'{_format_handle(handle)}: {curr_time}') self._ready.append(handle) + 
self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}") else: - # print(f'DELAYED {_format_handle(handle)}: {curr_time}') handle.time_entered_pause_buffer = curr_time - heapq.heappush(self._pause_buffer, handle) + # do not allow duplicates (FIX?) + if handle not in self._pause_buffer: + heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})") + else: + self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}") def _get_pause_for_io(self, handle, io_time): @@ -389,16 +456,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop): time += self._get_overlap(start, end, self._time_entered_coro, curr_time) - return time * self._speedup + pause_time = time * self._speedup + assert pause_time >= 0, f"Delay was found to be less than 0: {pause_time}" + return pause_time def _get_overlap(self, a_start, a_end, b_start, b_end): overlap_start = max(a_start, b_start) overlap_end = min(a_end, b_end) + assert overlap_end >= overlap_start, f"Bad overlaps: {a_start} {a_end} : {b_start} {b_end} ({overlap_end - overlap_start})" return overlap_end - overlap_start def _is_vital(self, handle): """ Methods which cannot afford to be paused.""" - blacklist = ['_read_from_self', '_read_ready'] + blacklist = ['_read_from_self', '_read_ready', '_accept_connection'] cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): if cb.__self__.get_coro().__name__ in blacklist: @@ -409,9 +479,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): return False -class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy): - def new_event_loop(self): - return CausalEventLoop() - - -asyncio.set_event_loop_policy(CausalEventLoopPolicy()) +# to profile your program, start the event loop with +# asyncio.run(coro, loop_factory=causal_loop_factory) +def causal_loop_factory(): + return CausalEventLoop() diff --git a/nemesis/html_gen.py 
b/nemesis/html_gen.py index 89c62d4..70d476b 100644 --- a/nemesis/html_gen.py +++ b/nemesis/html_gen.py @@ -40,8 +40,6 @@ def plot_results(results, output_file, input_file): y_throughput_list = [] y_max_latency_list = [] y_num_callbacks_list = [] - latency_hover_text = [] - max_latency_hover_text = [] for experiment in experiments: completed_callbacks = experiment[0] @@ -73,20 +71,11 @@ def plot_results(results, output_file, input_file): y_max_latency_list.append(max_cb[1]) breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in trimmed_callbacks]) - latency_hover_text.append( - f"{coro_name}<br>Speedup: {speedup}<br>Trimmed Average Wait ({int(TRIM_PERCENT*100)}%): {round(latency, 4)}<br>Breakdown:<br>{breakdown}" - ) - max_latency_hover_text.append( - f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}" - ) else: latency = 0 y_latency_list.append(latency) - latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") - max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") - # handle throughput graph throughput = num_callbacks / virtual_run_time if virtual_run_time else 0 y_throughput_list.append(throughput) @@ -97,7 +86,6 @@ def plot_results(results, output_file, input_file): mode='markers', name=coro_name, marker=dict(color=get_color(coro_name)), - hovertext=latency_hover_text, showlegend=True, ), row=1, col=col) @@ -116,7 +104,6 @@ def plot_results(results, output_file, input_file): mode='markers', name=coro_name, marker=dict(color=get_color(coro_name)), - hovertext=max_latency_hover_text, showlegend=False, ), row=3, col=col) diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index 7eed743..03901e3 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -31,6 +31,7 @@ from html_gen import plot_results import argparse import asyncio import os +import inspect import random import signal import sys @@ -39,6 +40,8 @@ import time import traceback import types 
+CO_COROUTINE = inspect.CO_COROUTINE + class Experiment: # event loops participating in this this experiment @@ -128,7 +131,7 @@ class Nemesis(object): loops = Nemesis._get_event_loops() for loop in loops: if not isinstance(loop, CausalEventLoop): - raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + raise RuntimeError(f"Nemesis requires a custom event loop to insert slowdowns. You must start the event loop with `asyncio.run(your_coro(), loop_factory=causal_loop_factory)'.") loop.set_speedup(speedup) Nemesis.experiment_data = Experiment(loops) @@ -157,7 +160,7 @@ class Nemesis(object): loops = Nemesis.experiment_data.get_loops() exp_coro = Nemesis.experiment_coro for loop in loops: - coro = Nemesis._get_current_frame(loop) + coro = Nemesis._get_current_frame(loop).f_code.co_name prev_coro = Nemesis.prev_coro[loop] if not prev_coro == coro: if prev_coro == exp_coro: @@ -177,8 +180,8 @@ class Nemesis(object): loops = Nemesis._get_event_loops() for loop in loops: frame = Nemesis._get_current_frame(loop) - if frame is not None: - frames.append(frame) + if frame is not None and Nemesis._is_child_of_async(frame): + frames.append(frame.f_code.co_name) if frames: Nemesis._start_experiment(random.choice(frames), Nemesis._select_speedup()) @@ -218,10 +221,18 @@ class Nemesis(object): if frame: fname = frame.f_code.co_filename if frame: - return frame.f_code.co_name + return frame return None @staticmethod + def _is_child_of_async(frame): + '''Returns TRUE if an async method is a parent of FRAME.''' + while frame: + if bool(frame.f_code.co_flags & CO_COROUTINE): + return True + frame = frame.f_back + + @staticmethod def _get_event_loops(): '''Returns each thread's event loop, if it exists.''' loops = [] |
