From c0aa037b4e3d7903254ef68e4654c66082c81c68 Mon Sep 17 00:00:00 2001 From: bd Date: Wed, 3 Dec 2025 01:39:56 -0500 Subject: fix incorrect handling of repeated events in sampler --- nemesis/causal_event_loop.py | 67 +++++++++++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 16 deletions(-) (limited to 'nemesis/causal_event_loop.py') diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 35792fd..dc807aa 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: import line_profiler import asyncio import collections +# TODO remove dependency from sortedcontainers import SortedList import heapq import selectors @@ -35,6 +36,7 @@ import traceback from copy import copy from asyncio.log import logger from asyncio import Task, events +from collections import defaultdict from asyncio.base_events import _format_handle from pathlib import Path @@ -83,7 +85,6 @@ class TimeAwareMixin: return self._when > other._when or self.__eq__(other) return NotImplemented - def create_subclass(base_class: "type[orig_handle]"): class CausalHandle(base_class, TimeAwareMixin): def __init__(self: Self, *args: tuple|Any|Callable|None, **kwargs: None) -> None: @@ -111,7 +112,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): _completed_coros = [] # the last time we entered the target coro _time_entered_coro = None - _ready_events = [] + _ready_events = {} # the time this experiment started _start_time = float("inf") @@ -132,7 +133,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): file_handler.setFormatter(formatter) # self._logger.addHandler(file_handler) - # self._logger.disabled = True + # # self._logger.disabled = True # self._logger.info("═" * 40) # self._logger.info("STARTING LOOP") @@ -179,16 +180,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._time_entered_coro = None def collect_ready_events(self: Any, timeout: int=0) -> None: + """ + cannot add shit already in ready events lol + cannot add shit in scheduled either (we need to match original EL + behavior exactly; with this pause schema we essentially been given + an I/O event 'early'. We shouldn't be given it again. + also makes it look like it came later than it really did, just bad. + """ + # TODO this method could probably be optimized, it runs a lot event_list = self._selector.select(timeout) - if event_list: - self._ready_events.append((event_list, self.time())) + curr_time = self.time() + scheduled = {t[1] for t in self._scheduled} + for e in event_list: + reader, writer = e[0].data + if e not in self._ready_events and \ + not ( + (reader is not None and reader in scheduled) or + (writer is not None and writer in scheduled) + ): + self._ready_events[e] = curr_time @line_profiler.profile def update_ready(self: Any) -> None: - ''' + """ Polls the IO selector, schedules resulting callbacks, and schedules 'call_later' callbacks. - ''' + """ curr_time = self.time() sched_count = len(self._scheduled) # two methods to cleanup cancelled callbacks; @@ -221,22 +238,29 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if self._ready or self._stopping: timeout = 0 elif self._scheduled: - timeout = _get_next_effective_time() + timeout = self._get_next_effective_time() # self._logger.debug(f"HANDLING I/O.") # self._logger.info("-" * 20) self.collect_ready_events(timeout) # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}") - while len(self._ready_events): - event_list = self._ready_events.pop(0) - self._process_events(*event_list) + ts_groups = defaultdict(list) + ready_copy = self._ready_events + # bind new address to protect against signaler + self._ready_events = {} + for e, ts in ready_copy.items(): + ts_groups[ts].append(e) + + for k, v in ts_groups.items(): + self._process_events(v, k) # self._logger.debug(f"HANDLING SCHEDULED.") # self._logger.info("-" * 20) # Handle 'scheduled' callbacks that are ready. # note 'scheduled' callbacks include both I/O bound or call_later self._add_ready_handles() + # self._logger.info(f"\tscheduled now size: {len(self._scheduled)}") @line_profiler.profile def _run_once(self: Any) -> None: @@ -275,7 +299,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) # self._logger.warning('Executing %s took %.3f seconds', - # _format_handle(handle), dt) + # _format_handle(handle), dt) time_interval = (handle._when, process_start_time) pause_time = self._get_pause_time(time_interval) @@ -295,6 +319,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._current_handle = None handle = None # Needed to break cycles when an exception occurs. + @line_profiler.profile def _get_next_effective_time(self): """ Returns the delta time in which the next currently processing callback @@ -318,9 +343,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop): heapq.heappush(self._scheduled, (eff_true, handle)) return min(next_effective_time, MAXIMUM_SELECT_TIMEOUT) + @line_profiler.profile def _add_ready_handles(self): curr_time = self.time() + self._clock_resolution while self._scheduled: + # debug + # head = self._scheduled[0][0] + # EPS = 1e-12 + # for i in range(1, len(self._scheduled)): + # assert head <= self._scheduled[i][0] + EPS, ( + # f"min heap was not preserved: {self._scheduled[0][1]} > {self._scheduled[i][1]} ({head} > {self._scheduled[i][0]})" + # ) + eff_guess, handle = heapq.heappop(self._scheduled) eff_true = handle._when + self._get_pause_for_io(handle, curr_time) if curr_time >= eff_true: @@ -332,8 +366,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop): heapq.heappush(self._scheduled, (eff_true, handle)) break - def _process_events(self, event_list, time): + """ + time -> time this handle finished IO + """ for key, mask in event_list: fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: @@ -369,8 +405,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): timer = events.TimerHandle(when, callback, args, self, context) if timer._source_traceback: del timer._source_traceback[-1] - heapq.heappush(self._scheduled, timer) - heapq.heappush(self._estimated, (timer._when, timer)) + heapq.heappush(self._scheduled, (timer._when, timer)) timer._scheduled = True return timer @@ -464,7 +499,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def _is_vital(self, handle): """ Methods which cannot afford to be paused.""" - blacklist = ['_read_from_self', '_read_ready', '_accept_connection'] + blacklist = [] cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): if cb.__self__.get_coro().__name__ in blacklist: -- cgit v1.2.3