diff options
| author | bd <bdunahu@operationnull.com> | 2025-11-19 01:08:06 -0500 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-11-19 01:08:06 -0500 |
| commit | 61c35f0bc0b3fe598a06a2a63a6b3fa26802add2 (patch) | |
| tree | 485b2a57e9ce7428825a0c5d45ef71d351c6e20b | |
| parent | df45e4380bd325c333ccdd48771b4ceaf36ff4c4 (diff) | |
add logger, provide loop factory rather than default loop policy
| -rw-r--r-- | nemesis/__init__.py | 0 | ||||
| -rw-r--r-- | nemesis/causal_event_loop.py | 110 | ||||
| -rwxr-xr-x | nemesis/nemesis.py | 2 |
3 files changed, 84 insertions, 28 deletions
diff --git a/nemesis/__init__.py b/nemesis/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/nemesis/__init__.py diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 6d18dbc..b6fdea6 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -24,6 +24,7 @@ from sortedcontainers import SortedList import heapq import selectors import sys +import os import time import traceback from copy import copy @@ -31,10 +32,16 @@ from asyncio.log import logger from asyncio import Task, events from asyncio.base_events import _format_handle +from pathlib import Path +from pprint import pformat +import logging + _MIN_SCHEDULED_TIMER_HANDLES = 100 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 MAXIMUM_SELECT_TIMEOUT = 24 * 3600 +TRACE_PATH = \ + Path(__file__).resolve().parent.parent / "logs" / "trace.log" orig_handle = asyncio.events.Handle @@ -90,9 +97,11 @@ for sc in [orig_handle] + orig_handle.__subclasses__(): class CausalEventLoop(asyncio.SelectorEventLoop): + _log_level = logging.DEBUG + _logger = None # a value between 0 and 1. 0 means no optimization, # 1 means the target coroutine is optimized away entirely - _speedup = 1.0 + _speedup = 0.0 # a list of callbacks which have recently completed _pause_buffer = [] # a list of intervals in which the target coroutine has been active @@ -107,12 +116,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def __init__(self) -> None: super().__init__() + self.set_logger() + + def set_logger(self): + if not os.path.exists(TRACE_PATH): + with open(TRACE_PATH, 'w') as file: + pass + self._logger = logging.getLogger(f'LOOP {self._thread_id}') + self._logger.setLevel(self._log_level) + self._logger.propagate = False + file_handler = logging.FileHandler(TRACE_PATH) + file_handler.setLevel(self._log_level) + formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s') + file_handler.setFormatter(formatter) + self._logger.addHandler(file_handler) + self._logger.info("═" * 40) + self._logger.info("STARTING LOOP") + self._logger.info("═" * 40) def set_speedup(self, speedup): # print(self._coro_intervals) self._start_time = self.time() self._speedup = speedup + # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}") + # self._logger.info("═" * 30) + # reset experiment counters self._coro_intervals.clear() self._completed_coros.clear() @@ -132,20 +161,23 @@ class CausalEventLoop(asyncio.SelectorEventLoop): return (curr_time - self._start_time) - pause_time def ping_enter_coro(self): + # self._logger.debug(f"Recording coro ENTER.") self._time_entered_coro = self.time() def ping_exit_coro(self): try: assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!" except AssertionError as e: - print(f"Assertion failed: {e}") + self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.") sys.exit(1) + # self._logger.debug(f"Recording coro EXIT.") self._coro_intervals.add((self._time_entered_coro, self.time())) self._time_entered_coro = None def collect_ready_events(self, timeout=0): event_list = self._selector.select(timeout) self._ready_events.append((event_list, self.time())) + # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(event_list, indent=2)}") def update_ready(self): ''' @@ -154,6 +186,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): ''' curr_time = self.time() sched_count = len(self._scheduled) + # two methods to cleanup cancelled callbacks; + # avoid the expensive one whenever possible if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION): @@ -163,6 +197,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): for handle in self._scheduled: if handle._cancelled: handle._scheduled = False + self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}") else: new_scheduled.append(handle) @@ -174,6 +209,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) + self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}") handle._scheduled = False timeout = None @@ -195,10 +231,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop): timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT)) timeout = 0 + self._logger.debug(f"HANDLING I/O.") + self._logger.info("-" * 20) self.collect_ready_events(timeout) for event_list in self._ready_events: self._process_events(*event_list) + self._logger.debug(f"HANDLING SCHEDULED.") + self._logger.info("-" * 20) # Handle 'later' callbacks that are ready. curr_time = self.time() end_time = curr_time + self._clock_resolution @@ -210,11 +250,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle = heapq.heappop(self._scheduled) handle._scheduled = False - time_to_buffer = when + self._get_pause_for_io(handle, curr_time) + delay = self._get_pause_for_io(handle, curr_time) + time_to_buffer = when + delay handle._when = time_to_buffer handle.time_entered_pause_buffer = curr_time + # self._ready.append(handle) heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})") + + self._logger.debug(f"HANDLING PAUSED.") + self._logger.info("-" * 20) # handle callbacks which can leave pause timeout while self._pause_buffer: # required when handle's _when is modified in place @@ -230,13 +276,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # if we paused during buffering, we need to delay again # TODO clean this up # this whole file has 'rounding' timing errors :( - handle._when = handle._when + \ - self._get_pause_for_pause_time(handle, curr_time) + delay = self._get_pause_for_pause_time(handle, curr_time) + handle._when = handle._when + delay if handle._when >= end_time: handle.time_entered_pause_buffer = curr_time heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})") else: self._ready.append(handle) + self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}") def _run_once(self): """ @@ -244,6 +292,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): This calls all currently ready callbacks. """ + self._logger.debug("STARTING ITERATION.") + self._logger.info("-" * 40) self.update_ready() # This is the only place where callbacks are actually *called*. @@ -252,7 +302,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. + self._logger.info("-" * 40) ntodo = len(self._ready) + self._logger.debug(f"RUNNING {ntodo} CALLBACKS.") for i in range(ntodo): handle = self._ready.popleft() if handle._cancelled: @@ -269,21 +321,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug and dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) - - # do not record coroutines which left I/O during the previous experiment - # the time held in the pause buffer would have also been incorrect for - # this experiment, but there is nothing we can do about it. - if handle._when > self._start_time: - time_interval = (handle._when, process_start_time) - pause_time = self._get_pause_time(time_interval) - adjusted_start_time = process_start_time - pause_time - wait_time = adjusted_start_time - handle._when - try: - assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" - except AssertionError as e: - print(f"Assertion failed: {e}") - sys.exit(1) - self._completed_coros.append((_format_handle(handle), wait_time)) + self._logger.warning('Executing %s took %.3f seconds', + _format_handle(handle), dt) + + time_interval = (handle._when, process_start_time) + pause_time = self._get_pause_time(time_interval) + adjusted_start_time = process_start_time - pause_time + wait_time = adjusted_start_time - handle._when + try: + assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" + except AssertionError as e: + self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.') + sys.exit(1) + self._completed_coros.append((_format_handle(handle), wait_time)) + self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}') except Exception: traceback.print_exc() @@ -321,6 +372,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if not handle._when: handle._when = curr_time self._ready.append(handle) + self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -329,7 +381,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug: self._check_callback(callback, 'call_soon_threadsafe') handle = events._ThreadSafeHandle(callback, args, self, context) - self._add_callback(handle) + if not handle._when: + handle._when = self.time() + self._ready.append(handle) + self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") if handle._source_traceback: del handle._source_traceback[-1] if handle._source_traceback: @@ -350,10 +405,13 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle._when = curr_time # print(f'{_format_handle(handle)}: {curr_time}') self._ready.append(handle) + self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}") else: # print(f'DELAYED {_format_handle(handle)}: {curr_time}') handle.time_entered_pause_buffer = curr_time + # self._ready.append(handle) heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={curr_time - handle._when})") def _get_pause_for_io(self, handle, io_time): @@ -409,9 +467,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): return False -class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy): - def new_event_loop(self): - return CausalEventLoop() - - -asyncio.set_event_loop_policy(CausalEventLoopPolicy()) +# to profile your program, start the event loop with +# asyncio.run(coro, loop_factory=causal_loop_factory) +def causal_loop_factory(): + return CausalEventLoop() diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index 7eed743..6d2941c 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -128,7 +128,7 @@ class Nemesis(object): loops = Nemesis._get_event_loops() for loop in loops: if not isinstance(loop, CausalEventLoop): - raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.") + raise RuntimeError(f"Nemesis requires a custom event loop to insert slowdowns. You must start the event loop with `asyncio.run(your_coro(), loop_factory=causal_loop_factory)'.") loop.set_speedup(speedup) Nemesis.experiment_data = Experiment(loops) |
