diff options
| -rw-r--r-- | nemesis/causal_event_loop.py | 37 | ||||
| -rw-r--r-- | nemesis/html_gen.py | 13 | ||||
| -rwxr-xr-x | nemesis/nemesis.py | 19 |
3 files changed, 39 insertions, 30 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index b6fdea6..12a4084 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -176,8 +176,8 @@ class CausalEventLoop(asyncio.SelectorEventLoop): def collect_ready_events(self, timeout=0): event_list = self._selector.select(timeout) - self._ready_events.append((event_list, self.time())) - # self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(event_list, indent=2)}") + if event_list: + self._ready_events.append((event_list, self.time())) def update_ready(self): ''' @@ -234,7 +234,10 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._logger.debug(f"HANDLING I/O.") self._logger.info("-" * 20) self.collect_ready_events(timeout) - for event_list in self._ready_events: + self._logger.debug(f"\tPolled events for {timeout} (waiting={len(self._ready_events)})\n{pformat(self._ready_events, indent=2)}") + + while len(self._ready_events): + event_list = self._ready_events.pop(0) self._process_events(*event_list) self._logger.debug(f"HANDLING SCHEDULED.") @@ -255,7 +258,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): handle._when = time_to_buffer handle.time_entered_pause_buffer = curr_time # self._ready.append(handle) - heapq.heappush(self._pause_buffer, handle) + # do not allow duplicates (FIX?) + if handle not in self._pause_buffer: + heapq.heappush(self._pause_buffer, handle) self._logger.debug(f"\tscheduled -> pause_buffer for {_format_handle(handle)} (delay={delay})") @@ -269,6 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): for other in self._pause_buffer[1:]: assert handle._when <= other._when, f"Heap root {handle} is not smallest" if handle._when >= end_time: + self._logger.debug(f"\t{_format_handle(handle)} is not ready to leave. Moving on.") break # pop the first item in the list handle = heapq.heappop(self._pause_buffer) @@ -348,6 +354,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: self._remove_reader(fileobj) + self._logger.info(f"\treader {reader} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(reader, time) @@ -356,6 +363,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: self._remove_writer(fileobj) + self._logger.info(f"\twriter {writer} was cancelled.") else: time_to_buffer = time + \ self._get_pause_for_io(writer, time) @@ -400,19 +408,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop): # required in cases where the event loop reuses the same handle if handle._when < curr_time: handle._when = curr_time - if not handle._cancelled and handle not in self._pause_buffer: + if not handle._cancelled: if self._is_vital(handle): handle._when = curr_time - # print(f'{_format_handle(handle)}: {curr_time}') self._ready.append(handle) self._logger.debug(f"\tio -> ready for VITAL {_format_handle(handle)}") else: - # print(f'DELAYED {_format_handle(handle)}: {curr_time}') handle.time_entered_pause_buffer = curr_time - # self._ready.append(handle) - heapq.heappush(self._pause_buffer, handle) - self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={curr_time - handle._when})") - + # do not allow duplicates (FIX?) + if handle not in self._pause_buffer: + heapq.heappush(self._pause_buffer, handle) + self._logger.debug(f"\tio -> pause_buffer for {_format_handle(handle)} (delay={handle._when - curr_time})") + else: + self._logger.warning(f"\t_add_callback called on cancelled handle {_format_handle(handle)}") def _get_pause_for_io(self, handle, io_time): time_interval = (handle.register_time, io_time) @@ -447,16 +455,19 @@ class CausalEventLoop(asyncio.SelectorEventLoop): time += self._get_overlap(start, end, self._time_entered_coro, curr_time) - return time * self._speedup + pause_time = time * self._speedup + assert pause_time >= 0, f"Delay was found to be less than 0: {pause_time}" + return pause_time def _get_overlap(self, a_start, a_end, b_start, b_end): overlap_start = max(a_start, b_start) overlap_end = min(a_end, b_end) + assert overlap_end >= overlap_start, f"Bad overlaps: {a_start} {a_end} : {b_start} {b_end} ({overlap_end - overlap_start})" return overlap_end - overlap_start def _is_vital(self, handle): """ Methods which cannot afford to be paused.""" - blacklist = ['_read_from_self', '_read_ready'] + blacklist = ['_read_from_self', '_read_ready', '_accept_connection'] cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): if cb.__self__.get_coro().__name__ in blacklist: diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py index 89c62d4..70d476b 100644 --- a/nemesis/html_gen.py +++ b/nemesis/html_gen.py @@ -40,8 +40,6 @@ def plot_results(results, output_file, input_file): y_throughput_list = [] y_max_latency_list = [] y_num_callbacks_list = [] - latency_hover_text = [] - max_latency_hover_text = [] for experiment in experiments: completed_callbacks = experiment[0] @@ -73,20 +71,11 @@ def plot_results(results, output_file, input_file): y_max_latency_list.append(max_cb[1]) breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in trimmed_callbacks]) - latency_hover_text.append( - f"{coro_name}<br>Speedup: {speedup}<br>Trimmed Average Wait ({int(TRIM_PERCENT*100)}%): {round(latency, 4)}<br>Breakdown:<br>{breakdown}" - ) - max_latency_hover_text.append( - f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}" - ) else: latency = 0 y_latency_list.append(latency) - latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") - max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") - # handle throughput graph throughput = num_callbacks / virtual_run_time if virtual_run_time else 0 y_throughput_list.append(throughput) @@ -97,7 +86,6 @@ def plot_results(results, output_file, input_file): mode='markers', name=coro_name, marker=dict(color=get_color(coro_name)), - hovertext=latency_hover_text, showlegend=True, ), row=1, col=col) @@ -116,7 +104,6 @@ def plot_results(results, output_file, input_file): mode='markers', name=coro_name, marker=dict(color=get_color(coro_name)), - hovertext=max_latency_hover_text, showlegend=False, ), row=3, col=col) diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index 6d2941c..03901e3 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -31,6 +31,7 @@ from html_gen import plot_results import argparse import asyncio import os +import inspect import random import signal import sys @@ -39,6 +40,8 @@ import time import traceback import types +CO_COROUTINE = inspect.CO_COROUTINE + class Experiment: # event loops participating in this this experiment @@ -157,7 +160,7 @@ class Nemesis(object): loops = Nemesis.experiment_data.get_loops() exp_coro = Nemesis.experiment_coro for loop in loops: - coro = Nemesis._get_current_frame(loop) + coro = Nemesis._get_current_frame(loop).f_code.co_name prev_coro = Nemesis.prev_coro[loop] if not prev_coro == coro: if prev_coro == exp_coro: @@ -177,8 +180,8 @@ class Nemesis(object): loops = Nemesis._get_event_loops() for loop in loops: frame = Nemesis._get_current_frame(loop) - if frame is not None: - frames.append(frame) + if frame is not None and Nemesis._is_child_of_async(frame): + frames.append(frame.f_code.co_name) if frames: Nemesis._start_experiment(random.choice(frames), Nemesis._select_speedup()) @@ -218,10 +221,18 @@ class Nemesis(object): if frame: fname = frame.f_code.co_filename if frame: - return frame.f_code.co_name + return frame return None @staticmethod + def _is_child_of_async(frame): + '''Returns TRUE if an async method is a parent of FRAME.''' + while frame: + if bool(frame.f_code.co_flags & CO_COROUTINE): + return True + frame = frame.f_back + + @staticmethod def _get_event_loops(): '''Returns each thread's event loop, if it exists.''' loops = [] |
