"""Selector-based asyncio event loop whose clock can be dilated.

Importing this module installs ``CausalEventLoopPolicy`` as the process-wide
asyncio event loop policy (see the last statement of the module).
"""

import asyncio
import collections
import heapq
import time
from asyncio.base_events import _format_handle
from asyncio.log import logger

# The cancelled-timer purge in `_update_ready` only rebuilds the heap when
# more than this many timers are scheduled ...
_MIN_SCHEDULED_TIMER_HANDLES = 100
# ... and more than this fraction of them has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
# Upper bound (in loop-time seconds) on a single selector wait.
MAXIMUM_SELECT_TIMEOUT = 24 * 3600


class CausalEventLoop(asyncio.SelectorEventLoop):
    '''
    Internal time in this event loop may run faster or slower than objective
    time. Control loop time dilation vs. objective time by setting the
    ``time_dilation`` attribute (dilation == X means that loop time is running
    X times faster than objective time).
    '''

    def __init__(self) -> None:
        super().__init__()
        # Last wall-clock reading and the loop-time value that corresponded
        # to it; both stay None until the first call to `time()`.
        self._last_objective_time = None
        self._last_subjective_time = None
        # Handles picked up for execution by the current `_run_once` pass.
        self._processing = collections.deque()
        self._time_dilation = 1.0
        # Name of the task exempt from the artificial delay in `_run_once`.
        self._task_to_optimize = None

        _select = self._selector.select

        def select(timeout=None):
            # Timeouts are computed in loop time; convert them to objective
            # time before handing them to the real selector. ``None`` means
            # "block until an event arrives" and must be passed through
            # unscaled (dividing it would raise TypeError).
            if timeout is not None:
                timeout = timeout / self._time_dilation
            return _select(timeout)

        self._selector.select = select

    @property
    def time_dilation(self) -> float:
        """Factor by which loop time runs faster than objective time."""
        return self._time_dilation

    @time_dilation.setter
    def time_dilation(self, dilation: float) -> None:
        # The dilation is used as a divisor (selector timeout, callback
        # delay), so zero, negative and NaN values are rejected up front.
        if not dilation > 0:
            raise ValueError(
                'time_dilation must be a positive number, got %r' % (dilation,))
        self._time_dilation = dilation

    def _set_experiment(self, task, dilation):
        """Select the task name to exempt from delays and set the dilation.

        ``task`` is compared against ``Task.get_name()`` in `_matches_task`;
        ``dilation`` is stored as the new time dilation without validation.
        """
        self._task_to_optimize = task
        self._time_dilation = dilation

    def _matches_task(self, handle):
        """Return True if ``handle`` runs a callback bound to the chosen task.

        A handle matches when its callback's ``__self__`` is an asyncio Task
        whose name equals the one given to `_set_experiment`.
        """
        owner = getattr(handle._callback, '__self__', None)
        return (isinstance(owner, asyncio.tasks.Task)
                and owner.get_name() == self._task_to_optimize)

    def _update_ready(self, sampling=False):
        """
        Polls the IO selector, schedules resulting callbacks, and schedules
        'call_later' callbacks.

        This logic was separated out of `run_once` so that the list of `ready`
        tasks may be updated more frequently than once per iteration.

        If SAMPLING is true, the timeout passed to the selector will always
        be 0.
        """
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # None makes the selector block until an event arrives.
        timeout = None
        if sampling or self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout (in loop time; the wrapped
            # selector converts it to objective time).
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks. After each callback that
        does not belong to the task chosen via `_set_experiment`, the loop
        sleeps until ``dt * (1 / time_dilation - 1)`` seconds of objective
        time have passed, where ``dt`` is the callback's measured duration
        (no sleep happens when that amount is not positive).
        """
        self._update_ready()
        self._processing.extend(self._ready)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._processing)
        for _ in range(ntodo):
            handle = self._processing.popleft()
            try:
                self._ready.remove(handle)
            except ValueError:
                # Best effort: the handle is no longer in the ready queue.
                pass
            if handle._cancelled:
                continue
            try:
                self._current_handle = handle
                # Callback durations are measured in objective time.
                t0 = super().time()
                handle._run()
                dt = super().time() - t0
                if self._debug and dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle),
                                   dt * 1/self._time_dilation)
                if not self._matches_task(handle):
                    # insert a delay
                    delay = dt * (1/self._time_dilation - 1)
                    t0 = super().time()
                    while super().time() - t0 < delay:
                        time.sleep(0.001)
            finally:
                self._current_handle = None
        handle = None  # Needed to break cycles when an exception occurs.

    def time(self):
        """Return the current loop (subjective) time.

        The first call returns the objective clock reading. Every later call
        advances the previous result by the objective time elapsed since the
        previous call multiplied by the current time dilation.
        """
        obj = super().time()
        if self._last_objective_time is None:
            self._last_objective_time = self._last_subjective_time = obj
            return obj
        sub = (self._last_subjective_time +
               (obj - self._last_objective_time) * self._time_dilation)
        self._last_subjective_time = sub
        self._last_objective_time = obj
        return sub


class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop policy whose new loops are `CausalEventLoop` instances."""

    def new_event_loop(self):
        return CausalEventLoop()


# Module-level side effect: loops created through the asyncio policy from
# now on are CausalEventLoop instances.
asyncio.set_event_loop_policy(CausalEventLoopPolicy())