summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-10-06 18:27:09 -0400
committerbd <bdunahu@operationnull.com>2025-10-06 18:27:09 -0400
commit2edc08465723f444a1ef4108d41bac852f7be88a (patch)
tree53f5d1c4eca459c0c9784844b3d8c80bc4b03287
initial commit
-rw-r--r--.dir-locals.el4
-rw-r--r--.gitignore3
-rw-r--r--LICENSE202
-rw-r--r--nemesis/causal_event_loop.py309
-rw-r--r--nemesis/html_gen.py77
-rwxr-xr-xnemesis/nemesis.py345
-rw-r--r--run_tests.py15
7 files changed, 955 insertions, 0 deletions
diff --git a/.dir-locals.el b/.dir-locals.el
new file mode 100644
index 0000000..8f02dcf
--- /dev/null
+++ b/.dir-locals.el
@@ -0,0 +1,4 @@
+;;; Directory Local Variables -*- no-byte-compile: t -*-
+;;; For more information see (info "(emacs) Directory Variables")
+
+((python-mode . ((compile-command . "python3 ./run_tests.py"))))
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9d0a01
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+__pycache__/
+t/manual/
+results.html
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
new file mode 100644
index 0000000..d01dfd4
--- /dev/null
+++ b/nemesis/causal_event_loop.py
@@ -0,0 +1,309 @@
+'''
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+
+Code:
+'''
+
+import asyncio
+import collections
+from sortedcontainers import SortedList
+import heapq
+import selectors
+import time
+import traceback
+from asyncio.log import logger
+from asyncio import Task, events
+from asyncio.base_events import _format_handle
+
+_MIN_SCHEDULED_TIMER_HANDLES = 100
+_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
+MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+
+
+class TimeAwareMixin:
+
+ # the timestamp this callback was registered
+ register_time = None
+ # the timestamp this callback completed i/o
+ io_time = None
+ # the timestamp this callback was called by the event loop
+ process_start_time = None
+
+ def __init__(self):
+ self.register_time = time.monotonic()
+
+
+orig_handle = asyncio.events.Handle
+
+
+def create_subclass(base_class):
+ class NewSubclass(base_class, TimeAwareMixin):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ TimeAwareMixin.__init__(self)
+
+ return NewSubclass
+
+
+# make all the subclasses inherit from TimeAwareHandle as well
+for sc in [orig_handle] + orig_handle.__subclasses__():
+ subclass = create_subclass(sc)
+ setattr(asyncio.events, sc.__name__, subclass)
+
+class CausalEventLoop(asyncio.SelectorEventLoop):
+
+ # a value between 0 and 1. 0 means no optimization,
+ # 1 means the target coroutine is optimized away entirely
+ _speedup = 1.0
+ # a list of callbacks which have recently completed
+ _pause_buffer = []
+ # a list of intervals in which the target coroutine has been active
+ _coro_intervals = SortedList()
+ # a list of completed callbacks, and their associated queue time
+ _completed_coros = []
+ # the last time we entered the target coro
+ _time_entered_coro = None
+
+ def __init__(self) -> None:
+ super().__init__()
+
+ def set_speedup(self, speedup):
+ self._speedup = speedup
+
+ # reset experiment counters
+ self._time_entered_coro = None
+ self._coro_intervals.clear()
+ self._completed_coros.clear()
+
+ def get_completed_coros(self):
+ return self._completed_coros
+
+ def get_pause_time(self):
+ if not self._coro_intervals:
+ return 0
+
+ start_interval = self._coro_intervals[0][0]
+ end_interval = self.time() if self._time_entered_coro else self._coro_intervals[-1][1]
+ interval = (start_interval, end_interval)
+ return self._get_pause_time(interval)
+
+ def ping_enter_coro(self):
+ self._time_entered_coro = self.time()
+
+ def ping_exit_coro(self):
+ assert isinstance(self._time_entered_coro, float), f"Tried to exit coro before recorded entry!"
+ self._coro_intervals.add((self._time_entered_coro, self.time()))
+ self._time_entered_coro = None
+
+ def update_ready(self, can_stall=True):
+ '''
+ Polls the IO selector, schedules resulting callbacks, and schedules
+ 'call_later' callbacks.
+
+ This function can be called in the middle of an event loop iteration.
+
+ This logic was separated out of `run_once` so that the list of `ready`
+ tasks may be updated more frequently than once per iteration.
+
+ If SAMPLING is true, the timeout passed to the selector will always be 0.
+ '''
+ curr_time = self.time()
+ sched_count = len(self._scheduled)
+ if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
+ self._timer_cancelled_count / sched_count >
+ _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
+ # Remove delayed calls that were cancelled if their number
+ # is too high
+ new_scheduled = []
+ for handle in self._scheduled:
+ if handle._cancelled:
+ handle._scheduled = False
+ else:
+ new_scheduled.append(handle)
+
+ heapq.heapify(new_scheduled)
+ self._scheduled = new_scheduled
+ self._timer_cancelled_count = 0
+ else:
+ # Remove delayed calls that were cancelled from head of queue.
+ while self._scheduled and self._scheduled[0]._cancelled:
+ self._timer_cancelled_count -= 1
+ handle = heapq.heappop(self._scheduled)
+ handle._scheduled = False
+
+ timeout = None
+ # TODO this needs to be rewritten
+ # We can't miss things placed in timeout either
+ # if not can_stall or self._ready or self._stopping:
+ timeout = 0
+ # elif self._scheduled:
+ # # Compute the desired timeout.
+ # timeout = self._scheduled[0]._when - self.time()
+ # if timeout > MAXIMUM_SELECT_TIMEOUT:
+ # timeout = MAXIMUM_SELECT_TIMEOUT
+ # elif timeout < 0:
+ # timeout = 0
+
+ event_list = self._selector.select(timeout)
+ self._process_events(event_list)
+ # Needed to break cycles when an exception occurs.
+ event_list = None
+
+ # Handle 'later' callbacks that are ready.
+ end_time = self.time() + self._clock_resolution
+ while self._scheduled:
+ handle = self._scheduled[0]
+ if handle._when >= end_time:
+ break
+ handle = heapq.heappop(self._scheduled)
+ handle._scheduled = False
+
+ time_interval = (handle.register_time, curr_time)
+ time_to_buffer = curr_time + self._get_pause_time(time_interval)
+ handle.io_time = time_to_buffer
+ self._ready.append(handle)
+
+ def _run_once(self):
+ """
+ Run one full iteration of the event loop.
+
+ This calls all currently ready callbacks.
+ """
+ self.update_ready()
+
+ current_time = self.time()
+ to_process = collections.deque([
+ handle for handle in self._ready
+ if handle.io_time < (current_time + self._clock_resolution)
+ ])
+
+ # This is the only place where callbacks are actually *called*.
+ # All other places just add them to ready.
+ # Note: We run all currently scheduled callbacks, but not any
+ # callbacks scheduled by callbacks run this time around --
+ # they will be run the next time (after another I/O poll).
+ # Use an idiom that is thread-safe without using locks.
+ ntodo = len(to_process)
+ for i in range(ntodo):
+ handle = to_process.popleft()
+ self._ready.remove(handle)
+ if handle._cancelled:
+ continue
+ try:
+ self._current_handle = handle
+
+ process_start_time = self.time()
+ handle.process_start_time = process_start_time
+
+ handle._run()
+
+ process_end_time = self.time()
+ dt = process_end_time - process_start_time
+ if self._debug and dt >= self.slow_callback_duration:
+ logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt)
+
+ time_interval = (handle.io_time, process_start_time)
+ pause_time = self._get_pause_time(time_interval)
+ adjusted_start_time = handle.process_start_time - \
+ pause_time
+ wait_time = adjusted_start_time - handle.io_time
+ assert wait_time >= -0.0001, f"wait time on {_format_handle(handle)} was found to be {wait_time:.4f}!"
+ self._completed_coros.append((_format_handle(handle), wait_time))
+ except Exception:
+ traceback.print_exc()
+ finally:
+ self._current_handle = None
+ handle = None # Needed to break cycles when an exception occurs.
+
+ def _process_events(self, event_list):
+ curr_time = self.time()
+ for key, mask in event_list:
+ fileobj, (reader, writer) = key.fileobj, key.data
+ if mask & selectors.EVENT_READ and reader is not None:
+ if reader._cancelled:
+ self._remove_reader(fileobj)
+ else:
+ time_interval = (reader.register_time, curr_time)
+ time_to_buffer = curr_time + \
+ self._get_pause_time(time_interval)
+ reader.io_time = time_to_buffer
+ self._add_callback(reader)
+ if mask & selectors.EVENT_WRITE and writer is not None:
+ if writer._cancelled:
+ self._remove_writer(fileobj)
+ else:
+ time_interval = (writer.register_time, curr_time)
+ time_to_buffer = curr_time + \
+ self._get_pause_time(time_interval)
+ writer.io_time = time_to_buffer
+ self._add_callback(writer)
+
+ def _call_soon(self, callback, args, context):
+ handle = events.Handle(callback, args, self, context)
+ if handle._source_traceback:
+ del handle._source_traceback[-1]
+ handle.io_time = self.time()
+ self._ready.append(handle)
+ return handle
+
+ def call_soon_threadsafe(self, callback, *args, context=None):
+ """Like call_soon(), but thread-safe."""
+ self._check_closed()
+ if self._debug:
+ self._check_callback(callback, 'call_soon_threadsafe')
+ handle = events._ThreadSafeHandle(callback, args, self, context)
+ handle.io_time = self.time()
+ self._ready.append(handle)
+ if handle._source_traceback:
+ del handle._source_traceback[-1]
+ if handle._source_traceback:
+ del handle._source_traceback[-1]
+ self._write_to_self()
+ return handle
+
+ def _get_pause_time(self, cb_interval):
+ time = 0
+ start, end = cb_interval
+
+ for coro_start, coro_end in self._coro_intervals:
+ if start < coro_end and coro_start < end:
+ time += self._get_overlap(start, end, coro_start, coro_end)
+ # coro_intervals are sorted, so by this time all overlap has passed
+ if end < coro_start:
+ break
+
+ curr_time = self.time()
+ if self._time_entered_coro and \
+ start < curr_time and self._time_entered_coro < end:
+ time += self._get_overlap(start, end, self._time_entered_coro,
+ curr_time)
+
+ return time * self._speedup
+
+ def _get_overlap(self, a_start, a_end, b_start, b_end):
+ overlap_start = max(a_start, b_start)
+ overlap_end = min(a_end, b_end)
+ return overlap_end - overlap_start
+
+
+class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
+ def new_event_loop(self):
+ return CausalEventLoop()
+
+
+asyncio.set_event_loop_policy(CausalEventLoopPolicy())
diff --git a/nemesis/html_gen.py b/nemesis/html_gen.py
new file mode 100644
index 0000000..b774da9
--- /dev/null
+++ b/nemesis/html_gen.py
@@ -0,0 +1,77 @@
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import hashlib
+
+def get_color(name):
+ hash_object = hashlib.md5(name.encode())
+ color_index = int(hash_object.hexdigest(), 16) % 360
+ return f'hsl({color_index}, 100%, 50%)'
+
+def plot_results(results, filename):
+ fig = make_subplots(rows=3, cols=1)
+
+ for i, (coro_name, x_values) in enumerate(results.items(), start=1):
+ x_list = []
+ y_latency_list = []
+ y_throughput_list = []
+ y_max_latency_list = []
+
+ for speedup, experiments in x_values.items():
+ for experiment in experiments:
+
+ completed_callbacks = experiment["latency"]
+ virtual_run_time = experiment["virtual_run_time"][0]
+
+ x_list.append(speedup * 100)
+
+ num_callbacks = len(completed_callbacks)
+
+ # handle average latency graph
+ if num_callbacks > 0:
+
+ total_wait = sum([cb[1] for cb in completed_callbacks])
+ max_wait = max([cb[1] for cb in completed_callbacks])
+ latency = total_wait / num_callbacks
+
+ y_max_latency_list.append(max_wait)
+ y_latency_list.append(latency)
+ else:
+ y_latency_list.append(0)
+
+ # handle throughput graph
+ throughput = num_callbacks / virtual_run_time
+ y_throughput_list.append(throughput)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_latency_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ showlegend=True,
+ ), row=1, col=1)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_throughput_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ showlegend=False,
+ ), row=2, col=1)
+
+ fig.add_trace(go.Scatter(
+ x=x_list,
+ y=y_max_latency_list,
+ mode='markers',
+ name=coro_name,
+ marker=dict(color=get_color(coro_name)),
+ showlegend=False,
+ ), row=3, col=1)
+
+ fig.update_xaxes(title_text="Speedup (% optimized away)", row=3, col=1)
+ fig.update_yaxes(title_text="Average Handle Latency (seconds)", row=1, col=1)
+ fig.update_yaxes(title_text="Throughput (callbacks per second)", row=2, col=1)
+ fig.update_yaxes(title_text="Maximum Handle Latency (seconds)", row=3, col=1)
+
+ fig.write_html(filename)
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
new file mode 100755
index 0000000..27e93de
--- /dev/null
+++ b/nemesis/nemesis.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+Code:
+'''
+
+from causal_event_loop import CausalEventLoop
+from collections import defaultdict
+from html_gen import plot_results
+import argparse
+import asyncio
+import os
+import random
+import signal
+import sys
+import threading
+import time
+import traceback
+import types
+
+class Experiment:
+
+ # event loops participating in this this experiment
+ _loops = []
+
+ def __init__(self, loops):
+ self._loops = loops
+
+ def get_loops(self):
+ return [l for l in self._loops if l.is_running()]
+
+class Nemesis(object):
+
+ # the (ideal) interval between samples
+ signal_interval = 0.0
+ # the timestamp which the last sample was taken
+ last_sample = None
+
+ # the current experiment being run
+ experiment_data = None
+ # the coroutine the current experiment is speeding up
+ experiment_coro = None
+ # the speedup of the current experiment
+ experiment_spdp = None
+ # the total time this experiment has been running
+ experiment_time = None
+
+ # results from previous experiments. Keys represent names of coroutines.
+ results = defaultdict(lambda: defaultdict(lambda: []))
+
+ # the file to write results to
+ filename = None
+
+ # The base duration of each performance experiment
+ e_duration = None
+
+ # A mapping of event loops to the previous running coroutine.
+ prev_coro = defaultdict(lambda: None)
+
+ @staticmethod
+ def __init__(e_duration, filename, signal_interval=0.01):
+ Nemesis.signal_interval = signal_interval
+ Nemesis.e_duration = e_duration
+ Nemesis.filename = filename
+
+ @staticmethod
+ def start():
+ Nemesis.last_sample = time.perf_counter()
+ signal.signal(signal.SIGALRM,
+ Nemesis._signal_handler)
+ signal.setitimer(signal.ITIMER_REAL,
+ Nemesis.signal_interval,
+ Nemesis.signal_interval)
+
+ @staticmethod
+ def print_results():
+ for coro_name, x_values in Nemesis.results.items():
+ print(f'Results for {coro_name:}')
+ for speedup, experiments in x_values.items():
+ print(f' {speedup * 100}% speedup:')
+ for experiment in experiments:
+ num_callbacks = len(experiment)
+ if num_callbacks > 0:
+ total_wait = sum([cb[1] for cb in experiment])
+ latency = total_wait / num_callbacks
+ print(f' latency: {latency}')
+ print(f' callbacks processed: {num_callbacks}')
+ print(f'')
+
+ @staticmethod
+ def stop():
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ plot_results(Nemesis.results, Nemesis.filename)
+ print(f"Wrote {Nemesis.filename}")
+
+ @staticmethod
+ def _start_experiment(coro, speedup):
+ Nemesis.prev_coro = defaultdict(lambda: None)
+ Nemesis.experiment_coro = coro
+ Nemesis.experiment_spdp = speedup
+ Nemesis.experiment_time = 0
+
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ if not isinstance(loop, CausalEventLoop):
+ raise RuntimeError("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+ loop.set_speedup(speedup)
+
+ Nemesis.experiment_data = Experiment(loops)
+
+ @staticmethod
+ def _stop_experiment():
+ if Nemesis.experiment_data is not None:
+ loops = Nemesis.experiment_data.get_loops()
+
+ latency = []
+ virtual_run_time = []
+ for loop in loops:
+ latency.extend(loop.get_completed_coros())
+
+ pause_time = loop.get_pause_time()
+ virtual_run_time.append(Nemesis.experiment_time - pause_time)
+
+ results = {
+ "latency": latency,
+ "virtual_run_time": virtual_run_time,
+ }
+
+ print(f'Ran {Nemesis.experiment_coro} at {Nemesis.experiment_spdp} speed')
+ Nemesis.results[Nemesis.experiment_coro][Nemesis.experiment_spdp].append(results)
+ del Nemesis.experiment_data
+
+ @staticmethod
+ def _signal_handler(sig, frame):
+ curr_sample = time.perf_counter()
+ passed_time = curr_sample - Nemesis.last_sample
+ Nemesis.last_sample = curr_sample
+
+ if getattr(Nemesis, 'experiment_data', None):
+ loops = Nemesis.experiment_data.get_loops()
+ exp_coro = Nemesis.experiment_coro
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ prev_coro = Nemesis.prev_coro[loop]
+ if not prev_coro == coro:
+ if prev_coro == exp_coro:
+ loop.ping_exit_coro()
+ elif coro == exp_coro:
+ loop.ping_enter_coro()
+ Nemesis.prev_coro[loop] = coro
+
+ loop.update_ready(False)
+
+ Nemesis.experiment_time += passed_time
+ if (Nemesis.e_duration <= Nemesis.experiment_time):
+ Nemesis._stop_experiment()
+
+ else:
+ coros = []
+ loops = Nemesis._get_event_loops()
+ for loop in loops:
+ coro = Nemesis._get_current_coro(loop)
+ if coro is not None:
+ coros.append(coro)
+ if coros:
+ Nemesis._start_experiment(random.choice(coros),
+ Nemesis._select_speedup())
+
+ @staticmethod
+ def _parse_handle(handle):
+ cb = handle._callback
+ if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
+ task = cb.__self__
+ coro = task.get_coro()
+ return [task.get_name(), Nemesis._get_coro_name(coro)]
+ else:
+ return [str(type(handle).__name__), cb.__name__]
+
+ @staticmethod
+ def _get_current_coro(loop):
+ tid = loop._thread_id
+ assert tid, f"{loop} is not running, yet we attempted to sample it!"
+
+ frame = sys._current_frames().get(tid)
+ fname = frame.f_code.co_filename
+ while not Nemesis._should_trace(fname):
+ if frame:
+ frame = frame.f_back
+ else:
+ break
+ if frame:
+ fname = frame.f_code.co_filename
+ if frame and frame.f_generator:
+ return frame.f_generator.cr_code.co_name
+ return None
+
+ @staticmethod
+ def _get_event_loops():
+ '''Returns each thread's event loop, if it exists.'''
+ loops = []
+ for t in threading.enumerate():
+ frame = sys._current_frames().get(t.ident)
+ if frame:
+ loop = Nemesis._walk_back_until_loop(frame)
+ if loop and loop not in loops:
+ loops.append(loop)
+ return loops
+
+ @staticmethod
+ def _walk_back_until_loop(frame):
+ '''Walks back the callstack until we are in a method named '_run_once'.
+ If this is ever true, we assume we are in an Asyncio event loop method,
+ and check to see if the 'self' variable is indeed and instance of
+ AbstractEventLoop. Return this variable if true.'''
+ while frame:
+ if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+ loop = frame.f_locals['self']
+ if isinstance(loop, asyncio.AbstractEventLoop):
+ return loop
+ else:
+ frame = frame.f_back
+ return None
+
+ @staticmethod
+ def _should_trace(filename):
+ '''Returns FALSE if filename is uninteresting to the user.
+ Don't depend on this. It kind of sucks.'''
+ if not filename:
+ return False
+ if '/gnu/store' in filename:
+ return False
+ if '/usr/local/lib/python' in filename:
+ return False
+ if 'site-packages' in filename:
+ return False
+ if 'propcache' in filename:
+ return False
+ if '.pyx' in filename:
+ return False
+ if filename[0] == '<':
+ return False
+ if 'nemesis' in filename:
+ return False
+ return True
+
+ def _select_speedup():
+ '''
+ Returns a random speedup between 0% to 100%, in multiples of 5%.
+ Because a baseline is needed to calculate effect on program
+ performance, selects a speedup of 0 with 50% probability.
+ '''
+ r1 = random.randint(-19, 20)
+ return max(0, r1) / 20
+
+ @staticmethod
+ def _get_coro_name(coro):
+ '''
+ Stolen from _format_coroutine in cpython/Lib/asyncio/coroutines.py
+ '''
+ # Coroutines compiled with Cython sometimes don't have
+ # proper __qualname__ or __name__. While that is a bug
+ # in Cython, asyncio shouldn't crash with an AttributeError
+ # in its __repr__ functions.
+ if hasattr(coro, '__qualname__') and coro.__qualname__:
+ coro_name = coro.__qualname__
+ elif hasattr(coro, '__name__') and coro.__name__:
+ coro_name = coro.__name__
+ else:
+ # Stop masking Cython bugs, expose them in a friendly way.
+ coro_name = f'<{type(coro).__name__} without __name__>'
+ return f'{coro_name}()'
+
+the_globals = {
+ '__name__': '__main__',
+ '__doc__': None,
+ '__package__': None,
+ '__loader__': globals()['__loader__'],
+ '__spec__': None,
+ '__annotations__': {},
+ '__builtins__': globals()['__builtins__'],
+ '__file__': None,
+ '__cached__': None,
+}
+
+
+if __name__ == "__main__":
+ # parses CLI arguments and facilitates profiler runtime.
+ parser = argparse.ArgumentParser(
+ usage='%(prog)s [args] -- prog'
+ )
+
+ parser.add_argument('-i', '--interval',
+ help='The minimum amount of time inbetween \
+ samples in seconds.',
+ metavar='',
+ type=float,
+ default=0.01)
+ parser.add_argument('-e', '--experiment-duration',
+ help='The performance experiment duration. Defaults to 3 seconds.',
+ metavar='',
+ type=float,
+ default=3)
+ parser.add_argument('-f', '--filename',
+ help='The filename to write results to.',
+ metavar='',
+ type=str,
+ default="results.html")
+ parser.add_argument('prog',
+ type=str,
+ nargs='*',
+ help='Path to the python script and its arguments.')
+ args = parser.parse_args()
+
+ sys.argv = args.prog
+ try:
+ with open(args.prog[0], 'r', encoding='utf-8') as fp:
+ code = compile(fp.read(), args.prog[0], "exec")
+ Nemesis(args.experiment_duration,
+ args.filename,
+ args.interval).start()
+ exec(code, the_globals)
+ Nemesis.stop()
+ except Exception:
+ traceback.print_exc()
diff --git a/run_tests.py b/run_tests.py
new file mode 100644
index 0000000..bbb0a6d
--- /dev/null
+++ b/run_tests.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python3
+import unittest
+import sys
+
+if __name__ == '__main__':
+
+ sys.path.append('t/')
+ sys.path.append('nemesis/')
+ t_loader = unittest.defaultTestLoader
+ t_runner = unittest.TextTestRunner(verbosity=2)
+ t = []
+
+ t_suite = t_loader.loadTestsFromNames(t)
+ result = t_runner.run(t_suite)
+ sys.exit(not result.wasSuccessful())