summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-10-15 00:16:38 -0400
committerbd <bdunahu@operationnull.com>2025-10-15 00:16:38 -0400
commit1a439653264adc3d9648645d2bb4006f331c1722 (patch)
tree27a4d50f3fb1cd333b15ec372500dd78cfb296b9 /nemesis/causal_event_loop.py
parent16aa392a3c08c8769cc30bdbc1830a31f9b0808e (diff)
Fix a lot of bugs--core callbacks being delayed, reused handles
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py39
1 files changed, 28 insertions, 11 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 2fe8435..b73458f 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -23,6 +23,7 @@ import collections
from sortedcontainers import SortedList
import heapq
import selectors
+import sys
import time
import traceback
from copy import copy
@@ -114,7 +115,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._completed_coros.clear()
def get_completed_coros(self):
- return self._completed_coros
+ return copy(self._completed_coros)
def get_pause_time(self):
if not self._coro_intervals:
@@ -210,14 +211,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
heapq.heappush(self._pause_buffer, handle)
# handle callbacks which can leave pause timeout
- heapq.heapify(self._pause_buffer)
- # print([x._when for x in self._pause_buffer])
while self._pause_buffer:
+ # required when handle's _when is modified in place
+ heapq.heapify(self._pause_buffer)
handle = self._pause_buffer[0]
+ for other in self._pause_buffer[1:]:
+ assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
break
# pop the first item in the list
- handle = self._pause_buffer.pop(0)
+ handle = heapq.heappop(self._pause_buffer)
# if we paused during buffering, we need to delay again
# TODO clean this up
@@ -271,8 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
except AssertionError as e:
print(f"Assertion failed: {e}")
sys.exit(1)
- if not self._is_blacklisted(handle):
- self._completed_coros.append((_format_handle(handle), wait_time))
+ self._completed_coros.append((_format_handle(handle), wait_time))
except Exception:
traceback.print_exc()
finally:
@@ -300,10 +302,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._add_callback(writer)
def _call_soon(self, callback, args, context):
+ """Do not add 'callsoon' events to the pause buffer.
+ Add them directly to ready."""
+ curr_time = self.time()
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
- self._add_callback(handle)
+ if not handle._when:
+ handle._when = curr_time
+ self._ready.append(handle)
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -325,9 +332,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
curr_time = self.time()
if not handle._when:
handle._when = curr_time
+ # required in cases where the event loop reuses the same handle
+ if handle._when < curr_time:
+ handle._when = curr_time
if not handle._cancelled and handle not in self._pause_buffer:
- handle.time_entered_pause_buffer = curr_time
- heapq.heappush(self._pause_buffer, handle)
+ if self._is_vital(handle):
+ handle._when = curr_time
+ # print(f'{_format_handle(handle)}: {curr_time}')
+ self._ready.append(handle)
+ else:
+ # print(f'DELAYED {_format_handle(handle)}: {curr_time}')
+ handle.time_entered_pause_buffer = curr_time
+ heapq.heappush(self._pause_buffer, handle)
def _get_pause_for_io(self, handle, io_time):
@@ -370,8 +386,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
overlap_end = min(a_end, b_end)
return overlap_end - overlap_start
- def _is_blacklisted(self, handle):
- blacklist = ['_read_from_self']
+ def _is_vital(self, handle):
+ """ Methods which cannot afford to be paused."""
+ blacklist = ['_read_from_self', '_read_ready']
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist: