Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--  nemesis/causal_event_loop.py  159
1 file changed, 159 insertions, 0 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
new file mode 100644
index 0000000..4320655
--- /dev/null
+++ b/nemesis/causal_event_loop.py
@@ -0,0 +1,159 @@
+import asyncio
+import collections
+import heapq
+import time
+from asyncio.log import logger
+from asyncio.base_events import _format_handle
+
+_MIN_SCHEDULED_TIMER_HANDLES = 100
+_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
+MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+
+class CausalEventLoop(asyncio.SelectorEventLoop):
+    '''
+    An event loop whose internal ("loop") time may run faster or slower than
+    objective (wall-clock) time.  Control the dilation either by assigning to
+    the ``_time_dilation`` attribute or through ``_set_experiment()``
+    (dilation == X means that loop time runs X times faster than objective time).
+    '''
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._last_objective_time = None
+        self._last_subjective_time = None
+        self._processing = collections.deque()
+        self._time_dilation = 1.0
+        self._task_to_optimize = None
+
+        # Wrap the selector so that timeouts expressed in (dilated) loop time
+        # are converted back to objective time before blocking.  The timeout
+        # may be None when there is nothing ready and nothing scheduled.
+        _select = self._selector.select
+        def select(timeout=None):
+            if timeout is not None:
+                timeout = timeout / self._time_dilation
+            return _select(timeout)
+        self._selector.select = select
+
+    def _set_experiment(self, task, dilation):
+        # ``task`` is the *name* of the task under experiment (it is compared
+        # against Task.get_name() in _matches_task); ``dilation`` is the new
+        # time dilation factor.
+        self._task_to_optimize = task
+        self._time_dilation = dilation
+
+    def _matches_task(self, handle):
+        # A Task schedules its steps as bound methods of the Task object, so a
+        # handle belongs to a task iff its callback is bound to a Task instance.
+        cb = handle._callback
+        if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
+            task = cb.__self__
+            if task.get_name() == self._task_to_optimize:
+                return True
+        return False
+
+    def _update_ready(self, sampling=False):
+        """
+        Polls the IO selector, schedules the resulting callbacks, and moves
+        'call_later' callbacks that have come due onto the ready queue.
+
+        This logic was separated out of `_run_once` so that the list of ready
+        handles may be updated more frequently than once per iteration.
+
+        If `sampling` is true, the timeout passed to the selector is always 0.
+        """
+        sched_count = len(self._scheduled)
+        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
+                self._timer_cancelled_count / sched_count >
+                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
+            # Remove delayed calls that were cancelled if their number
+            # is too high
+            new_scheduled = []
+            for handle in self._scheduled:
+                if handle._cancelled:
+                    handle._scheduled = False
+                else:
+                    new_scheduled.append(handle)
+
+            heapq.heapify(new_scheduled)
+            self._scheduled = new_scheduled
+            self._timer_cancelled_count = 0
+        else:
+            # Remove delayed calls that were cancelled from head of queue.
+            while self._scheduled and self._scheduled[0]._cancelled:
+                self._timer_cancelled_count -= 1
+                handle = heapq.heappop(self._scheduled)
+                handle._scheduled = False
+
+        timeout = None
+        if sampling or self._ready or self._stopping:
+            timeout = 0
+        elif self._scheduled:
+            # Compute the desired timeout.
+            timeout = self._scheduled[0]._when - self.time()
+            if timeout > MAXIMUM_SELECT_TIMEOUT:
+                timeout = MAXIMUM_SELECT_TIMEOUT
+            elif timeout < 0:
+                timeout = 0
+
+        event_list = self._selector.select(timeout)
+        self._process_events(event_list)
+        # Needed to break cycles when an exception occurs.
+        event_list = None
+
+        # Handle 'later' callbacks that are ready.
+        end_time = self.time() + self._clock_resolution
+        while self._scheduled:
+            handle = self._scheduled[0]
+            if handle._when >= end_time:
+                break
+            handle = heapq.heappop(self._scheduled)
+            handle._scheduled = False
+            self._ready.append(handle)
+
+    def _run_once(self):
+        """Run one full iteration of the event loop.
+
+        This calls all currently ready callbacks.
+        """
+        self._update_ready()
+        self._processing.extend(self._ready)
+        # This is the only place where callbacks are actually *called*.
+        # All other places just add them to ready.
+        # Note: We run all currently scheduled callbacks, but not any
+        # callbacks scheduled by callbacks run this time around --
+        # they will be run the next time (after another I/O poll).
+        # Use an idiom that is thread-safe without using locks.
+        ntodo = len(self._processing)
+        for i in range(ntodo):
+            handle = self._processing.popleft()
+            try:
+                self._ready.remove(handle)
+            except ValueError:
+                pass
+            if handle._cancelled:
+                continue
+            try:
+                self._current_handle = handle
+                t0 = super().time()
+                handle._run()
+                dt = super().time() - t0
+                if self._debug and dt >= self.slow_callback_duration:
+                    logger.warning('Executing %s took %.3f seconds',
+                                   _format_handle(handle), dt * 1/self._time_dilation)
+                if not self._matches_task(handle):
+                    # Insert a delay: callbacks that do not belong to the task
+                    # under experiment are padded so that, in objective time,
+                    # they take 1/dilation times their measured duration.
+                    t0 = super().time()
+                    while super().time() - t0 < dt * (1/self._time_dilation - 1):
+                        time.sleep(0.001)
+            finally:
+                self._current_handle = None
+                handle = None  # Needed to break cycles when an exception occurs.
+
+    def time(self):
+        obj = super().time()
+        if self._last_objective_time is None:
+            self._last_objective_time = self._last_subjective_time = obj
+            return obj
+        else:
+            sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation
+            self._last_subjective_time = sub
+            self._last_objective_time = obj
+            return sub
+
+
+class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
+    def new_event_loop(self):
+        return CausalEventLoop()
+
+
+asyncio.set_event_loop_policy(CausalEventLoopPolicy())
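
A minimal usage sketch of the loop above, assuming a hypothetical coroutine
``worker`` and an illustrative dilation factor of 0.5; only the import side
effect (installing ``CausalEventLoopPolicy``) and ``_set_experiment()`` come
from the code in the diff, everything else is an assumption:

    import asyncio
    import nemesis.causal_event_loop  # installs CausalEventLoopPolicy on import

    async def worker():
        # Hypothetical workload whose causal impact we want to probe.
        await asyncio.sleep(0.1)

    async def main(loop):
        task = loop.create_task(worker(), name="worker")
        # Dilation 0.5: loop time runs half as fast as objective time, so every
        # callback that is not part of the task named "worker" gets padded with
        # real delays in _run_once, while "worker" itself is left untouched.
        loop._set_experiment("worker", 0.5)
        await task

    loop = asyncio.new_event_loop()   # returns a CausalEventLoop via the policy
    loop.run_until_complete(main(loop))
    loop.close()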