summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py154
1 files changed, 70 insertions, 84 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 1b04135..35792fd 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -54,10 +54,8 @@ class TimeAwareMixin:
# the timestamp this callback was registered
register_time = None
- # the timestamp this callback completed i/o
+ # the timestamp this callback ideally runs
_when = None
- # the timestamp this callback entered the pause buffer
- time_entered_pause_buffer = None
def __init__(self: Self) -> None:
self.register_time = time.monotonic()
@@ -107,8 +105,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# a value between 0 and 1. 0 means no optimization,
# 1 means the target coroutine is optimized away entirely
_speedup = 0.0
- # a list of callbacks which have recently completed
- _pause_buffer = []
# a list of intervals in which the target coroutine has been active
_coro_intervals = SortedList()
# a list of completed callbacks, and their associated queue time
@@ -203,42 +199,29 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
- for handle in self._scheduled:
+ for eff_guess, handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
# self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
else:
- new_scheduled.append(handle)
+ new_scheduled.append((eff_guess, handle))
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
- while self._scheduled and self._scheduled[0]._cancelled:
+ while self._scheduled and self._scheduled[0][1]._cancelled:
self._timer_cancelled_count -= 1
- handle = heapq.heappop(self._scheduled)
+ _, handle = heapq.heappop(self._scheduled)
# self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
- else:
- curr_time = self.time()
- if self._scheduled:
- # Compute the desired timeout.
- # requires computing our-best guess arrival time
- handle = self._scheduled[0]
- timeout = (handle._when + self._get_pause_for_io(handle, curr_time)) \
- - curr_time
- timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
- if self._pause_buffer:
- # pause buffer has an exact arrival time
- pause_timeout = self._pause_buffer[0]._when - curr_time
- timeout = min(pause_timeout, timeout) if timeout else pause_timeout
- timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
- timeout = 0
+ elif self._scheduled:
+ timeout = _get_next_effective_time()
# self._logger.debug(f"HANDLING I/O.")
# self._logger.info("-" * 20)
@@ -251,56 +234,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# self._logger.debug(f"HANDLING SCHEDULED.")
# self._logger.info("-" * 20)
- # Handle 'later' callbacks that are ready.
- curr_time = self.time()
- end_time = curr_time + self._clock_resolution
- while self._scheduled:
- handle = self._scheduled[0]
- when = handle._when
- if when >= end_time:
- break
- handle = heapq.heappop(self._scheduled)
- handle._scheduled = False
-
- delay = self._get_pause_for_io(handle, curr_time)
- time_to_buffer = when + delay
- handle._when = time_to_buffer
- handle.time_entered_pause_buffer = curr_time
- # do not allow duplicates (FIX?)
- if handle not in self._pause_buffer:
- heapq.heappush(self._pause_buffer, handle)
- # self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
-
-
- # self._logger.debug(f"HANDLING PAUSED.")
- # self._logger.info("-" * 20)
- # handle callbacks which can leave pause timeout
- while self._pause_buffer:
- # required when handle's _when is modified in place
- # heapq.heapify(self._pause_buffer)
- handle = self._pause_buffer[0]
- for other in self._pause_buffer[1:]:
- if handle._when > other._when:
- # self._logger.critical(f"{other} was modified in place: first: {handle._when} modified: {other._when}")
- assert handle._when <= other._when, f"Heap root {handle} is not smallest"
- if handle._when >= end_time:
- # self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
- break
- # pop the first item in the list
- handle = heapq.heappop(self._pause_buffer)
-
- # if we paused during buffering, we need to delay again
- # TODO clean this up
- # this whole file has 'rounding' timing errors :(
- delay = self._get_pause_for_pause_time(handle, curr_time)
- handle._when = handle._when + delay
- if handle._when >= end_time:
- handle.time_entered_pause_buffer = curr_time
- heapq.heappush(self._pause_buffer, handle)
- # self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
- else:
- self._ready.append(handle)
- # self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
+ # Handle 'scheduled' callbacks that are ready.
+ # note 'scheduled' callbacks include both I/O bound or call_later
+ self._add_ready_handles()
@line_profiler.profile
def _run_once(self: Any) -> None:
@@ -359,6 +295,44 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
+ def _get_next_effective_time(self):
+ """
+ Returns the delta time in which the next currently processing callback
+ will be ready.
+
+ The time this takes is O(log(n)), where n is the number of currently processing
+ callbacks. We can get away with lazily updating the estimated effective run time
+ of each item because `self._get_pause_for_io` never underestimates the true
+ effective firing time. By recomputing the heap head's true effective run time,
+ if our guess is correct, no other timer can have a smaller effective time.
+ """
+ next_effective_time = None
+ curr_time = self.time()
+ while True:
+ eff_guess, handle = self._scheduled[0]
+ eff_true = handle._when + self._get_pause_for_io(handle, curr_time)
+ if eff_true == eff_guess:
+ next_effective_time = eff_true - curr_time
+ break
+ heapq.heappop(self._scheduled)
+ heapq.heappush(self._scheduled, (eff_true, handle))
+ return min(next_effective_time, MAXIMUM_SELECT_TIMEOUT)
+
+ def _add_ready_handles(self):
+ curr_time = self.time() + self._clock_resolution
+ while self._scheduled:
+ eff_guess, handle = heapq.heappop(self._scheduled)
+ eff_true = handle._when + self._get_pause_for_io(handle, curr_time)
+ if curr_time >= eff_true:
+ # set the true 'when'
+ handle._when = eff_true
+ self._ready.append(handle)
+ # self._logger.debug(f"\tscheduled -> _ready for {_format_handle(handle)}")
+ else:
+ heapq.heappush(self._scheduled, (eff_true, handle))
+ break
+
+
def _process_events(self, event_list, time):
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
@@ -381,6 +355,25 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
writer._when = time_to_buffer
self._add_callback(writer)
+ def call_at(self, when, callback, *args, context=None):
+ """Like call_later(), but uses an absolute time.
+
+ Absolute time corresponds to the event loop's time() method.
+ """
+ if when is None:
+ raise TypeError("when cannot be None")
+ self._check_closed()
+ if self._debug:
+ self._check_thread()
+ self._check_callback(callback, 'call_at')
+ timer = events.TimerHandle(when, callback, args, self, context)
+ if timer._source_traceback:
+ del timer._source_traceback[-1]
+ heapq.heappush(self._scheduled, timer)
+ heapq.heappush(self._estimated, (timer._when, timer))
+ timer._scheduled = True
+ return timer
+
def _call_soon(self: Any, callback: Any, args: tuple, context: "contextvars.Context"):
"""Do not add 'callsoon' events to the pause buffer.
Add them directly to ready."""
@@ -412,7 +405,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return handle
def _add_callback(self, handle):
- """Add a Handle to _pause_buffer."""
curr_time = self.time()
if not handle._when:
handle._when = curr_time
@@ -425,13 +417,11 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._ready.append(handle)
# self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
- handle.time_entered_pause_buffer = curr_time
- # do not allow duplicates (FIX?)
- if handle not in self._pause_buffer:
- heapq.heappush(self._pause_buffer, handle)
- # self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+ heapq.heappush(self._scheduled, (handle._when, handle))
+ # self._logger.debug(f"\tio -> _scheduled for {_format_handle(handle)} (delay={handle._when - curr_time})")
else:
# self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
+ pass
def _get_pause_for_io(self: Any, handle, io_time: float) -> float:
time_interval = (handle.register_time, io_time)
@@ -445,10 +435,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return p_time
- def _get_pause_for_pause_time(self: Any, handle, exit_time: float) -> float:
- time_interval = (handle.time_entered_pause_buffer, exit_time)
- return self._get_pause_time(time_interval)
-
def _get_pause_time(self: Any, cb_interval: tuple[float, float]) -> float:
time = 0
start, end = cb_interval