summaryrefslogtreecommitdiff
path: root/nemesis
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis')
-rw-r--r--nemesis/causal_event_loop.py39
1 files changed, 27 insertions, 12 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 75857d7..c2b3b1d 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -43,8 +43,8 @@ class TimeAwareMixin:
register_time = None
# the timestamp this callback completed i/o
_when = None
- # the timestamp this callback was called by the event loop
- process_start_time = None
+ # the timestamp this callback entered the pause buffer
+ time_entered_pause_buffer = None
def __init__(self):
self.register_time = time.monotonic()
@@ -189,7 +189,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._process_events(*event_list)
# Handle 'later' callbacks that are ready.
- end_time = self.time() + self._clock_resolution
+ curr_time = self.time()
+ end_time = curr_time + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
when = handle._when
@@ -200,6 +201,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
handle._when = time_to_buffer
+ handle.time_entered_pause_buffer = curr_time
self._pause_buffer.add(handle)
# handle callbacks which can leave pause timeout
@@ -208,7 +210,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if handle._when >= end_time:
break
# pop the first item in the list
- self._ready.append(self._pause_buffer.pop(0))
+ handle = self._pause_buffer.pop(0)
+
+ # if we paused during buffering, we need to delay again
+ # TODO clean this up
+ # this whole file has 'rounding' timing errors :(
+ handle._when = handle._when + \
+ self._get_pause_for_pause_time(handle, curr_time)
+ if handle._when >= end_time:
+ handle.time_entered_pause_buffer = curr_time
+ self._pause_buffer.add(handle)
+ else:
+ self._ready.append(handle)
def _run_once(self):
"""
@@ -233,7 +246,6 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._current_handle = handle
process_start_time = self.time()
- handle.process_start_time = process_start_time
handle._run()
@@ -245,8 +257,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time_interval = (handle._when, process_start_time)
pause_time = self._get_pause_time(time_interval)
- adjusted_start_time = handle.process_start_time - \
- pause_time
+ adjusted_start_time = process_start_time - pause_time
wait_time = adjusted_start_time - handle._when
assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
self._completed_coros.append((_format_handle(handle), wait_time))
@@ -280,8 +291,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
- handle._when = self.time()
- self._pause_buffer.add(handle)
+ self._add_callback(handle)
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -290,8 +300,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = events._ThreadSafeHandle(callback, args, self, context)
- handle._when = self.time()
- self._pause_buffer.add(handle)
+ self._add_callback(handle)
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
@@ -302,14 +311,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def _add_callback(self, handle):
"""Add a Handle to _pause_buffer."""
if not handle._cancelled:
+ curr_time = self.time()
if not handle._when:
- handle._when = self.time()
+ handle._when = curr_time
+ handle.time_entered_pause_buffer = curr_time
self._pause_buffer.add(handle)
def _get_pause_for_io(self, handle, io_time):
time_interval = (handle.register_time, io_time)
return self._get_pause_time(time_interval)
+ def _get_pause_for_pause_time(self, handle, exit_time):
+ time_interval = (handle.time_entered_pause_buffer, exit_time)
+ return self._get_pause_time(time_interval)
+
def _get_pause_time(self, cb_interval):
time = 0
start, end = cb_interval