summaryrefslogtreecommitdiff
path: root/nemesis
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis')
-rw-r--r--nemesis/causal_event_loop.py148
-rwxr-xr-xnemesis/nemesis.py2
2 files changed, 92 insertions, 58 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index d01dfd4..75857d7 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -25,6 +25,7 @@ import heapq
import selectors
import time
import traceback
+from copy import copy
from asyncio.log import logger
from asyncio import Task, events
from asyncio.base_events import _format_handle
@@ -34,20 +35,42 @@ _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+orig_handle = asyncio.events.Handle
+
class TimeAwareMixin:
# the timestamp this callback was registered
register_time = None
# the timestamp this callback completed i/o
- io_time = None
+ _when = None
# the timestamp this callback was called by the event loop
process_start_time = None
def __init__(self):
self.register_time = time.monotonic()
+ def __hash__(self):
+ return hash(self._when)
-orig_handle = asyncio.events.Handle
+ def __lt__(self, other):
+ if isinstance(other, orig_handle):
+ return self._when < other._when
+ return NotImplemented
+
+ def __le__(self, other):
+ if isinstance(other, orig_handle):
+ return self._when < other._when or self.__eq__(other)
+ return NotImplemented
+
+ def __gt__(self, other):
+ if isinstance(other, orig_handle):
+ return self._when > other._when
+ return NotImplemented
+
+ def __ge__(self, other):
+ if isinstance(other, orig_handle):
+ return self._when > other._when or self.__eq__(other)
+ return NotImplemented
def create_subclass(base_class):
@@ -70,13 +93,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# 1 means the target coroutine is optimized away entirely
_speedup = 1.0
# a list of callbacks which have recently completed
- _pause_buffer = []
+ _pause_buffer = SortedList()
# a list of intervals in which the target coroutine has been active
_coro_intervals = SortedList()
# a list of completed callbacks, and their associated queue time
_completed_coros = []
# the last time we entered the target coro
_time_entered_coro = None
+ _ready_events = []
def __init__(self) -> None:
super().__init__()
@@ -109,17 +133,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._coro_intervals.add((self._time_entered_coro, self.time()))
self._time_entered_coro = None
- def update_ready(self, can_stall=True):
+ def collect_ready_events(self, timeout=0):
+ event_list = self._selector.select(timeout)
+ self._ready_events.append((event_list, self.time()))
+
+ def update_ready(self):
'''
Polls the IO selector, schedules resulting callbacks, and schedules
'call_later' callbacks.
-
- This function can be called in the middle of an event loop iteration.
-
- This logic was separated out of `run_once` so that the list of `ready`
- tasks may be updated more frequently than once per iteration.
-
- If SAMPLING is true, the timeout passed to the selector will always be 0.
'''
curr_time = self.time()
sched_count = len(self._scheduled)
@@ -146,36 +167,48 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle._scheduled = False
timeout = None
- # TODO this needs to be rewritten
- # We can't miss things placed in timeout either
- # if not can_stall or self._ready or self._stopping:
- timeout = 0
- # elif self._scheduled:
- # # Compute the desired timeout.
- # timeout = self._scheduled[0]._when - self.time()
- # if timeout > MAXIMUM_SELECT_TIMEOUT:
- # timeout = MAXIMUM_SELECT_TIMEOUT
- # elif timeout < 0:
- # timeout = 0
-
- event_list = self._selector.select(timeout)
- self._process_events(event_list)
- # Needed to break cycles when an exception occurs.
- event_list = None
+ if self._ready or self._stopping:
+ timeout = 0
+ else:
+ curr_time = self.time()
+ if self._scheduled:
+ # Compute the desired timeout.
+ # requires computing our-best guess arrival time
+ handle = self._scheduled[0]
+ timeout = (handle._when + self._get_pause_for_io(handle, curr_time)) \
+ - curr_time
+ timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
+ if self._pause_buffer:
+ # pause buffer has an exact arrival time
+ pause_timeout = self._pause_buffer[0]._when - curr_time
+ timeout = min(pause_timeout, timeout) if timeout else pause_timeout
+ timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
+
+ self.collect_ready_events(timeout)
+ for event_list in self._ready_events:
+ self._process_events(*event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
- if handle._when >= end_time:
+ when = handle._when
+ if when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
- time_interval = (handle.register_time, curr_time)
- time_to_buffer = curr_time + self._get_pause_time(time_interval)
- handle.io_time = time_to_buffer
- self._ready.append(handle)
+ time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
+ handle._when = time_to_buffer
+ self._pause_buffer.add(handle)
+
+ # handle callbacks which can leave pause timeout
+ while self._pause_buffer:
+ handle = self._pause_buffer[0]
+ if handle._when >= end_time:
+ break
+ # pop the first item in the list
+ self._ready.append(self._pause_buffer.pop(0))
def _run_once(self):
"""
@@ -185,22 +218,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
"""
self.update_ready()
- current_time = self.time()
- to_process = collections.deque([
- handle for handle in self._ready
- if handle.io_time < (current_time + self._clock_resolution)
- ])
-
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
- ntodo = len(to_process)
+ ntodo = len(self._ready)
for i in range(ntodo):
- handle = to_process.popleft()
- self._ready.remove(handle)
+ handle = self._ready.popleft()
if handle._cancelled:
continue
try:
@@ -217,11 +243,11 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
- time_interval = (handle.io_time, process_start_time)
+ time_interval = (handle._when, process_start_time)
pause_time = self._get_pause_time(time_interval)
adjusted_start_time = handle.process_start_time - \
pause_time
- wait_time = adjusted_start_time - handle.io_time
+ wait_time = adjusted_start_time - handle._when
assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
self._completed_coros.append((_format_handle(handle), wait_time))
except Exception:
@@ -230,35 +256,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
- def _process_events(self, event_list):
- curr_time = self.time()
+ def _process_events(self, event_list, time):
for key, mask in event_list:
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self._remove_reader(fileobj)
else:
- time_interval = (reader.register_time, curr_time)
- time_to_buffer = curr_time + \
- self._get_pause_time(time_interval)
- reader.io_time = time_to_buffer
+ time_to_buffer = time + \
+ self._get_pause_for_io(reader, time)
+ reader._when = time_to_buffer
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self._remove_writer(fileobj)
else:
- time_interval = (writer.register_time, curr_time)
- time_to_buffer = curr_time + \
- self._get_pause_time(time_interval)
- writer.io_time = time_to_buffer
+ time_to_buffer = time + \
+ self._get_pause_for_io(writer, time)
+ writer._when = time_to_buffer
self._add_callback(writer)
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
- handle.io_time = self.time()
- self._ready.append(handle)
+ handle._when = self.time()
+ self._pause_buffer.add(handle)
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -267,8 +290,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = events._ThreadSafeHandle(callback, args, self, context)
- handle.io_time = self.time()
- self._ready.append(handle)
+ handle._when = self.time()
+ self._pause_buffer.add(handle)
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
@@ -276,6 +299,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._write_to_self()
return handle
+ def _add_callback(self, handle):
+ """Add a Handle to _pause_buffer."""
+ if not handle._cancelled:
+ if not handle._when:
+ handle._when = self.time()
+ self._pause_buffer.add(handle)
+
+ def _get_pause_for_io(self, handle, io_time):
+ time_interval = (handle.register_time, io_time)
+ return self._get_pause_time(time_interval)
+
def _get_pause_time(self, cb_interval):
time = 0
start, end = cb_interval
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index 27e93de..b7a9877 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -170,7 +170,7 @@ class Nemesis(object):
loop.ping_enter_coro()
Nemesis.prev_coro[loop] = coro
- loop.update_ready(False)
+ loop.collect_ready_events()
Nemesis.experiment_time += passed_time
if (Nemesis.e_duration <= Nemesis.experiment_time):