diff options
Diffstat (limited to 'nemesis/causal_event_loop.py')
| -rw-r--r-- | nemesis/causal_event_loop.py | 39 |
1 files changed, 28 insertions, 11 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 2fe8435..b73458f 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -23,6 +23,7 @@ import collections from sortedcontainers import SortedList import heapq import selectors +import sys import time import traceback from copy import copy @@ -114,7 +115,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._completed_coros.clear() def get_completed_coros(self): - return self._completed_coros + return copy(self._completed_coros) def get_pause_time(self): if not self._coro_intervals: @@ -210,14 +211,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop): heapq.heappush(self._pause_buffer, handle) # handle callbacks which can leave pause timeout - heapq.heapify(self._pause_buffer) - # print([x._when for x in self._pause_buffer]) while self._pause_buffer: + # required when handle's _when is modified in place + heapq.heapify(self._pause_buffer) handle = self._pause_buffer[0] + for other in self._pause_buffer[1:]: + assert handle._when <= other._when, f"Heap root {handle} is not smallest" if handle._when >= end_time: break # pop the first item in the list - handle = self._pause_buffer.pop(0) + handle = heapq.heappop(self._pause_buffer) # if we paused during buffering, we need to delay again # TODO clean this up @@ -271,8 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): except AssertionError as e: print(f"Assertion failed: {e}") sys.exit(1) - if not self._is_blacklisted(handle): - self._completed_coros.append((_format_handle(handle), wait_time)) + self._completed_coros.append((_format_handle(handle), wait_time)) except Exception: traceback.print_exc() finally: @@ -300,10 +302,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._add_callback(writer) def _call_soon(self, callback, args, context): + """Do not add 'callsoon' events to the pause buffer. + Add them directly to ready.""" + curr_time = self.time() handle = events.Handle(callback, args, self, context) if handle._source_traceback: del handle._source_traceback[-1] - self._add_callback(handle) + if not handle._when: + handle._when = curr_time + self._ready.append(handle) return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -325,9 +332,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop): curr_time = self.time() if not handle._when: handle._when = curr_time + # required in cases where the event loop reuses the same handle + if handle._when < curr_time: + handle._when = curr_time if not handle._cancelled and handle not in self._pause_buffer: - handle.time_entered_pause_buffer = curr_time - heapq.heappush(self._pause_buffer, handle) + if self._is_vital(handle): + handle._when = curr_time + # print(f'{_format_handle(handle)}: {curr_time}') + self._ready.append(handle) + else: + # print(f'DELAYED {_format_handle(handle)}: {curr_time}') + handle.time_entered_pause_buffer = curr_time + heapq.heappush(self._pause_buffer, handle) def _get_pause_for_io(self, handle, io_time): @@ -370,8 +386,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): overlap_end = min(a_end, b_end) return overlap_end - overlap_start - def _is_blacklisted(self, handle): - blacklist = ['_read_from_self'] + def _is_vital(self, handle): + """ Methods which cannot afford to be paused.""" + blacklist = ['_read_from_self', '_read_ready'] cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): if cb.__self__.get_coro().__name__ in blacklist: |
