diff options
| author | bd <bdunahu@operationnull.com> | 2025-12-01 00:37:20 -0500 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-12-01 00:37:20 -0500 |
| commit | e39aed0d7a24cffda89ae0f33e0c40f171ad05ea (patch) | |
| tree | f50d1eaa0c2c7c6a3679028fd48d31dad09ff30d /nemesis | |
| parent | 37bf6949ca4e84760512c05719efbd18d4eb27f3 (diff) | |
profiling madness
Diffstat (limited to 'nemesis')
| -rw-r--r-- | nemesis/causal_event_loop.py | 93 |
1 files changed, 50 insertions, 43 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index f9cffb4..1b04135 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, Callable, Self if TYPE_CHECKING: import contextvars +import line_profiler import asyncio import collections from sortedcontainers import SortedList @@ -126,25 +127,28 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if not os.path.exists(TRACE_PATH): with open(TRACE_PATH, 'w') as file: pass - self._logger = logging.getLogger(f'LOOP {self._thread_id}') - self._logger.setLevel(self._log_level) - self._logger.propagate = False + # self._logger = logging.getLogger(f'LOOP {self._thread_id}') + # self._logger.setLevel(self._log_level) + # self._logger.propagate = False file_handler = logging.FileHandler(TRACE_PATH) file_handler.setLevel(self._log_level) formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s') file_handler.setFormatter(formatter) - self._logger.addHandler(file_handler) - self._logger.info("═" * 40) - self._logger.info("STARTING LOOP") - self._logger.info("═" * 40) + # self._logger.addHandler(file_handler) + + # self._logger.disabled = True + + # self._logger.info("═" * 40) + # self._logger.info("STARTING LOOP") + # self._logger.info("═" * 40) def set_speedup(self: Any, speedup: float) -> None: # print(self._coro_intervals) self._start_time = self.time() self._speedup = speedup - # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}") - # self._logger.info("═" * 30) + # # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}") + # # self._logger.info("═" * 30) # reset experiment counters self._coro_intervals.clear() @@ -165,16 +169,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop): return (curr_time - self._start_time) - pause_time def ping_enter_coro(self: Any) -> None: - # self._logger.debug(f"Recording coro ENTER.") + # # self._logger.debug(f"Recording coro ENTER.") self._time_entered_coro = self.time() def ping_exit_coro(self: Any) -> None: try: assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!" except AssertionError as e: - self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.") + # self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.") sys.exit(1) - # self._logger.debug(f"Recording coro EXIT.") + # # self._logger.debug(f"Recording coro EXIT.") self._coro_intervals.add((self._time_entered_coro, self.time())) self._time_entered_coro = None @@ -183,6 +187,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if event_list: self._ready_events.append((event_list, self.time())) + @line_profiler.profile def update_ready(self: Any) -> None: ''' Polls the IO selector, schedules resulting callbacks, and schedules @@ -201,7 +206,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): for handle in self._scheduled: if handle._cancelled: handle._scheduled = False - self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}") + # self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}") else: new_scheduled.append(handle) @@ -213,7 +218,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): while self._scheduled and self._scheduled[0]._cancelled: self._timer_cancelled_count -= 1 handle = heapq.heappop(self._scheduled) - self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}") + # self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}") handle._scheduled = False timeout = None @@ -235,17 +240,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop): timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT)) timeout = 0 - self._logger.debug(f"HANDLING I/O.") - self._logger.info("-" * 20) + # self._logger.debug(f"HANDLING I/O.") + # self._logger.info("-" * 20) self.collect_ready_events(timeout) - self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}") + # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}") while len(self._ready_events): event_list = self._ready_events.pop(0) self._process_events(*event_list) - self._logger.debug(f"HANDLING SCHEDULED.") - self._logger.info("-" * 20) + # self._logger.debug(f"HANDLING SCHEDULED.") + # self._logger.info("-" * 20) # Handle 'later' callbacks that are ready. curr_time = self.time() end_time = curr_time + self._clock_resolution @@ -261,24 +266,25 @@ class CausalEventLoop(asyncio.SelectorEventLoop): time_to_buffer = when + delay handle._when = time_to_buffer handle.time_entered_pause_buffer = curr_time - # self._ready.append(handle) # do not allow duplicates (FIX?) if handle not in self._pause_buffer: heapq.heappush(self._pause_buffer, handle) - self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})") + # self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})") - self._logger.debug(f"HANDLING PAUSED.") - self._logger.info("-" * 20) + # self._logger.debug(f"HANDLING PAUSED.") + # self._logger.info("-" * 20) # handle callbacks which can leave pause timeout while self._pause_buffer: # required when handle's _when is modified in place - heapq.heapify(self._pause_buffer) + # heapq.heapify(self._pause_buffer) handle = self._pause_buffer[0] for other in self._pause_buffer[1:]: - assert handle._when <= other._when, f"Heap root {handle} is not smallest" + if handle._when > other._when: + # self._logger.critical(f"{other} was modified in place: first: {handle._when} modified: {other._when}") + assert handle._when <= other._when, f"Heap root {handle} is not smallest" if handle._when >= end_time: - self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.") + # self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.") break # pop the first item in the list handle = heapq.heappop(self._pause_buffer) @@ -291,19 +297,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if handle._when >= end_time: handle.time_entered_pause_buffer = curr_time heapq.heappush(self._pause_buffer, handle) - self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})") + # self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})") else: self._ready.append(handle) - self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}") + # self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}") + @line_profiler.profile def _run_once(self: Any) -> None: """ Run one full iteration of the event loop. This calls all currently ready callbacks. """ - self._logger.debug("STARTING ITERATION.") - self._logger.info("-" * 40) + # self._logger.debug("STARTING ITERATION.") + # self._logger.info("-" * 40) self.update_ready() # This is the only place where callbacks are actually *called*. @@ -312,9 +319,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. - self._logger.info("-" * 40) + # self._logger.info("-" * 40) ntodo = len(self._ready) - self._logger.debug(f"RUNNING {ntodo} CALLBACKS.") + # self._logger.debug(f"RUNNING {ntodo} CALLBACKS.") for i in range(ntodo): handle = self._ready.popleft() if handle._cancelled: @@ -331,8 +338,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._debug and dt >= self.slow_callback_duration: logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) - self._logger.warning('Executing %s took %.3f seconds', - _format_handle(handle), dt) + # self._logger.warning('Executing %s took %.3f seconds', + # _format_handle(handle), dt) time_interval = (handle._when, process_start_time) pause_time = self._get_pause_time(time_interval) @@ -341,10 +348,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop): try: assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!" except AssertionError as e: - self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.') + # self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.') sys.exit(1) self._completed_coros.append((_format_handle(handle), wait_time)) - self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}') + # self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}') except Exception: traceback.print_exc() @@ -358,7 +365,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: self._remove_reader(fileobj) - self._logger.info(f"\treader {reader} was cancelled.") + # self._logger.info(f"\treader {reader} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(reader, time) @@ -367,7 +374,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: self._remove_writer(fileobj) - self._logger.info(f"\twriter {writer} was cancelled.") + # self._logger.info(f"\twriter {writer} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(writer, time) @@ -384,7 +391,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if not handle._when: handle._when = curr_time self._ready.append(handle) - self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") + # self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -396,7 +403,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if not handle._when: handle._when = self.time() self._ready.append(handle) - self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") + # self._logger.debug(f"\tio -> ready for {_format_handle(handle)}") if handle._source_traceback: del handle._source_traceback[-1] if handle._source_traceback: @@ -416,15 +423,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._is_vital(handle): handle._when = curr_time self._ready.append(handle) - self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}") + # self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}") else: handle.time_entered_pause_buffer = curr_time # do not allow duplicates (FIX?) if handle not in self._pause_buffer: heapq.heappush(self._pause_buffer, handle) - self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})") + # self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})") else: - self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}") + # self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}") def _get_pause_for_io(self: Any, handle, io_time: float) -> float: time_interval = (handle.register_time, io_time) |
