summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-12-03 01:39:56 -0500
committerbd <bdunahu@operationnull.com>2025-12-03 01:39:56 -0500
commitc0aa037b4e3d7903254ef68e4654c66082c81c68 (patch)
treeb6d4c233fec8b0e851eee780db4dca2775bf34b0 /nemesis/causal_event_loop.py
parent4fae663bfa9f228148f3fbd6b55196ce3492e5ad (diff)
fix incorrect handling of repeated events in sampler
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py67
1 files changed, 51 insertions, 16 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 35792fd..dc807aa 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -25,6 +25,7 @@ if TYPE_CHECKING:
import line_profiler
import asyncio
import collections
+# TODO remove dependency
from sortedcontainers import SortedList
import heapq
import selectors
@@ -35,6 +36,7 @@ import traceback
from copy import copy
from asyncio.log import logger
from asyncio import Task, events
+from collections import defaultdict
from asyncio.base_events import _format_handle
from pathlib import Path
@@ -83,7 +85,6 @@ class TimeAwareMixin:
return self._when > other._when or self.__eq__(other)
return NotImplemented
-
def create_subclass(base_class: "type[orig_handle]"):
class CausalHandle(base_class, TimeAwareMixin):
def __init__(self: Self, *args: tuple|Any|Callable|None, **kwargs: None) -> None:
@@ -111,7 +112,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
_completed_coros = []
# the last time we entered the target coro
_time_entered_coro = None
- _ready_events = []
+ _ready_events = {}
# the time this experiment started
_start_time = float("inf")
@@ -132,7 +133,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
file_handler.setFormatter(formatter)
# self._logger.addHandler(file_handler)
- # self._logger.disabled = True
+ # # self._logger.disabled = True
# self._logger.info("═" * 40)
# self._logger.info("STARTING LOOP")
@@ -179,16 +180,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._time_entered_coro = None
def collect_ready_events(self: Any, timeout: int=0) -> None:
+ """
+ cannot add shit already in ready events lol
+ cannot add shit in scheduled either (we need to match original EL
+ behavior exactly; with this pause schema we essentially been given
+ an I/O event 'early'. We shouldn't be given it again.
+ also makes it look like it came later than it really did, just bad.
+ """
+ # TODO this method could probably be optimized, it runs a lot
event_list = self._selector.select(timeout)
- if event_list:
- self._ready_events.append((event_list, self.time()))
+ curr_time = self.time()
+ scheduled = {t[1] for t in self._scheduled}
+ for e in event_list:
+ reader, writer = e[0].data
+ if e not in self._ready_events and \
+ not (
+ (reader is not None and reader in scheduled) or
+ (writer is not None and writer in scheduled)
+ ):
+ self._ready_events[e] = curr_time
@line_profiler.profile
def update_ready(self: Any) -> None:
- '''
+ """
Polls the IO selector, schedules resulting callbacks, and schedules
'call_later' callbacks.
- '''
+ """
curr_time = self.time()
sched_count = len(self._scheduled)
# two methods to cleanup cancelled callbacks;
@@ -221,22 +238,29 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
- timeout = _get_next_effective_time()
+ timeout = self._get_next_effective_time()
# self._logger.debug(f"HANDLING I/O.")
# self._logger.info("-" * 20)
self.collect_ready_events(timeout)
# self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
- while len(self._ready_events):
- event_list = self._ready_events.pop(0)
- self._process_events(*event_list)
+ ts_groups = defaultdict(list)
+ ready_copy = self._ready_events
+ # bind new address to protect against signaler
+ self._ready_events = {}
+ for e, ts in ready_copy.items():
+ ts_groups[ts].append(e)
+
+ for k, v in ts_groups.items():
+ self._process_events(v, k)
# self._logger.debug(f"HANDLING SCHEDULED.")
# self._logger.info("-" * 20)
# Handle 'scheduled' callbacks that are ready.
# note 'scheduled' callbacks include both I/O bound or call_later
self._add_ready_handles()
+ # self._logger.info(f"\tscheduled now size: {len(self._scheduled)}")
@line_profiler.profile
def _run_once(self: Any) -> None:
@@ -275,7 +299,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
# self._logger.warning('Executing %s took %.3f seconds',
- # _format_handle(handle), dt)
+ # _format_handle(handle), dt)
time_interval = (handle._when, process_start_time)
pause_time = self._get_pause_time(time_interval)
@@ -295,6 +319,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
+ @line_profiler.profile
def _get_next_effective_time(self):
"""
Returns the delta time in which the next currently processing callback
@@ -318,9 +343,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
heapq.heappush(self._scheduled, (eff_true, handle))
return min(next_effective_time, MAXIMUM_SELECT_TIMEOUT)
+ @line_profiler.profile
def _add_ready_handles(self):
curr_time = self.time() + self._clock_resolution
while self._scheduled:
+ # debug
+ # head = self._scheduled[0][0]
+ # EPS = 1e-12
+ # for i in range(1, len(self._scheduled)):
+ # assert head <= self._scheduled[i][0] + EPS, (
+ # f"min heap was not preserved: {self._scheduled[0][1]} > {self._scheduled[i][1]} ({head} > {self._scheduled[i][0]})"
+ # )
+
eff_guess, handle = heapq.heappop(self._scheduled)
eff_true = handle._when + self._get_pause_for_io(handle, curr_time)
if curr_time >= eff_true:
@@ -332,8 +366,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
heapq.heappush(self._scheduled, (eff_true, handle))
break
-
def _process_events(self, event_list, time):
+ """
+ time -> time this handle finished IO
+ """
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
@@ -369,8 +405,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
- heapq.heappush(self._scheduled, timer)
- heapq.heappush(self._estimated, (timer._when, timer))
+ heapq.heappush(self._scheduled, (timer._when, timer))
timer._scheduled = True
return timer
@@ -464,7 +499,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def _is_vital(self, handle):
""" Methods which cannot afford to be paused."""
- blacklist = ['_read_from_self', '_read_ready', '_accept_connection']
+ blacklist = []
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist: