summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-10-15 00:16:38 -0400
committerbd <bdunahu@operationnull.com>2025-10-15 00:16:38 -0400
commit1a439653264adc3d9648645d2bb4006f331c1722 (patch)
tree27a4d50f3fb1cd333b15ec372500dd78cfb296b9
parent16aa392a3c08c8769cc30bdbc1830a31f9b0808e (diff)
Fix a lot of bugs--core callbacks being delayed, reused handles
-rw-r--r--nemesis/causal_event_loop.py39
-rw-r--r--nemesis/html_gen.py218
-rwxr-xr-xnemesis/nemesis.py13
3 files changed, 163 insertions, 107 deletions
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
index 2fe8435..b73458f 100644
--- a/nemesis/causal_event_loop.py
+++ b/nemesis/causal_event_loop.py
@@ -23,6 +23,7 @@ import collections
from sortedcontainers import SortedList
import heapq
import selectors
+import sys
import time
import traceback
from copy import copy
@@ -114,7 +115,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._completed_coros.clear()
def get_completed_coros(self):
- return self._completed_coros
+ return copy(self._completed_coros)
def get_pause_time(self):
if not self._coro_intervals:
@@ -210,14 +211,16 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
heapq.heappush(self._pause_buffer, handle)
# handle callbacks which can leave pause timeout
- heapq.heapify(self._pause_buffer)
- # print([x._when for x in self._pause_buffer])
while self._pause_buffer:
+ # required when handle's _when is modified in place
+ heapq.heapify(self._pause_buffer)
handle = self._pause_buffer[0]
+ for other in self._pause_buffer[1:]:
+ assert handle._when <= other._when, f"Heap root {handle} is not smallest"
if handle._when >= end_time:
break
# pop the first item in the list
- handle = self._pause_buffer.pop(0)
+ handle = heapq.heappop(self._pause_buffer)
# if we paused during buffering, we need to delay again
# TODO clean this up
@@ -271,8 +274,7 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
except AssertionError as e:
print(f"Assertion failed: {e}")
sys.exit(1)
- if not self._is_blacklisted(handle):
- self._completed_coros.append((_format_handle(handle), wait_time))
+ self._completed_coros.append((_format_handle(handle), wait_time))
except Exception:
traceback.print_exc()
finally:
@@ -300,10 +302,15 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
self._add_callback(writer)
def _call_soon(self, callback, args, context):
+ """Do not add 'callsoon' events to the pause buffer.
+ Add them directly to ready."""
+ curr_time = self.time()
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
- self._add_callback(handle)
+ if not handle._when:
+ handle._when = curr_time
+ self._ready.append(handle)
return handle
def call_soon_threadsafe(self, callback, *args, context=None):
@@ -325,9 +332,18 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
curr_time = self.time()
if not handle._when:
handle._when = curr_time
+ # required in cases where the event loop reuses the same handle
+ if handle._when < curr_time:
+ handle._when = curr_time
if not handle._cancelled and handle not in self._pause_buffer:
- handle.time_entered_pause_buffer = curr_time
- heapq.heappush(self._pause_buffer, handle)
+ if self._is_vital(handle):
+ handle._when = curr_time
+ # print(f'{_format_handle(handle)}: {curr_time}')
+ self._ready.append(handle)
+ else:
+ # print(f'DELAYED {_format_handle(handle)}: {curr_time}')
+ handle.time_entered_pause_buffer = curr_time
+ heapq.heappush(self._pause_buffer, handle)
def _get_pause_for_io(self, handle, io_time):
@@ -370,8 +386,9 @@ class CausalEventLoop(asyncio.SelectorEventLoop):
overlap_end = min(a_end, b_end)
return overlap_end - overlap_start
- def _is_blacklisted(self, handle):
- blacklist = ['_read_from_self']
+ def _is_vital(self, handle):
+ """ Methods which cannot afford to be paused."""
+ blacklist = ['_read_from_self', '_read_ready']
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
if cb.__self__.get_coro().__name__ in blacklist:
diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py
index 60d0c06..bffa748 100644
--- a/nemesis/html_gen.py
+++ b/nemesis/html_gen.py
@@ -1,6 +1,9 @@
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import hashlib
+import math
+
+TRIM_PERCENT = 0.05
def get_color(name):
hash_object = hashlib.md5(name.encode())
@@ -8,92 +11,135 @@ def get_color(name):
return f'hsl({color_index}, 100%, 50%)'
def plot_results(results, output_file, input_file):
- fig = make_subplots(rows=4, cols=1)
-
- for i, (coro_name, x_values) in enumerate(results.items(), start=1):
- x_list = []
- y_latency_list = []
- y_throughput_list = []
- y_max_latency_list = []
- y_num_callbacks_list = []
- latency_hover_text = []
- max_latency_hover_text = []
-
- for speedup, experiments in x_values.items():
- for experiment in experiments:
-
- completed_callbacks = experiment["latency"]
- virtual_run_time = experiment["virtual_run_time"][0]
-
- x_list.append(speedup * 100)
-
- num_callbacks = len(completed_callbacks)
- y_num_callbacks_list.append(num_callbacks)
-
- # handle average latency graph
-
- if num_callbacks > 0:
-
- breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in completed_callbacks])
- total_wait = sum([cb[1] for cb in completed_callbacks])
- max_cb = max(completed_callbacks, key=lambda cb: cb[1])
-
- latency = total_wait / num_callbacks
-
- y_max_latency_list.append(max_cb[1])
- y_latency_list.append(latency)
-
- else:
- latency = 0
- y_latency_list.append(latency)
-
- latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Average Wait: {round(latency, 4)}<br>Breakdown:<br>{breakdown}")
- max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}")
-
- # handle throughput graph
- throughput = num_callbacks / virtual_run_time
- y_throughput_list.append(throughput)
-
- fig.add_trace(go.Scatter(
- x=x_list,
- y=y_latency_list,
- mode='markers',
- name=coro_name,
- marker=dict(color=get_color(coro_name)),
- hovertext=latency_hover_text,
- showlegend=True,
- ), row=1, col=1)
-
- fig.add_trace(go.Scatter(
- x=x_list,
- y=y_throughput_list,
- mode='markers',
- name=coro_name,
- marker=dict(color=get_color(coro_name)),
- showlegend=False,
- ), row=2, col=1)
-
- fig.add_trace(go.Scatter(
- x=x_list,
- y=y_max_latency_list,
- mode='markers',
- name=coro_name,
- marker=dict(color=get_color(coro_name)),
- hovertext=max_latency_hover_text,
- showlegend=False,
- ), row=3, col=1)
-
- fig.add_trace(go.Scatter(
- x=x_list,
- y=y_num_callbacks_list,
- mode='markers',
- name=coro_name,
- marker=dict(color=get_color(coro_name)),
- showlegend=False,
- ), row=4, col=1)
-
- fig.update_layout(title=input_file)
- fig.update_xaxes(title_text="speedup (% optimized away)", row=4, col=1)
+ # determine the number of loops we have data for
+ total_loops = set()
+ for x_values in results.values():
+ for xx_values in x_values.values():
+ total_loops.update(xx_values.keys())
+ total_loops = sorted(total_loops)
+ num_loops = len(total_loops)
+ loop_to_col = {loop: idx + 1 for idx, loop in enumerate(total_loops)}
+
+ fig = make_subplots(
+ rows=4,
+ cols=num_loops,
+ subplot_titles=[f"{loop}" for loop in total_loops] * 4,
+ vertical_spacing=0.1,
+ horizontal_spacing=0.05,
+ shared_xaxes=True,
+ shared_yaxes=False,
+ )
+
+ for coro_name, x_values in results.items():
+ for speedup, xx_values in x_values.items():
+ for loop, experiments in xx_values.items():
+ col = loop_to_col[loop]
+
+ x_list = []
+ y_latency_list = []
+ y_throughput_list = []
+ y_max_latency_list = []
+ y_num_callbacks_list = []
+ latency_hover_text = []
+ max_latency_hover_text = []
+
+ for experiment in experiments:
+ completed_callbacks = experiment[0]
+ virtual_run_time = experiment[1]
+
+ x_val = speedup * 100
+ x_list.append(x_val)
+
+ num_callbacks = len(completed_callbacks)
+ y_num_callbacks_list.append(num_callbacks)
+
+ # handle average latency graph
+ if num_callbacks > 0:
+
+ trim_count = math.floor(num_callbacks * TRIM_PERCENT / 2)
+ sorted_callbacks = sorted(completed_callbacks, key=lambda cb: cb[1])
+
+ trimmed_callbacks = (
+ sorted_callbacks[trim_count: len(sorted_callbacks) - trim_count]
+ if trim_count > 0 else sorted_callbacks
+ )
+
+ trimmed_latencies = [cb[1] for cb in trimmed_callbacks]
+ latency = sum(trimmed_latencies) / len(trimmed_latencies)
+
+ y_latency_list.append(latency)
+
+ max_cb = max(completed_callbacks, key=lambda cb: cb[1])
+ y_max_latency_list.append(max_cb[1])
+
+ breakdown = "<br>".join([f" {cb[0]}: {round(cb[1], 4)}" for cb in trimmed_callbacks])
+ latency_hover_text.append(
+ f"{coro_name}<br>Speedup: {speedup}<br>Trimmed Average Wait ({int(TRIM_PERCENT*100)}%): {round(latency, 4)}<br>Breakdown:<br>{breakdown}"
+ )
+ max_latency_hover_text.append(
+ f"{coro_name}<br>Speedup: {speedup}<br>Max Wait: {round(max_cb[1], 4)}<br>Handle: {max_cb[0]}"
+ )
+
+ else:
+ latency = 0
+ y_latency_list.append(latency)
+
+ latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks")
+ max_latency_hover_text.append(f"{coro_name}<br>Speedup: {speedup}<br>No callbacks")
+
+ # handle throughput graph
+ throughput = num_callbacks / virtual_run_time if virtual_run_time else 0
+ y_throughput_list.append(throughput)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_latency_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ # hovertext=latency_hover_text,
+ showlegend=False,
+ ), row=1, col=col)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_throughput_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ showlegend=False,
+ ), row=2, col=col)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_max_latency_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ # hovertext=max_latency_hover_text,
+ showlegend=False,
+ ), row=3, col=col)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_num_callbacks_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ showlegend=False,
+ ), row=4, col=col)
+
+ fig.update_layout(
+ height=1080,
+ width=1920 * num_loops,
+ title_text=f"Coroutine Performance Metrics: {input_file}",
+ showlegend=False,
+ )
+ for col in range(1, num_loops + 1):
+ fig.update_xaxes(title_text="speedup (% optimized away)", row=4, col=col)
+ fig.update_xaxes(showticklabels=True, col=col)
+ fig.update_xaxes(showticklabels=True, row=2, col=col)
+ fig.update_xaxes(showticklabels=True, row=3, col=col)
fig.update_yaxes(title_text="average latency (seconds)", row=1, col=1)
fig.update_yaxes(title_text="throughput (handles per second)", row=2, col=1)
fig.update_yaxes(title_text="maximum latency (seconds)", row=3, col=1)
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
index 2e3837a..e593625 100755
--- a/nemesis/nemesis.py
+++ b/nemesis/nemesis.py
@@ -70,7 +70,7 @@ class Nemesis(object):
experiment_time = None
# results from previous experiments. Keys represent names of coroutines.
- results = defaultdict(lambda: defaultdict(lambda: []))
+ results = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: [])))
# the file to write results to
filename = None
@@ -141,18 +141,11 @@ class Nemesis(object):
latency = []
virtual_run_time = []
for loop in loops:
- latency.extend(loop.get_completed_coros())
-
pause_time = loop.get_pause_time()
- virtual_run_time.append(Nemesis.experiment_time - pause_time)
-
- results = {
- "latency": latency,
- "virtual_run_time": virtual_run_time,
- }
+ results = Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp][loop._thread_id]
+ results.append((loop.get_completed_coros(), loop.get_pause_time()))
print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed')
- Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results)
del Nemesis.experiment_data
@staticmethod