summaryrefslogtreecommitdiff
path: root/nemesis/causal_event_loop.py
diff options
context:
space:
mode:
Diffstat (limited to 'nemesis/causal_event_loop.py')
-rw-r--r--nemesis/causal_event_loop.py110
1 files changed, 83 insertions, 27 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 6d18dbc..b6fdea6 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -24,6 +24,7 @@ from sortedcontainers import SortedList
import heapq
import selectors
import sys
+import os
import time
import traceback
from copy import copy
@@ -31,10 +32,16 @@ from asyncio.log import logger
from asyncio import Task, events
from asyncio.base_events import _format_handle
+from pathlib import Path
+from pprint import pformat
+import logging
+
_MIN_SCHEDULED_TIMER_HANDLES = 100
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+TRACE_PATH = \
+ Path(__file__).resolve().parent.parent / "logs" / "trace.log"
orig_handle = asyncio.events.Handle
@@ -90,9 +97,11 @@ for sc in [orig_handle] + orig_handle.__subclasses__():
class CausalEventLoop(asyncio.SelectorEventLoop):
+ _log_level = logging.DEBUG
+ _logger = None
# a value between 0 and 1. 0 means no optimization,
# 1 means the target coroutine is optimized away entirely
- _speedup = 1.0
+ _speedup = 0.0
# a list of callbacks which have recently completed
_pause_buffer = []
# a list of intervals in which the target coroutine has been active
@@ -107,12 +116,32 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def __init__(self) -> None:
super().__init__()
+ self.set_logger()
+
+ def set_logger(self):
+ if not os.path.exists(TRACE_PATH):
+ with open(TRACE_PATH, 'w') as file:
+ pass
+ self._logger = logging.getLogger(f'LOOP {self._thread_id}')
+ self._logger.setLevel(self._log_level)
+ self._logger.propagate = False
+ file_handler = logging.FileHandler(TRACE_PATH)
+ file_handler.setLevel(self._log_level)
+ formatter = logging.Formatter('%(name)s - %(asctime)s - %(levelname)s --- %(message)s')
+ file_handler.setFormatter(formatter)
+ self._logger.addHandler(file_handler)
+ self._logger.info("═" * 40)
+ self._logger.info("STARTING LOOP")
+ self._logger.info("═" * 40)
def set_speedup(self, speedup):
# print(self._coro_intervals)
self._start_time = self.time()
self._speedup = speedup
+ # self._logger.info(f"STARTING EXPERIMENT WITH {self._speedup}")
+ # self._logger.info("═" * 30)
+
# reset experiment counters
self._coro_intervals.clear()
self._completed_coros.clear()
@@ -132,20 +161,23 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return (curr_time - self._start_time) - pause_time
def ping_enter_coro(self):
+ # self._logger.debug(f"Recording coro ENTER.")
self._time_entered_coro = self.time()
def ping_exit_coro(self):
try:
assert isinstance(self._time_entered_coro, float), "Tried to exit coro before recorded entry!"
except AssertionError as e:
- print(f"Assertion failed: {e}")
+ self._logger.critical(f"Tried to exit coro before recorded entry: {e}. Aborting.")
sys.exit(1)
+ # self._logger.debug(f"Recording coro EXIT.")
self._coro_intervals.add((self._time_entered_coro, self.time()))
self._time_entered_coro = None
def collect_ready_events(self, timeout=0):
event_list = self._selector.select(timeout)
self._ready_events.append((event_list, self.time()))
+ # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(event_list, indent=2)}")
def update_ready(self):
'''
@@ -154,6 +186,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
'''
curr_time = self.time()
sched_count = len(self._scheduled)
+ # two methods to cleanup cancelled callbacks;
+ # avoid the expensive one whenever possible
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
@@ -163,6 +197,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
+ self._logger.debug(f"\tSlow cleanup killed {_format_handle(handle)}")
else:
new_scheduled.append(handle)
@@ -174,6 +209,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
+ self._logger.debug(f"\tLazy cleanup killed {_format_handle(handle)}")
handle._scheduled = False
timeout = None
@@ -195,10 +231,14 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
timeout = max(0, min(timeout, MAXIMUM_SELECT_TIMEOUT))
timeout = 0
+ self._logger.debug(f"HANDLING I/O.")
+ self._logger.info("-" * 20)
self.collect_ready_events(timeout)
for event_list in self._ready_events:
self._process_events(*event_list)
+ self._logger.debug(f"HANDLING SCHEDULED.")
+ self._logger.info("-" * 20)
# Handle 'later' callbacks that are ready.
curr_time = self.time()
end_time = curr_time + self._clock_resolution
@@ -210,11 +250,17 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
- time_to_buffer = when + self._get_pause_for_io(handle, curr_time)
+ delay = self._get_pause_for_io(handle, curr_time)
+ time_to_buffer = when + delay
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
+ # self._ready.append(handle)
heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
+
+ self._logger.debug(f"HANDLING PAUSED.")
+ self._logger.info("-" * 20)
# handle callbacks which can leave pause timeout
while self._pause_buffer:
# required when handle's _when is modified in place
@@ -230,13 +276,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# if we paused during buffering, we need to delay again
# TODO clean this up
# this whole file has 'rounding' timing errors :(
- handle._when = handle._when + \
- self._get_pause_for_pause_time(handle, curr_time)
+ delay = self._get_pause_for_pause_time(handle, curr_time)
+ handle._when = handle._when + delay
if handle._when >= end_time:
handle.time_entered_pause_buffer = curr_time
heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tpause_buffer -> pause_buffer for {_format_handle(handle)} (delay={delay})")
else:
self._ready.append(handle)
+ self._logger.debug(f"\tpause_buffer -> ready for {_format_handle(handle)}")
def _run_once(self):
"""
@@ -244,6 +292,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
This calls all currently ready callbacks.
"""
+ self._logger.debug("STARTING ITERATION.")
+ self._logger.info("-" * 40)
self.update_ready()
# This is the only place where callbacks are actually *called*.
@@ -252,7 +302,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
+ self._logger.info("-" * 40)
ntodo = len(self._ready)
+ self._logger.debug(f"RUNNING {ntodo} CALLBACKS.")
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
@@ -269,21 +321,20 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug and dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
-
- # do not record coroutines which left I/O during the previous experiment
- # the time held in the pause buffer would have also been incorrect for
- # this experiment, but there is nothing we can do about it.
- if handle._when > self._start_time:
- time_interval = (handle._when, process_start_time)
- pause_time = self._get_pause_time(time_interval)
- adjusted_start_time = process_start_time - pause_time
- wait_time = adjusted_start_time - handle._when
- try:
- assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
- except AssertionError as e:
- print(f"Assertion failed: {e}")
- sys.exit(1)
- self._completed_coros.append((_format_handle(handle), wait_time))
+ self._logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt)
+
+ time_interval = (handle._when, process_start_time)
+ pause_time = self._get_pause_time(time_interval)
+ adjusted_start_time = process_start_time - pause_time
+ wait_time = adjusted_start_time - handle._when
+ try:
+ assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
+ except AssertionError as e:
+ self._logger.critical(f'Negative latency on callback {_format_handle(handle)} ({dt}). Aborting.')
+ sys.exit(1)
+ self._completed_coros.append((_format_handle(handle), wait_time))
+ self._logger.debug(f'\tCompleted {_format_handle(handle)} with latency {dt}')
except Exception:
traceback.print_exc()
@@ -321,6 +372,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if not handle._when:
handle._when = curr_time
self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -329,7 +381,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = events._ThreadSafeHandle(callback, args, self, context)
- self._add_callback(handle)
+ if not handle._when:
+ handle._when = self.time()
+ self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for {_format_handle(handle)}")
if handle._source_traceback:
del handle._source_traceback[-1]
if handle._source_traceback:
@@ -350,10 +405,13 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle._when = curr_time
# print(f'{_format_handle(handle)}: {curr_time}')
self._ready.append(handle)
+ self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
# print(f'DELAYED {_format_handle(handle)}: {curr_time}')
handle.time_entered_pause_buffer = curr_time
+ # self._ready.append(handle)
heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={curr_time - handle._when})")
def _get_pause_for_io(self, handle, io_time):
@@ -409,9 +467,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
return False
-class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
- def new_event_loop(self):
- return CausalEventLoop()
-
-
-asyncio.set_event_loop_policy(CausalEventLoopPolicy())
+# to profile your program, start the event loop with
+# asyncio.run(coro, loop_factory=causal_loop_factory)
+def causal_loop_factory():
+ return CausalEventLoop()