summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-09-09 23:31:12 -0400
committerbd <bdunahu@operationnull.com>2025-09-09 23:31:12 -0400
commit963fd1ac52c8d762f65d1cf19d766507e666a986 (patch)
treea0988c519a6b20984c0023031b3a2db624432d6d
parent98924f4ac0a07cd9d4bb74322364347493e73159 (diff)
Add delay insertion to I/O by rescheduling completed with call later
-rw-r--r--nemesis/causal_event_loop.py55
1 files changed, 48 insertions, 7 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 35aca9d..2044850 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -37,8 +37,10 @@ Code:
import asyncio
import collections
import heapq
+import selectors
import time
from asyncio.log import logger
+from asyncio import Task, events
from asyncio.base_events import _format_handle
_MIN_SCHEDULED_TIMER_HANDLES = 100
@@ -47,15 +49,17 @@ MAXIMUM_SELECT_TIMEOUT = 24 * 3600
class CausalEventLoop(asyncio.SelectorEventLoop):
+ _time_fd_registered = dict()
+ _time_entered_coro = None
+ _accumulated_time = 0
+
+ _time_dilation = 1.0
+ _processing = collections.deque()
+ _last_objective_time = None
+ _last_subjective_time = None
+
def __init__(self) -> None:
super().__init__()
- self._last_objective_time = None
- self._last_subjective_time = None
- self._processing = collections.deque()
- self._time_dilation = 1.0
-
- self._time_entered_coro = None
- self._accumulated_time = 0
_select = self._selector.select
def select(timeout: float):
@@ -179,6 +183,43 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
+ def _process_events(self, event_list):
+ end_time = super().time()
+ for key, mask in event_list:
+ fileobj, (reader, writer) = key.fileobj, key.data
+ if mask & selectors.EVENT_READ and reader is not None:
+ if reader._cancelled:
+ self._remove_reader(fileobj)
+ elif (start_time := self._time_fd_registered[fileobj]) is not None:
+ self._time_fd_registered[fileobj] = None
+ dt = end_time - start_time
+ when = dt * (1 / self._time_dilation - 1) + super().time()
+ self.call_at(when, reader._callback, *reader._args, context=reader._context)
+ if mask & selectors.EVENT_WRITE and writer is not None:
+ if writer._cancelled:
+ self._remove_writer(fileobj)
+ elif (start_time := self._time_fd_registered[fileobj]) is not None:
+ self._time_fd_registered[fileobj] = None
+ dt = end_time - start_time
+ when = dt * (1 / self._time_dilation - 1) + super().time()
+ self.call_at(when, writer._callback, *writer._args, context=reader._context)
+
+ def _add_reader(self, fd, callback, *args):
+ self._time_fd_registered[fd] = super().time()
+ return super()._add_reader(fd, callback, *args)
+
+ def _remove_reader(self, fd):
+ self._time_fd_registered[fd] = None
+ return super()._remove_reader(fd)
+
+ def _add_writer(self, fd, callback, *args):
+ self._time_fd_registered[fd] = super().time()
+ return super()._add_writer(fd, callback, *args)
+
+ def _remove_writer(self, fd):
+ self._time_fd_registered[fd] = None
+ return super()._remove_writer(fd)
+
def ping_enter_coro(self):
self._time_entered_coro = super().time()