diff options
| author | bd <bdunahu@operationnull.com> | 2025-10-15 00:16:38 -0400 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-10-15 00:16:38 -0400 |
| commit | 1a439653264adc3d9648645d2bb4006f331c1722 (patch) | |
| tree | 27a4d50f3fb1cd333b15ec372500dd78cfb296b9 | |
| parent | 16aa392a3c08c8769cc30bdbc1830a31f9b0808e (diff) | |
Fix a lot of bugs--core callbacks being delayed, reused handles
| -rw-r--r-- | nemesis/causal_event_loop.py | 39 | ||||
| -rw-r--r-- | nemesis/html_gen.py | 218 | ||||
| -rwxr-xr-x | nemesis/nemesis.py | 13 |
3 files changed, 163 insertions, 107 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py index 2fe8435..b73458f 100644 --- a/nemesis/causal_event_loop.py +++ b/nemesis/causal_event_loop.py @@ -23,6 +23,7 @@ import collections from sortedcontainers import SortedList import heapq import selectors +import sys import time import traceback from copy import copy @@ -114,7 +115,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._completed_coros.clear() def get_completed_coros(self): - return self._completed_coros + return copy(self._completed_coros) def get_pause_time(self): if not self._coro_intervals: @@ -210,14 +211,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop): heapq.heappush(self._pause_buffer, handle) # handle callbacks which can leave pause timeout - heapq.heapify(self._pause_buffer) - # print([x._when for x in self._pause_buffer]) while self._pause_buffer: + # required when handle's _when is modified in place + heapq.heapify(self._pause_buffer) handle = self._pause_buffer[0] + for other in self._pause_buffer[1:]: + assert handle._when <= other._when, f"Heap root {handle} is not smallest" if handle._when >= end_time: break # pop the first item in the list - handle = self._pause_buffer.pop(0) + handle = heapq.heappop(self._pause_buffer) # if we paused during buffering, we need to delay again # TODO clean this up @@ -271,8 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop): except AssertionError as e: print(f"Assertion failed: {e}") sys.exit(1) - if not self._is_blacklisted(handle): - self._completed_coros.append((_format_handle(handle), wait_time)) + self._completed_coros.append((_format_handle(handle), wait_time)) except Exception: traceback.print_exc() finally: @@ -300,10 +302,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop): self._add_callback(writer) def _call_soon(self, callback, args, context): + """Do not add 'callsoon' events to the pause buffer. + Add them directly to ready.""" + curr_time = self.time() handle = events.Handle(callback, args, self, context) if handle._source_traceback: del handle._source_traceback[-1] - self._add_callback(handle) + if not handle._when: + handle._when = curr_time + self._ready.append(handle) return handle def call_soon_threadsafe(self, callback, *args, context=None): @@ -325,9 +332,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop): curr_time = self.time() if not handle._when: handle._when = curr_time + # required in cases where the event loop reuses the same handle + if handle._when < curr_time: + handle._when = curr_time if not handle._cancelled and handle not in self._pause_buffer: - handle.time_entered_pause_buffer = curr_time - heapq.heappush(self._pause_buffer, handle) + if self._is_vital(handle): + handle._when = curr_time + # print(f'{_format_handle(handle)}: {curr_time}') + self._ready.append(handle) + else: + # print(f'DELAYED {_format_handle(handle)}: {curr_time}') + handle.time_entered_pause_buffer = curr_time + heapq.heappush(self._pause_buffer, handle) def _get_pause_for_io(self, handle, io_time): @@ -370,8 +386,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop): overlap_end = min(a_end, b_end) return overlap_end - overlap_start - def _is_blacklisted(self, handle): - blacklist = ['_read_from_self'] + def _is_vital(self, handle): + """ Methods which cannot afford to be paused.""" + blacklist = ['_read_from_self', '_read_ready'] cb = handle._callback if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): if cb.__self__.get_coro().__name__ in blacklist: diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py index 60d0c06..bffa748 100644 --- a/nemesis/html_gen.py +++ b/nemesis/html_gen.py @@ -1,6 +1,9 @@ import plotly.graph_objects as go from plotly.subplots import make_subplots import hashlib +import math + +TRIM_PERCENT = 0.05 def get_color(name): hash_object = hashlib.md5(name.encode()) @@ -8,92 +11,135 @@ def get_color(name): return f'hsl({color_index}, 100%, 50%)' def plot_results(results, output_file, input_file): - fig = make_subplots(rows=4, cols=1) - - for i, (coro_name, x_values) in enumerate(results.items(), start=1): - x_list = [] - y_latency_list = [] - y_throughput_list = [] - y_max_latency_list = [] - y_num_callbacks_list = [] - latency_hover_text = [] - max_latency_hover_text = [] - - for speedup, experiments in x_values.items(): - for experiment in experiments: - - completed_callbacks = experiment["latency"] - virtual_run_time = experiment["virtual_run_time"][0] - - x_list.append(speedup * 100) - - num_callbacks = len(completed_callbacks) - y_num_callbacks_list.append(num_callbacks) - - # handle average latency graph - - if num_callbacks > 0: - - breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in completed_callbacks]) - total_wait = sum([cb[1] for cb in completed_callbacks]) - max_cb = max(completed_callbacks, key=lambda cb: cb[1]) - - latency = total_wait / num_callbacks - - y_max_latency_list.append(max_cb[1]) - y_latency_list.append(latency) - - else: - latency = 0 - y_latency_list.append(latency) - - latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Average Wait: {round(latency, 4)}<br>Breakdown:<br>{breakdown}") - max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}") - - # handle throughput graph - throughput = num_callbacks / virtual_run_time - y_throughput_list.append(throughput) - - fig.add_trace(go.Scatter( - x=x_list, - y=y_latency_list, - mode='markers', - name=coro_name, - marker=dict(color=get_color(coro_name)), - hovertext=latency_hover_text, - showlegend=True, - ), row=1, col=1) - - fig.add_trace(go.Scatter( - x=x_list, - y=y_throughput_list, - mode='markers', - name=coro_name, - marker=dict(color=get_color(coro_name)), - showlegend=False, - ), row=2, col=1) - - fig.add_trace(go.Scatter( - x=x_list, - y=y_max_latency_list, - mode='markers', - name=coro_name, - marker=dict(color=get_color(coro_name)), - hovertext=max_latency_hover_text, - showlegend=False, - ), row=3, col=1) - - fig.add_trace(go.Scatter( - x=x_list, - y=y_num_callbacks_list, - mode='markers', - name=coro_name, - marker=dict(color=get_color(coro_name)), - showlegend=False, - ), row=4, col=1) - - fig.update_layout(title=input_file) - fig.update_xaxes(title_text="speedup (% optimized away)", row=4, col=1) + # determine the number of loops we have data for + total_loops = set() + for x_values in results.values(): + for xx_values in x_values.values(): + total_loops.update(xx_values.keys()) + total_loops = sorted(total_loops) + num_loops = len(total_loops) + loop_to_col = {loop: idx + 1 for idx, loop in enumerate(total_loops)} + + fig = make_subplots( + rows=4, + cols=num_loops, + subplot_titles=[f"{loop}" for loop in total_loops] * 4, + vertical_spacing=0.1, + horizontal_spacing=0.05, + shared_xaxes=True, + shared_yaxes=False, + ) + + for coro_name, x_values in results.items(): + for speedup, xx_values in x_values.items(): + for loop, experiments in xx_values.items(): + col = loop_to_col[loop] + + x_list = [] + y_latency_list = [] + y_throughput_list = [] + y_max_latency_list = [] + y_num_callbacks_list = [] + latency_hover_text = [] + max_latency_hover_text = [] + + for experiment in experiments: + completed_callbacks = experiment[0] + virtual_run_time = experiment[1] + + x_val = speedup * 100 + x_list.append(x_val) + + num_callbacks = len(completed_callbacks) + y_num_callbacks_list.append(num_callbacks) + + # handle average latency graph + if num_callbacks > 0: + + trim_count = math.floor(num_callbacks * TRIM_PERCENT / 2) + sorted_callbacks = sorted(completed_callbacks, key=lambda cb: cb[1]) + + trimmed_callbacks = ( + sorted_callbacks[trim_count: len(sorted_callbacks) - trim_count] + if trim_count > 0 else sorted_callbacks + ) + + trimmed_latencies = [cb[1] for cb in trimmed_callbacks] + latency = sum(trimmed_latencies) / len(trimmed_latencies) + + y_latency_list.append(latency) + + max_cb = max(completed_callbacks, key=lambda cb: cb[1]) + y_max_latency_list.append(max_cb[1]) + + breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in trimmed_callbacks]) + latency_hover_text.append( + f"{coro_name}<br>Speedup: {speedup}<br>Trimmed Average Wait ({int(TRIM_PERCENT*100)}%): {round(latency, 4)}<br>Breakdown:<br>{breakdown}" + ) + max_latency_hover_text.append( + f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}" + ) + + else: + latency = 0 + y_latency_list.append(latency) + + latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") + max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks") + + # handle throughput graph + throughput = num_callbacks / virtual_run_time if virtual_run_time else 0 + y_throughput_list.append(throughput) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_latency_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + # hovertext=latency_hover_text, + showlegend=False, + ), row=1, col=col) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_throughput_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + showlegend=False, + ), row=2, col=col) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_max_latency_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + # hovertext=max_latency_hover_text, + showlegend=False, + ), row=3, col=col) + + fig.add_trace(go.Scatter( + x=x_list, + y=y_num_callbacks_list, + mode='markers', + name=coro_name, + marker=dict(color=get_color(coro_name)), + showlegend=False, + ), row=4, col=col) + + fig.update_layout( + height=1080, + width=1920 * num_loops, + title_text=f"Coroutine Performance Metrics: {input_file}", + showlegend=False, + ) + for col in range(1, num_loops + 1): + fig.update_xaxes(title_text="speedup (% optimized away)", row=4, col=col) + fig.update_xaxes(showticklabels=True, col=col) + fig.update_xaxes(showticklabels=True, row=2, col=col) + fig.update_xaxes(showticklabels=True, row=3, col=col) fig.update_yaxes(title_text="average latency (seconds)", row=1, col=1) fig.update_yaxes(title_text="throughput (handles per second)", row=2, col=1) fig.update_yaxes(title_text="maximum latency (seconds)", row=3, col=1) diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py index 2e3837a..e593625 100755 --- a/nemesis/nemesis.py +++ b/nemesis/nemesis.py @@ -70,7 +70,7 @@ class Nemesis(object): experiment_time = None # results from previous experiments. Keys represent names of coroutines. - results = defaultdict(lambda: defaultdict(lambda: [])) + results = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: []))) # the file to write results to filename = None @@ -141,18 +141,11 @@ class Nemesis(object): latency = [] virtual_run_time = [] for loop in loops: - latency.extend(loop.get_completed_coros()) - pause_time = loop.get_pause_time() - virtual_run_time.append(Nemesis.experiment_time - pause_time) - - results = { - "latency": latency, - "virtual_run_time": virtual_run_time, - } + results = Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp][loop._thread_id] + results.append((loop.get_completed_coros(), loop.get_pause_time())) print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed') - Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results) del Nemesis.experiment_data @staticmethod |
