1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
import asyncio
import collections
import heapq
import time
from asyncio.log import logger
from asyncio.base_events import _format_handle
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
class CausalEventLoop(asyncio.SelectorEventLoop):
'''
Internal time in this event loop may run faster or slower than objective time.
Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute
(dilation == X means that loop time is running X times faster than objective time).
'''
def __init__(self) -> None:
super().__init__()
self._last_objective_time = None
self._last_subjective_time = None
self._processing = collections.deque()
self._time_dilation = 1.0
self._task_to_optimize = None
_select = self._selector.select
def select(timeout: float):
return _select(timeout / self._time_dilation)
self._selector.select = select
def _set_experiment(self, task, dilation):
self._task_to_optimize = task
self._time_dilation = dilation
def _matches_task(self, handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
task = cb.__self__
if task.get_name() == self._task_to_optimize:
return True
return False
def _update_ready(self, sampling=False):
"""
Polls the IO selector, schedules resulting callbacks, and schedules
'call_later' callbacks.
This logic was separated out of `run_once` so that the list of `ready`
tasks may be updated more frequently than once per iteration.
If SAMPLING is true, the timeout passed to the selector will always be 0.
"""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if sampling or self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
timeout = self._scheduled[0]._when - self.time()
if timeout > MAXIMUM_SELECT_TIMEOUT:
timeout = MAXIMUM_SELECT_TIMEOUT
elif timeout < 0:
timeout = 0
event_list = self._selector.select(timeout)
self._process_events(event_list)
# Needed to break cycles when an exception occurs.
event_list = None
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks.
"""
self._update_ready()
self._processing.extend(self._ready)
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._processing)
for i in range(ntodo):
handle = self._processing.popleft()
try:
self._ready.remove(handle)
except ValueError:
pass
if handle._cancelled:
continue
try:
self._current_handle = handle
t0 = super().time()
handle._run()
dt = super().time() - t0
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt * 1/self._time_dilation)
if not self._matches_task(handle):
t0 = super().time()
# insert a delay
while super().time() - t0 < dt * (1/self._time_dilation - 1):
time.sleep(0.001)
finally:
self._current_handle = None
handle = None # Needed to break cycles when an exception occurs.
def time(self):
obj = super().time()
if self._last_objective_time is None:
self._last_objective_time = self._last_subjective_time = obj
return obj
else:
sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation
self._last_subjective_time = sub
self._last_objective_time = obj
return sub
class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
return CausalEventLoop()
asyncio.set_event_loop_policy(CausalEventLoopPolicy())
|