summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-12-01 00:37:20 -0500
committerbd <bdunahu@operationnull.com>2025-12-01 00:37:20 -0500
commite39aed0d7a24cffda89ae0f33e0c40f171ad05ea (patch)
treef50d1eaa0c2c7c6a3679028fd48d31dad09ff30d /nemesis/causal_event_loop.py
parent37bf6949ca4e84760512c05719efbd18d4eb27f3 (diff)
profiling madness
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py93
1 files changed, 50 insertions, 43 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index f9cffb4..1b04135 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, Callable, Self
if TYPE_CHECKING:
import contextvars
+import line_profiler
import asyncio
import collections
from sortedcontainers import SortedList
@@ -126,25 +127,28 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if not os.path.exists(TRACE_PATH):
with open(TRACE_PATH, 'w') as file:
pass
- self._logger = logging.getLogger(f'LOOP {self._thread_id}')
- self._logger.setLevel(self._log_level)
- self._logger.propagate = False
+ # self._logger = logging.getLogger(f'LOOP {self._thread_id}')
+ # self._logger.setLevel(self._log_level)
+ # self._logger.propagate = False
file_handler = logging.FileHandler(TRACE_PATH)
file_handler.setLevel(self._log_level)
formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s')
file_handler.setFormatter(formatter)
- self._logger.addHandler(file_handler)
- self._logger.info("═" * 40)
- self._logger.info("STARTING LOOP")
- self._logger.info("═" * 40)
+ # self._logger.addHandler(file_handler)
+
+ # self._logger.disabled = True
+
+ # self._logger.info("═" * 40)
+ # self._logger.info("STARTING LOOP")
+ # self._logger.info("═" * 40)
def set_speedup(self: Any, speedup: float) -> None:
# print(self._coro_intervals)
self._start_time = self.time()
self._speedup = speedup
- # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}")
- # self._logger.info("═" * 30)
+ # # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}")
+ # # self._logger.info("═" * 30)
# reset experiment counters
self._coro_intervals.clear()
@@ -165,16 +169,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return (curr_time - self._start_time) - pause_time
def ping_enter_coro(self: Any) -> None:
- # self._logger.debug(f"Recording coro ENTER.")
+ # # self._logger.debug(f"Recording coro ENTER.")
self._time_entered_coro = self.time()
def ping_exit_coro(self: Any) -> None:
try:
assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!"
except AssertionError as e:
- self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.")
+ # self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.")
sys.exit(1)
- # self._logger.debug(f"Recording coro EXIT.")
+ # # self._logger.debug(f"Recording coro EXIT.")
self._coro_intervals.add((self._time_entered_coro, self.time()))
self._time_entered_coro = None
@@ -183,6 +187,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if event_list:
self._ready_events.append((event_list, self.time()))
+ @line_profiler.profile
def update_ready(self: Any) -> None:
'''
Polls the IO selector, schedules resulting callbacks, and schedules
@@ -201,7 +206,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
- self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
+ # self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
else:
new_scheduled.append(handle)
@@ -213,7 +218,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
- self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
+ # self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
handle._scheduled = False
timeout = None
@@ -235,17 +240,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
timeout = 0
- self._logger.debug(f"HANDLING I/O.")
- self._logger.info("-" * 20)
+ # self._logger.debug(f"HANDLING I/O.")
+ # self._logger.info("-" * 20)
self.collect_ready_events(timeout)
- self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
+ # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
while len(self._ready_events):
event_list = self._ready_events.pop(0)
self._process_events(*event_list)
- self._logger.debug(f"HANDLING SCHEDULED.")
- self._logger.info("-" * 20)
+ # self._logger.debug(f"HANDLING SCHEDULED.")
+ # self._logger.info("-" * 20)
# Handle 'later' callbacks that are ready.
curr_time = self.time()
end_time = curr_time + self._clock_resolution
@@ -261,24 +266,25 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time_to_buffer = when + delay
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
- # self._ready.append(handle)
# do not allow duplicates (FIX?)
if handle not in self._pause_buffer:
heapq.heappush(self._pause_buffer, handle)
- self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
+ # self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
- self._logger.debug(f"HANDLING PAUSED.")
- self._logger.info("-" * 20)
+ # self._logger.debug(f"HANDLING PAUSED.")
+ # self._logger.info("-" * 20)
# handle callbacks which can leave pause timeout
while self._pause_buffer:
# required when handle's _when is modified in place
- heapq.heapify(self._pause_buffer)
+ # heapq.heapify(self._pause_buffer)
handle = self._pause_buffer[0]
for other in self._pause_buffer[1:]:
- assert handle._when <= other._when, f"Heap root {handle} is not smallest"
+ if handle._when > other._when:
+ # self._logger.critical(f"{other} was modified in place: first: {handle._when} modified: {other._when}")
+ assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
- self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
+ # self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
break
# pop the first item in the list
handle = heapq.heappop(self._pause_buffer)
@@ -291,19 +297,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if handle._when >= end_time:
handle.time_entered_pause_buffer = curr_time
heapq.heappush(self._pause_buffer, handle)
- self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
+ # self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
else:
self._ready.append(handle)
- self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
+ # self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
+ @line_profiler.profile
def _run_once(self: Any) -> None:
"""
Run one full iteration of the event loop.
This calls all currently ready callbacks.
"""
- self._logger.debug("STARTING ITERATION.")
- self._logger.info("-" * 40)
+ # self._logger.debug("STARTING ITERATION.")
+ # self._logger.info("-" * 40)
self.update_ready()
# This is the only place where callbacks are actually *called*.
@@ -312,9 +319,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
- self._logger.info("-" * 40)
+ # self._logger.info("-" * 40)
ntodo = len(self._ready)
- self._logger.debug(f"RUNNING {ntodo} CALLBACKS.")
+ # self._logger.debug(f"RUNNING {ntodo} CALLBACKS.")
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
@@ -331,8 +338,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
- self._logger.warning('Executing %s took %.3f seconds',
- _format_handle(handle), dt)
+ # self._logger.warning('Executing %s took %.3f seconds',
+ # _format_handle(handle), dt)
time_interval = (handle._when, process_start_time)
pause_time = self._get_pause_time(time_interval)
@@ -341,10 +348,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
try:
assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
except AssertionError as e:
- self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.')
+ # self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.')
sys.exit(1)
self._completed_coros.append((_format_handle(handle), wait_time))
- self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}')
+ # self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}')
except Exception:
traceback.print_exc()
@@ -358,7 +365,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self._remove_reader(fileobj)
- self._logger.info(f"\treader {reader} was cancelled.")
+ # self._logger.info(f"\treader {reader} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(reader, time)
@@ -367,7 +374,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self._remove_writer(fileobj)
- self._logger.info(f"\twriter {writer} was cancelled.")
+ # self._logger.info(f"\twriter {writer} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(writer, time)
@@ -384,7 +391,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if not handle._when:
handle._when = curr_time
self._ready.append(handle)
- self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
+ # self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -396,7 +403,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if not handle._when:
handle._when = self.time()
self._ready.append(handle)
- self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
+ # self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
@@ -416,15 +423,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._is_vital(handle):
handle._when = curr_time
self._ready.append(handle)
- self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
+ # self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
handle.time_entered_pause_buffer = curr_time
# do not allow duplicates (FIX?)
if handle not in self._pause_buffer:
heapq.heappush(self._pause_buffer, handle)
- self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+ # self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
else:
- self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
+ # self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
def _get_pause_for_io(self: Any, handle, io_time: float) -> float:
time_interval = (handle.register_time, io_time)