summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py37
1 files changed, 24 insertions, 13 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index b6fdea6..12a4084 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -176,8 +176,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def collect_ready_events(self, timeout=0):
event_list = self._selector.select(timeout)
- self._ready_events.append((event_list, self.time()))
- # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(event_list, indent=2)}")
+ if event_list:
+ self._ready_events.append((event_list, self.time()))
def update_ready(self):
'''
@@ -234,7 +234,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._logger.debug(f"HANDLING I/O.")
self._logger.info("-" * 20)
self.collect_ready_events(timeout)
- for event_list in self._ready_events:
+ self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
+
+ while len(self._ready_events):
+ event_list = self._ready_events.pop(0)
self._process_events(*event_list)
self._logger.debug(f"HANDLING SCHEDULED.")
@@ -255,7 +258,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
# self._ready.append(handle)
- heapq.heappush(self._pause_buffer, handle)
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
@@ -269,6 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for other in self._pause_buffer[1:]:
assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
+ self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
break
# pop the first item in the list
handle = heapq.heappop(self._pause_buffer)
@@ -348,6 +354,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self._remove_reader(fileobj)
+ self._logger.info(f"\treader {reader} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(reader, time)
@@ -356,6 +363,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self._remove_writer(fileobj)
+ self._logger.info(f"\twriter {writer} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(writer, time)
@@ -400,19 +408,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# required in cases where the event loop reuses the same handle
if handle._when < curr_time:
handle._when = curr_time
- if not handle._cancelled and handle not in self._pause_buffer:
+ if not handle._cancelled:
if self._is_vital(handle):
handle._when = curr_time
- # print(f'{_format_handle(handle)}: {curr_time}')
self._ready.append(handle)
self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
- # print(f'DELAYED {_format_handle(handle)}: {curr_time}')
handle.time_entered_pause_buffer = curr_time
- # self._ready.append(handle)
- heapq.heappush(self._pause_buffer, handle)
- self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={curr_time - handle._when})")
-
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+ else:
+ self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
def _get_pause_for_io(self, handle, io_time):
time_interval = (handle.register_time, io_time)
@@ -447,16 +455,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time += self._get_overlap(start, end, self._time_entered_coro,
curr_time)
- return time * self._speedup
+ pause_time = time * self._speedup
+ assert pause_time >= 0, f"Delay was found to be less than 0: {pause_time}"
+ return pause_time
def _get_overlap(self, a_start, a_end, b_start, b_end):
overlap_start = max(a_start, b_start)
overlap_end = min(a_end, b_end)
+ assert overlap_end >= overlap_start, f"Bad overlaps: {a_start} {a_end} : {b_start} {b_end} ({overlap_end - overlap_start})"
return overlap_end - overlap_start
def _is_vital(self, handle):
""" Methods which cannot afford to be paused."""
- blacklist = ['_read_from_self', '_read_ready']
+ blacklist = ['_read_from_self', '_read_ready', '_accept_connection']
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist: