summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py20
1 files changed, 12 insertions, 8 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index c2b3b1d..f4834fc 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -93,7 +93,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# 1 means the target coroutine is optimized away entirely
_speedup = 1.0
# a list of callbacks which have recently completed
- _pause_buffer = SortedList()
+ _pause_buffer = []
# a list of intervals in which the target coroutine has been active
_coro_intervals = SortedList()
# a list of completed callbacks, and their associated queue time
@@ -202,9 +202,13 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
# handle callbacks which can leave pause timeout
+ # if len(self._pause_buffer) > 100:
+ # exit(1)
+ heapq.heapify(self._pause_buffer)
+ # print([x._when for x in self._pause_buffer])
while self._pause_buffer:
handle = self._pause_buffer[0]
if handle._when >= end_time:
@@ -219,7 +223,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._get_pause_for_pause_time(handle, curr_time)
if handle._when >= end_time:
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
else:
self._ready.append(handle)
@@ -310,12 +314,12 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def _add_callback(self, handle):
"""Add a Handle to _pause_buffer."""
- if not handle._cancelled:
- curr_time = self.time()
- if not handle._when:
- handle._when = curr_time
+ curr_time = self.time()
+ if not handle._when:
+ handle._when = curr_time
+ if not handle._cancelled and handle not in self._pause_buffer:
handle.time_entered_pause_buffer = curr_time
- self._pause_buffer.add(handle)
+ heapq.heappush(self._pause_buffer, handle)
def _get_pause_for_io(self, handle, io_time):
time_interval = (handle.register_time, io_time)