summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nemesis/causal_event_loop.py37
-rw-r--r--nemesis/html_gen.py13
-rwxr-xr-xnemesis/nemesis.py19
3 files changed, 39 insertions, 30 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index b6fdea6..12a4084 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -176,8 +176,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
def collect_ready_events(self, timeout=0):
event_list = self._selector.select(timeout)
- self._ready_events.append((event_list, self.time()))
- # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(event_list, indent=2)}")
+ if event_list:
+ self._ready_events.append((event_list, self.time()))
def update_ready(self):
'''
@@ -234,7 +234,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._logger.debug(f"HANDLING I/O.")
self._logger.info("-" * 20)
self.collect_ready_events(timeout)
- for event_list in self._ready_events:
+ self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}")
+
+ while len(self._ready_events):
+ event_list = self._ready_events.pop(0)
self._process_events(*event_list)
self._logger.debug(f"HANDLING SCHEDULED.")
@@ -255,7 +258,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
handle._when = time_to_buffer
handle.time_entered_pause_buffer = curr_time
# self._ready.append(handle)
- heapq.heappush(self._pause_buffer, handle)
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})")
@@ -269,6 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
for other in self._pause_buffer[1:]:
assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
+ self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.")
break
# pop the first item in the list
handle = heapq.heappop(self._pause_buffer)
@@ -348,6 +354,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
self._remove_reader(fileobj)
+ self._logger.info(f"\treader {reader} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(reader, time)
@@ -356,6 +363,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
self._remove_writer(fileobj)
+ self._logger.info(f"\twriter {writer} was cancelled.")
else:
time_to_buffer = time + \
self._get_pause_for_io(writer, time)
@@ -400,19 +408,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
# required in cases where the event loop reuses the same handle
if handle._when < curr_time:
handle._when = curr_time
- if not handle._cancelled and handle not in self._pause_buffer:
+ if not handle._cancelled:
if self._is_vital(handle):
handle._when = curr_time
- # print(f'{_format_handle(handle)}: {curr_time}')
self._ready.append(handle)
self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}")
else:
- # print(f'DELAYED {_format_handle(handle)}: {curr_time}')
handle.time_entered_pause_buffer = curr_time
- # self._ready.append(handle)
- heapq.heappush(self._pause_buffer, handle)
- self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={curr_time - handle._when})")
-
+ # do not allow duplicates (FIX?)
+ if handle not in self._pause_buffer:
+ heapq.heappush(self._pause_buffer, handle)
+ self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})")
+ else:
+ self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}")
def _get_pause_for_io(self, handle, io_time):
time_interval = (handle.register_time, io_time)
@@ -447,16 +455,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
time += self._get_overlap(start, end, self._time_entered_coro,
curr_time)
- return time * self._speedup
+ pause_time = time * self._speedup
+ assert pause_time >= 0, f"Delay was found to be less than 0: {pause_time}"
+ return pause_time
def _get_overlap(self, a_start, a_end, b_start, b_end):
overlap_start = max(a_start, b_start)
overlap_end = min(a_end, b_end)
+ assert overlap_end >= overlap_start, f"Bad overlaps: {a_start} {a_end} : {b_start} {b_end} ({overlap_end - overlap_start})"
return overlap_end - overlap_start
def _is_vital(self, handle):
""" Methods which cannot afford to be paused."""
- blacklist = ['_read_from_self', '_read_ready']
+ blacklist = ['_read_from_self', '_read_ready', '_accept_connection']
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist:
diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py
index 89c62d4..70d476b 100644
--- a/nemesis/html_gen.py
+++ b/nemesis/html_gen.py
@@ -40,8 +40,6 @@ def plot_results(results, output_file, input_file):
y_throughput_list = []
y_max_latency_list = []
y_num_callbacks_list = []
- latency_hover_text = []
- max_latency_hover_text = []
for experiment in experiments:
completed_callbacks = experiment[0]
@@ -73,20 +71,11 @@ def plot_results(results, output_file, input_file):
y_max_latency_list.append(max_cb[1])
breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in trimmed_callbacks])
- latency_hover_text.append(
- f"{coro_name}<br>Speedup: {speedup}<br>Trimmed Average Wait ({int(TRIM_PERCENT*100)}%): {round(latency, 4)}<br>Breakdown:<br>{breakdown}"
- )
- max_latency_hover_text.append(
- f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}"
- )
else:
latency = 0
y_latency_list.append(latency)
- latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks")
- max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks")
-
# handle throughput graph
throughput = num_callbacks / virtual_run_time if virtual_run_time else 0
y_throughput_list.append(throughput)
@@ -97,7 +86,6 @@ def plot_results(results, output_file, input_file):
mode='markers',
name=coro_name,
marker=dict(color=get_color(coro_name)),
- hovertext=latency_hover_text,
showlegend=True,
), row=1, col=col)
@@ -116,7 +104,6 @@ def plot_results(results, output_file, input_file):
mode='markers',
name=coro_name,
marker=dict(color=get_color(coro_name)),
- hovertext=max_latency_hover_text,
showlegend=False,
), row=3, col=col)
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index 6d2941c..03901e3 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -31,6 +31,7 @@ from html_gen import plot_results
import argparse
import asyncio
import os
+import inspect
import random
import signal
import sys
@@ -39,6 +40,8 @@ import time
import traceback
import types
+CO_COROUTINE = inspect.CO_COROUTINE
+
class Experiment:
# event loops participating in this this experiment
@@ -157,7 +160,7 @@ class Nemesis(object):
loops = Nemesis.experiment_data.get_loops()
exp_coro = Nemesis.experiment_coro
for loop in loops:
- coro = Nemesis._get_current_frame(loop)
+ coro = Nemesis._get_current_frame(loop).f_code.co_name
prev_coro = Nemesis.prev_coro[loop]
if not prev_coro == coro:
if prev_coro == exp_coro:
@@ -177,8 +180,8 @@ class Nemesis(object):
loops = Nemesis._get_event_loops()
for loop in loops:
frame = Nemesis._get_current_frame(loop)
- if frame is not None:
- frames.append(frame)
+ if frame is not None and Nemesis._is_child_of_async(frame):
+ frames.append(frame.f_code.co_name)
if frames:
Nemesis._start_experiment(random.choice(frames),
Nemesis._select_speedup())
@@ -218,10 +221,18 @@ class Nemesis(object):
if frame:
fname = frame.f_code.co_filename
if frame:
- return frame.f_code.co_name
+ return frame
return None
@staticmethod
+ def _is_child_of_async(frame):
+ '''Returns TRUE if an async method is a parent of FRAME.'''
+ while frame:
+ if bool(frame.f_code.co_flags & CO_COROUTINE):
+ return True
+ frame = frame.f_back
+
+ @staticmethod
def _get_event_loops():
'''Returns each thread's event loop, if it exists.'''
loops = []