summary refs log tree commit diff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
author bd <bdunahu@operationnull.com> 2025-11-26 03:00:53 -0500
committer bd <bdunahu@operationnull.com> 2025-11-26 03:00:53 -0500
commit d5b874c905907528c5e0c46f9c4b5ff8827fc6db (patch)
tree 51de915a85eceb268122978864c68a4252986222 /nemesis/causal_event_loop.py
parent df45e4380bd325c333ccdd48771b4ceaf36ff4c4 (diff)
Numerous bug fixes, target function only selects in event loop
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r-- nemesis/causal_event_loop.py 140
1 files changed, 104 insertions, 36 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 6d18dbc..6fea90d 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -24,6 +24,7 @@ from sortedcontainers import SortedList
import heapq
import selectors
import sys
+import os
import time
import traceback
from copy import copy
@@ -31,10 +32,16 @@ from asyncio.log import logger
from asyncio import Task, events
from asyncio.base_events import _format_handle
+from pathlib import Path
+from pprint import pformat
+import logging
+
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+TRACE_PATH = \
+ Path(__file__).resolve().parent.parent / "logs" / "trace.log"
orig_handle = asyncio.events.Handle
@@ -90,9 +97,11 @@ for sc in [orig_handle] + orig_handle.__subclasses__():
class CausalEventLoop(asyncio.SelectorEventLoop):
+ _log_level = logging.DEBUG
+ _logger = None
# a value between 0 and 1. 0 means no optimization,
# 1 means the target coroutine is optimized away entirely
- _speedup = 1.0
+ _speedup = 0.0
# a list of callbacks which have recently completed
_pause_buffer = []
# a list of intervals in which the target coroutine has been active
@@ -107,12 +116,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def __init__(self) -> None:
super().__init__()
+ # set up self._logger, the file logger used by the tracing calls in this class
+ self.set_logger()
+
+    def set_logger(self):
+        """Create self._logger and attach a file handler writing to TRACE_PATH."""
+        # FileHandler creates the log file itself, but not a missing logs/
+        # directory; touching the file with open(TRACE_PATH, 'w') failed there.
+        TRACE_PATH.parent.mkdir(parents=True, exist_ok=True)
+        # NOTE(review): asyncio presumably leaves _thread_id as None until the
+        # loop is running, so every loop built here shares 'LOOP None' - confirm.
+        self._logger = logging.getLogger(f'LOOP {self._thread_id}')
+        self._logger.setLevel(self._log_level)
+        self._logger.propagate = False
+        # getLogger() hands back the same object for the same name; attach the
+        # handler only once, or each new loop duplicates every trace line.
+        if not self._logger.handlers:
+            file_handler = logging.FileHandler(TRACE_PATH)
+            file_handler.setLevel(self._log_level)
+            formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s')
+            file_handler.setFormatter(formatter)
+            self._logger.addHandler(file_handler)
+        self._logger.info("═" * 40)
+        self._logger.info("STARTING LOOP")
+        self._logger.info("═" * 40)
def set_speedup(self, speedup):
# print(self._coro_intervals)
self._start_time = self.time()
self._speedup = speedup
+ # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}")
+ # self._logger.info("═" * 30)
+
# reset experiment counters
self._coro_intervals.clear()
self._completed_coros.clear()
@@ -132,20 +161,23 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return (curr_time - self._start_time) - pause_time
def ping_enter_coro(self):
+ # remember when the target coroutine was entered; ping_exit_coro() turns
+ # this timestamp into an interval in _coro_intervals
+ # self._logger.debug(f"Recording coro ENTER.")
self._time_entered_coro = self.time()
def ping_exit_coro(self):
+ # close the interval opened by ping_enter_coro(): store (entry time, now)
+ # in _coro_intervals and clear the entry marker; exit if no entry was recorded
try:
assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!"
except AssertionError as e:
- print(f"Assertion failed: {e}")
+ # `e` already holds the assert message; do not repeat it in the log line
+ self._logger.critical(f"{e} Aborting.")
sys.exit(1)
+ # self._logger.debug(f"Recording coro EXIT.")
self._coro_intervals.add((self._time_entered_coro, self.time()))
self._time_entered_coro = None
def collect_ready_events(self, timeout=0):
+ # poll the selector, waiting at most `timeout`
event_list = self._selector.select(timeout)
- self._ready_events.append((event_list, self.time()))
+ # queue only non-empty polls, stamped with the time the poll returned;
+ # update_ready() later passes each (events, time) pair to _process_events
+ if event_list:
+ self._ready_events.append((event_list, self.time()))
def update_ready(self):
'''
@@ -154,6 +186,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
'''
curr_time = self.time()
sched_count = len(self._scheduled)
+ # two methods to cleanup cancelled callbacks;
+ # avoid the expensive one whenever possible
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
@@ -163,6 +197,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
+ self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
else:
new_scheduled.append(handle)
@@ -174,6 +209,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
+ self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
handle._scheduled = False
timeout = None
@@ -195,10 +231,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
timeout = 0
+ self._logger.debug(f"HANDLING I/O.")
+ self._logger.info("-" * 20)
self.collect_ready_events(timeout)
- for event_list in self._ready_events:
+ self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
+
+ while len(self._ready_events):
+ event_list = self._ready_events.pop(0)
self._process_events(*event_list)
+ self._logger.debug(f"HANDLING SCHEDULED.")
+ self._logger.info("-" * 20)
# Handle 'later' callbacks that are ready.
curr_time = self.time()
end_time = curr_time + self._clock_resolution
@@ -210,11 +253,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
- time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
+ delay = self._get_pause_for_io(handle, curr_time)
+ time_to_buffer = when + delay
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
- heapq.heappush(self._pause_buffer, handle)
+ # self._ready.append(handle)
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
+
+ self._logger.debug(f"HANDLING PAUSED.")
+ self._logger.info("-" * 20)
# handle callbacks which can leave pause timeout
while self._pause_buffer:
# required when handle's _when is modified in place
@@ -223,6 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for other in self._pause_buffer[1:]:
assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
+ self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
break
# pop the first item in the list
handle = heapq.heappop(self._pause_buffer)
@@ -230,13 +282,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# if we paused during buffering, we need to delay again
# TODO clean this up
# this whole file has 'rounding' timing errors :(
- handle._when = handle._when + \
- self._get_pause_for_pause_time(handle, curr_time)
+ delay = self._get_pause_for_pause_time(handle, curr_time)
+ handle._when = handle._when + delay
if handle._when >= end_time:
handle.time_entered_pause_buffer = curr_time
heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
else:
self._ready.append(handle)
+ self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
def _run_once(self):
"""
@@ -244,6 +298,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
This calls all currently ready callbacks.
"""
+ self._logger.debug("STARTING ITERATION.")
+ self._logger.info("-" * 40)
self.update_ready()
# This is the only place where callbacks are actually *called*.
@@ -252,7 +308,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
+ self._logger.info("-" * 40)
ntodo = len(self._ready)
+ self._logger.debug(f"RUNNING {ntodo} CALLBACKS.")
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
@@ -269,21 +327,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
-
- # do not record coroutines which left I/O during the previous experiment
- # the time held in the pause buffer would have also been incorrect for
- # this experiment, but there is nothing we can do about it.
- if handle._when > self._start_time:
- time_interval = (handle._when, process_start_time)
- pause_time = self._get_pause_time(time_interval)
- adjusted_start_time = process_start_time - pause_time
- wait_time = adjusted_start_time - handle._when
- try:
- assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
- except AssertionError as e:
- print(f"Assertion failed: {e}")
- sys.exit(1)
- self._completed_coros.append((_format_handle(handle), wait_time))
+ self._logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt)
+
+ time_interval = (handle._when, process_start_time)
+ pause_time = self._get_pause_time(time_interval)
+ adjusted_start_time = process_start_time - pause_time
+ wait_time = adjusted_start_time - handle._when
+ try:
+ assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
+ except AssertionError as e:
+ self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.')
+ sys.exit(1)
+ self._completed_coros.append((_format_handle(handle), wait_time))
+ self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}')
except Exception:
traceback.print_exc()
@@ -297,6 +354,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self._remove_reader(fileobj)
+ self._logger.info(f"\treader {reader} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(reader, time)
@@ -305,6 +363,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self._remove_writer(fileobj)
+ self._logger.info(f"\twriter {writer} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(writer, time)
@@ -321,6 +380,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if not handle._when:
handle._when = curr_time
self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -329,7 +389,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = events._ThreadSafeHandle(callback, args, self, context)
- self._add_callback(handle)
+ if not handle._when:
+ handle._when = self.time()
+ self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
@@ -345,15 +408,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# required in cases where the event loop reuses the same handle
if handle._when < curr_time:
handle._when = curr_time
- if not handle._cancelled and handle not in self._pause_buffer:
+ if not handle._cancelled:
if self._is_vital(handle):
handle._when = curr_time
- # print(f'{_format_handle(handle)}: {curr_time}')
self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
- # print(f'DELAYED {_format_handle(handle)}: {curr_time}')
handle.time_entered_pause_buffer = curr_time
- heapq.heappush(self._pause_buffer, handle)
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+ else:
+ self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
def _get_pause_for_io(self, handle, io_time):
@@ -389,16 +456,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time += self._get_overlap(start, end, self._time_entered_coro,
curr_time)
- return time * self._speedup
+ pause_time = time * self._speedup
+ assert pause_time >= 0, f"Delay was found to be less than 0: {pause_time}"
+ return pause_time
def _get_overlap(self, a_start, a_end, b_start, b_end):
+ # length of the intersection of the intervals (a_start, a_end) and (b_start, b_end)
overlap_start = max(a_start, b_start)
overlap_end = min(a_end, b_end)
+ # NOTE(review): disjoint intervals trip this assert; under `python -O`
+ # asserts are stripped and a negative length is returned instead - confirm
+ # whether callers guarantee overlapping intervals or this should clamp to 0
+ assert overlap_end >= overlap_start, f"Bad overlaps: {a_start} {a_end} : {b_start} {b_end} ({overlap_end - overlap_start})"
return overlap_end - overlap_start
def _is_vital(self, handle):
""" Methods which cannot afford to be paused."""
- blacklist = ['_read_from_self', '_read_ready']
+ blacklist = ['_read_from_self', '_read_ready', '_accept_connection']
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist:
@@ -409,9 +479,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return False
-class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
- def new_event_loop(self):
- return CausalEventLoop()
-
-
-asyncio.set_event_loop_policy(CausalEventLoopPolicy())
+# to profile your program, start the event loop with
+# asyncio.run(coro, loop_factory=causal_loop_factory)
+# (asyncio.run() accepts the loop_factory argument from Python 3.12 onward)
+def causal_loop_factory():
+ """Return a new CausalEventLoop instance."""
+ return CausalEventLoop()