summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-10-08 01:27:33 -0400
committerbd <bdunahu@operationnull.com>2025-10-08 01:27:33 -0400
commit0bd8432b4c6bd3d82c4916e0b6fb173aa7a5ff39 (patch)
tree21588af3b7a8e9097836f4250fd136179a4cc3ac /nemesis/causal_event_loop.py
parenta2b9381f1c93122ff25586e79275a29dbddde790 (diff)
switch from sortedlist to heap queue to ensure proper sorting
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py20
1 files changed, 12 insertions, 8 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index c2b3b1d..f4834fc 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -93,7 +93,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# 1 means the target coroutine is optimized away entirely
_speedup = 1.0
# a list of callbacks which have recently completed
- _pause_buffer = SortedList()
+ _pause_buffer = []
# a list of intervals in which the target coroutine has been active
_coro_intervals = SortedList()
# a list of completed callbacks, and their associated queue time
@@ -202,9 +202,13 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
# handle callbacks which can leave pause timeout
+ # if len(self._pause_buffer) > 100:
+ # exit(1)
+ heapq.heapify(self._pause_buffer)
+ # print([x._when for x in self._pause_buffer])
while self._pause_buffer:
handle = self._pause_buffer[0]
if handle._when >= end_time:
@@ -219,7 +223,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._get_pause_for_pause_time(handle, curr_time)
if handle._when >= end_time:
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
else:
self._ready.append(handle)
@@ -310,12 +314,12 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def _add_callback(self, handle):
"""Add a Handle to _pause_buffer."""
- if not handle._cancelled:
- curr_time = self.time()
- if not handle._when:
- handle._when = curr_time
+ curr_time = self.time()
+ if not handle._when:
+ handle._when = curr_time
+ if not handle._cancelled and handle not in self._pause_buffer:
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
def _get_pause_for_io(self, handle, io_time):
time_interval = (handle.register_time, io_time)