summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
blob: 4320655e01ba88799a3c2fd6c4cf47bb6dcd6fda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import asyncio
import collections
import heapq
import time
from asyncio.log import logger
from asyncio.base_events import _format_handle

_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600

class CausalEventLoop(asyncio.SelectorEventLoop):
    '''
        Internal time in this event loop may run faster or slower than objective time.
        Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute
        (dilation == X means that loop time is running X times faster than objective time).
        '''

    def __init__(self) -> None:
        super().__init__()
        self._last_objective_time = None
        self._last_subjective_time = None
        self._processing = collections.deque()
        self._time_dilation = 1.0
        self._task_to_optimize = None

        _select = self._selector.select
        def select(timeout: float):
            return _select(timeout / self._time_dilation)
        self._selector.select = select

    def _set_experiment(self, task, dilation):
        self._task_to_optimize = task
        self._time_dilation = dilation

    def _matches_task(self, handle):
        cb = handle._callback
        if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
            task = cb.__self__
            if task.get_name() == self._task_to_optimize:
                return True
        return False

    def _update_ready(self, sampling=False):
        """
        Polls the IO selector, schedules resulting callbacks, and schedules
        'call_later' callbacks.

        This logic was separated out of `run_once` so that the list of `ready`
        tasks may be updated more frequently than once per iteration.

        If SAMPLING is true, the timeout passed to the selector will always be 0.
        """
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if sampling or self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks.
        """
        self._update_ready()
        self._processing.extend(self._ready)
        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._processing)
        for i in range(ntodo):
            handle = self._processing.popleft()
            try:
                self._ready.remove(handle)
            except ValueError:
                pass
            if handle._cancelled:
                continue
            try:
                self._current_handle = handle
                t0 = super().time()
                handle._run()
                dt = super().time() - t0
                if self._debug and dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt * 1/self._time_dilation)
                if not self._matches_task(handle):
                    t0 = super().time()
                    # insert a delay
                    while super().time() - t0 < dt * (1/self._time_dilation - 1):
                        time.sleep(0.001)
            finally:
                self._current_handle = None
            handle = None  # Needed to break cycles when an exception occurs.

    def time(self):
        obj = super().time()
        if self._last_objective_time is None:
            self._last_objective_time = self._last_subjective_time = obj
            return obj
        else:
            sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation
            self._last_subjective_time = sub
            self._last_objective_time = obj
            return sub


class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        return CausalEventLoop()


asyncio.set_event_loop_policy(CausalEventLoopPolicy())