diff options
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r-- | nemesis/causal_event_loop.py | 55 |
1 files changed, 48 insertions, 7 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 35aca9d..2044850 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -37,8 +37,10 @@ Code: import asyncio import collections import heapq +import selectors import time from asyncio.log import logger +from asyncio import Task, events from asyncio.base_events import _format_handle _MIN_SCHEDULED_TIMER_HANDLES = 100 @@ -47,15 +49,17 @@ MAXIMUM_SELECT_TIMEOUT = 24 * 3600 class CausalEventLoop(asyncio.SelectorEventLoop): + _time_fd_registered = dict() + _time_entered_coro = None + _accumulated_time = 0 + + _time_dilation = 1.0 + _processing = collections.deque() + _last_objective_time = None + _last_subjective_time = None + def __init__(self) -> None: super().__init__() - self._last_objective_time = None - self._last_subjective_time = None - self._processing = collections.deque() - self._time_dilation = 1.0 - - self._time_entered_coro = None - self._accumulated_time = 0 _select = self._selector.select def select(timeout: float): @@ -179,6 +183,43 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._current_handle = None handle = None # Needed to break cycles when an exception occurs. + def _process_events(self, event_list): + end_time = super().time() + for key, mask in event_list: + fileobj, (reader, writer) = key.fileobj, key.data + if mask & selectors.EVENT_READ and reader is not None: + if reader._cancelled: + self._remove_reader(fileobj) + elif (start_time := self._time_fd_registered[fileobj]) is not None: + self._time_fd_registered[fileobj] = None + dt = end_time - start_time + when = dt * (1 / self._time_dilation - 1) + super().time() + self.call_at(when, reader._callback, *reader._args, context=reader._context) + if mask & selectors.EVENT_WRITE and writer is not None: + if writer._cancelled: + self._remove_writer(fileobj) + elif (start_time := self._time_fd_registered[fileobj]) is not None: + self._time_fd_registered[fileobj] = None + dt = end_time - start_time + when = dt * (1 / self._time_dilation - 1) + super().time() + self.call_at(when, writer._callback, *writer._args, context=reader._context) + + def _add_reader(self, fd, callback, *args): + self._time_fd_registered[fd] = super().time() + return super()._add_reader(fd, callback, *args) + + def _remove_reader(self, fd): + self._time_fd_registered[fd] = None + return super()._remove_reader(fd) + + def _add_writer(self, fd, callback, *args): + self._time_fd_registered[fd] = super().time() + return super()._add_writer(fd, callback, *args) + + def _remove_writer(self, fd): + self._time_fd_registered[fd] = None + return super()._remove_writer(fd) + def ping_enter_coro(self): self._time_entered_coro = super().time() |