Diffstat (limited to 'nemesis')
-rw-r--r--  nemesis/__init__.py             0
-rw-r--r--  nemesis/causal_event_loop.py  159
-rw-r--r--  nemesis/experiment.py          92
-rwxr-xr-x  nemesis/nemesis.py            212
-rwxr-xr-x  nemesis/nemesis.py.bkup       186
5 files changed, 649 insertions, 0 deletions
diff --git a/nemesis/__init__.py b/nemesis/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/nemesis/__init__.py
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
new file mode 100644
index 0000000..4320655
--- /dev/null
+++ b/nemesis/causal_event_loop.py
@@ -0,0 +1,159 @@
+import asyncio
+import collections
+import heapq
+import time
+from asyncio.log import logger
+from asyncio.base_events import _format_handle
+
+_MIN_SCHEDULED_TIMER_HANDLES = 100
+_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
+MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+
+class CausalEventLoop(asyncio.SelectorEventLoop):
+ '''
+ Internal time in this event loop may run faster or slower than objective time.
+    Control loop time dilation vs. objective time via the ``_time_dilation`` attribute
+    (set through ``_set_experiment``; dilation == X means loop time runs X times faster than objective time).
+ '''
+
+ def __init__(self) -> None:
+ super().__init__()
+ self._last_objective_time = None
+ self._last_subjective_time = None
+ self._processing = collections.deque()
+ self._time_dilation = 1.0
+ self._task_to_optimize = None
+
+ _select = self._selector.select
+        def select(timeout):
+            return _select(None if timeout is None else timeout / self._time_dilation)
+ self._selector.select = select
+
+ def _set_experiment(self, task, dilation):
+ self._task_to_optimize = task
+ self._time_dilation = dilation
+
+ def _matches_task(self, handle):
+ cb = handle._callback
+ if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
+ task = cb.__self__
+ if task.get_name() == self._task_to_optimize:
+ return True
+ return False
+
+ def _update_ready(self, sampling=False):
+ """
+ Polls the IO selector, schedules resulting callbacks, and schedules
+ 'call_later' callbacks.
+
+        This logic was separated out of `_run_once` so that the list of `ready`
+ tasks may be updated more frequently than once per iteration.
+
+ If SAMPLING is true, the timeout passed to the selector will always be 0.
+ """
+ sched_count = len(self._scheduled)
+ if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
+ self._timer_cancelled_count / sched_count >
+ _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
+ # Remove delayed calls that were cancelled if their number
+ # is too high
+ new_scheduled = []
+ for handle in self._scheduled:
+ if handle._cancelled:
+ handle._scheduled = False
+ else:
+ new_scheduled.append(handle)
+
+ heapq.heapify(new_scheduled)
+ self._scheduled = new_scheduled
+ self._timer_cancelled_count = 0
+ else:
+ # Remove delayed calls that were cancelled from head of queue.
+ while self._scheduled and self._scheduled[0]._cancelled:
+ self._timer_cancelled_count -= 1
+ handle = heapq.heappop(self._scheduled)
+ handle._scheduled = False
+
+ timeout = None
+ if sampling or self._ready or self._stopping:
+ timeout = 0
+ elif self._scheduled:
+ # Compute the desired timeout.
+ timeout = self._scheduled[0]._when - self.time()
+ if timeout > MAXIMUM_SELECT_TIMEOUT:
+ timeout = MAXIMUM_SELECT_TIMEOUT
+ elif timeout < 0:
+ timeout = 0
+
+ event_list = self._selector.select(timeout)
+ self._process_events(event_list)
+ # Needed to break cycles when an exception occurs.
+ event_list = None
+
+ # Handle 'later' callbacks that are ready.
+ end_time = self.time() + self._clock_resolution
+ while self._scheduled:
+ handle = self._scheduled[0]
+ if handle._when >= end_time:
+ break
+ handle = heapq.heappop(self._scheduled)
+ handle._scheduled = False
+ self._ready.append(handle)
+
+ def _run_once(self):
+ """Run one full iteration of the event loop.
+
+ This calls all currently ready callbacks.
+ """
+ self._update_ready()
+ self._processing.extend(self._ready)
+ # This is the only place where callbacks are actually *called*.
+ # All other places just add them to ready.
+ # Note: We run all currently scheduled callbacks, but not any
+ # callbacks scheduled by callbacks run this time around --
+ # they will be run the next time (after another I/O poll).
+ # Use an idiom that is thread-safe without using locks.
+ ntodo = len(self._processing)
+ for i in range(ntodo):
+ handle = self._processing.popleft()
+ try:
+ self._ready.remove(handle)
+ except ValueError:
+ pass
+ if handle._cancelled:
+ continue
+ try:
+ self._current_handle = handle
+ t0 = super().time()
+ handle._run()
+ dt = super().time() - t0
+ if self._debug and dt >= self.slow_callback_duration:
+ logger.warning('Executing %s took %.3f seconds',
+ _format_handle(handle), dt * 1/self._time_dilation)
+ if not self._matches_task(handle):
+ t0 = super().time()
+ # insert a delay
+ while super().time() - t0 < dt * (1/self._time_dilation - 1):
+ time.sleep(0.001)
+ finally:
+ self._current_handle = None
+ handle = None # Needed to break cycles when an exception occurs.
+
+ def time(self):
+ obj = super().time()
+ if self._last_objective_time is None:
+ self._last_objective_time = self._last_subjective_time = obj
+ return obj
+ else:
+ sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation
+ self._last_subjective_time = sub
+ self._last_objective_time = obj
+ return sub
+
+
+class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
+ def new_event_loop(self):
+ return CausalEventLoop()
+
+
+asyncio.set_event_loop_policy(CausalEventLoopPolicy())
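
The core of CausalEventLoop is the time() override above: subjective (loop) time advances at _time_dilation times the rate of objective time, measured piecewise from the last observation, while _run_once busy-waits after callbacks that do not belong to the task being optimized. A minimal, self-contained sketch of just the clock mapping (DilatedClock is an illustrative stand-in, not code from this diff):

import time

class DilatedClock:
    """Stand-in for CausalEventLoop.time(): subjective time advances at
    `dilation` times the rate of objective (wall-clock) time."""

    def __init__(self, dilation=2.0):
        self.dilation = dilation      # subjective seconds per objective second
        self._last_obj = None         # objective timestamp at last call
        self._last_sub = None         # subjective time at that instant

    def now(self):
        obj = time.monotonic()
        if self._last_obj is None:
            # First call: subjective and objective time coincide.
            self._last_obj = self._last_sub = obj
            return obj
        # Advance subjective time in proportion to elapsed objective time,
        # so changing `dilation` between calls only affects the slope from here on.
        self._last_sub += (obj - self._last_obj) * self.dilation
        self._last_obj = obj
        return self._last_sub

clock = DilatedClock(dilation=2.0)
t0 = clock.now()
time.sleep(0.1)                       # ~0.1 s of objective time passes
print(round(clock.now() - t0, 2))     # roughly 0.2 "loop" seconds
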
diff --git a/nemesis/experiment.py b/nemesis/experiment.py
new file mode 100644
index 0000000..87a0103
--- /dev/null
+++ b/nemesis/experiment.py
@@ -0,0 +1,92 @@
+from collections import defaultdict, namedtuple
+import asyncio
+import sys
+import threading
+from causal_event_loop import CausalEventLoop
+
+class BadLoopTypeException(Exception):
+ pass
+
+class Experiment:
+
+ # the selected task for this experiment
+ _task = None
+ # the selected speedup for this experiment
+ _speedup = None
+    # event loops participating in this experiment
+    _loops = None
+    # a mapping from formatted handles to the total sampled time
+    # (in seconds) attributed to them.
+ _samples = None
+
+ def __init__(self, task, speedup):
+ self._task = task
+ self._speedup = speedup
+ self._samples = defaultdict(lambda: 0)
+
+ self._set_loops()
+
+ def get_task(self):
+ return self._task
+
+ def get_speedup(self):
+ return self._speedup
+
+ def get_results(self):
+ ret = f"Results for {self._task} at {self._speedup} times speedup:\n"
+ if len(self._samples) > 0:
+ ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}")
+ for key in self._sort_samples(self._samples):
+ ret += f"\n {self._get_sample_str(key)}"
+ else:
+ ret += " No samples were gathered. (This is odd!)"
+ return ret
+
+ def get_loops(self):
+ return [l for l in self._loops if l.is_running()]
+
+ def add_handles(self, handles, loop, time):
+ for h in handles:
+ self._samples[h.__str__()] += time
+
+ def _set_loops(self):
+ self._loops = self._get_event_loops()
+ for l in self._loops:
+ if not isinstance(l, CausalEventLoop):
+ raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+ l._set_experiment(self._task, 1.0 / self._speedup)
+
+ def _get_event_loops(self):
+ '''Returns each thread's event loop, if it exists.'''
+ loops = []
+ for t in threading.enumerate():
+ frame = sys._current_frames().get(t.ident)
+ if frame:
+ loop = self._walk_back_until_loop(frame)
+ if loop and loop not in loops:
+ loops.append(loop)
+ return loops
+
+ def _walk_back_until_loop(self, frame):
+        '''Walks back the call stack until we find a frame for a method named
+        '_run_once'.  If so, we assume we are inside an asyncio event loop
+        method and check whether that frame's 'self' local is indeed an
+        instance of AbstractEventLoop; return it if so.'''
+ while frame:
+ if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+ loop = frame.f_locals['self']
+ if isinstance(loop, asyncio.AbstractEventLoop):
+ return loop
+ else:
+ frame = frame.f_back
+ return None
+
+ def _get_sample_str(self, key):
+ value = self._samples[key] / self._speedup
+ return f"{key:29} {value:10}"
+
+ def _sort_samples(self, sample_dict):
+ '''Returns SAMPLE_DICT in descending order by number of samples.'''
+ return {k: v for k, v in sorted(sample_dict.items(),
+ key=lambda item: item[1],
+ reverse=True)}
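
Experiment._get_event_loops locates live loops by walking each thread's current frame stack until it reaches a frame for _run_once and inspecting that frame's 'self' local. The following standalone harness (illustrative only; it relies on the same CPython-specific sys._current_frames and frame-locals details as the code above) exercises that idea against a loop started in a background thread:

import asyncio
import sys
import threading
import time

def find_running_loops():
    """Return every asyncio event loop currently executing in any thread."""
    found = []
    for t in threading.enumerate():
        frame = sys._current_frames().get(t.ident)
        while frame:
            # A running loop spends its life inside BaseEventLoop._run_once,
            # whose 'self' local is the loop object itself.
            if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
                obj = frame.f_locals['self']
                if isinstance(obj, asyncio.AbstractEventLoop) and obj not in found:
                    found.append(obj)
                break
            frame = frame.f_back
    return found

worker = threading.Thread(target=lambda: asyncio.run(asyncio.sleep(0.5)))
worker.start()
time.sleep(0.1)                        # give the background loop time to start
print(find_running_loops())            # expect one running event loop
worker.join()
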
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
new file mode 100755
index 0000000..eaf41f3
--- /dev/null
+++ b/nemesis/nemesis.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+Code:
+'''
+
+import argparse
+import asyncio
+import signal
+import sys
+import traceback
+import time
+import types
+import os
+from experiment import Experiment
+from asyncio.base_events import _format_handle
+
+
+class Nemesis(object):
+
+ # the (ideal) interval between samples
+ signal_interval = 0.0
+    # the timestamp at which the last sample was taken
+ last_sample = None
+ # the current experiment being run
+ curr_experiment = None
+ # results from previous experiments, represented as strings
+ results = []
+ # The duration of each performance experiment
+ e_duration = None
+ # The number of seconds remaining in this performance experiment.
+ r_duration = None
+
+ # temp
+ task = None
+ dilation = 1.0
+
+ @staticmethod
+ def __init__(task, speedup, e_duration, w_time, signal_interval=0.01):
+ os.environ["PYTHONASYNCIODEBUG"] = "1"
+ Nemesis.signal_interval = signal_interval
+ Nemesis.e_duration = e_duration
+ Nemesis.r_duration = w_time
+ # temporary
+ Nemesis.task = task
+ Nemesis.speedup = speedup
+
+ @staticmethod
+ def start():
+ Nemesis.last_sample = time.perf_counter()
+ signal.signal(signal.SIGALRM,
+ Nemesis._signal_handler)
+ signal.setitimer(signal.ITIMER_REAL,
+ Nemesis.signal_interval,
+ Nemesis.signal_interval)
+
+ @staticmethod
+ def stop():
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ Nemesis._stop_experiment()
+ for r in Nemesis.results:
+ print()
+ print(r)
+
+ @staticmethod
+ def _start_experiment():
+ Nemesis.r_duration = Nemesis.e_duration
+ Nemesis.curr_experiment = Experiment(Nemesis.task, Nemesis.speedup)
+
+ @staticmethod
+ def _stop_experiment():
+        if Nemesis.curr_experiment is not None:
+            print(f'finished running {Nemesis.curr_experiment.get_task()} with speedup {Nemesis.curr_experiment.get_speedup()}')
+            Nemesis.results.append(Nemesis.curr_experiment.get_results())
+            Nemesis.curr_experiment = None
+
+ @staticmethod
+ def _signal_handler(sig, frame):
+ curr_sample = time.perf_counter()
+ passed_time = curr_sample - Nemesis.last_sample
+ Nemesis.last_sample = curr_sample
+ if Nemesis.curr_experiment:
+ loops = Nemesis.curr_experiment.get_loops()
+ # print(loops)
+ for loop in loops:
+ loop._update_ready(True)
+ handles = Nemesis._get_waiting_handles(loop)
+ Nemesis.curr_experiment.add_handles(handles, loop, passed_time)
+
+ Nemesis.r_duration -= passed_time
+ if (Nemesis.r_duration <= 0):
+ Nemesis._stop_experiment()
+ Nemesis._start_experiment()
+
+ def _get_waiting_handles(loop):
+ handles = []
+ for handle in loop._ready:
+ if (fmt_handle := _format_handle(handle)) is not None \
+ and fmt_handle not in handles:
+ # no duplicates
+ handles.append(fmt_handle)
+ return handles
+
+ @staticmethod
+ def _should_trace(filename):
+ '''Returns FALSE if filename is uninteresting to the user.
+ Don't depend on this. It's good enough for testing.'''
+ # FIXME Assume GuixSD. Makes filtering easy
+ if not filename:
+ return False
+ if '/gnu/store' in filename:
+ return False
+ if '/usr/local/lib/python' in filename:
+ return False
+ if 'site-packages' in filename:
+ return False
+ if 'propcache' in filename:
+ return False
+ if '.pyx' in filename:
+ return False
+ if filename[0] == '<':
+ return False
+ if 'nemesis' in filename:
+ return False
+ return True
+
+
+the_globals = {
+ '__name__': '__main__',
+ '__doc__': None,
+ '__package__': None,
+ '__loader__': globals()['__loader__'],
+ '__spec__': None,
+ '__annotations__': {},
+ '__builtins__': globals()['__builtins__'],
+ '__file__': None,
+ '__cached__': None,
+}
+
+
+if __name__ == "__main__":
+ # parses CLI arguments and facilitates profiler runtime.
+ parser = argparse.ArgumentParser(
+ usage='%(prog)s [args] -- prog'
+ )
+
+    parser.add_argument('-i', '--interval',
+                        help='The minimum amount of time in between '
+                             'samples in seconds.',
+ metavar='',
+ type=float,
+ default=0.01)
+ parser.add_argument('-s', '--speedup',
+ help='The amount of virtual speedup.',
+ metavar='',
+ type=float,
+ default=0.5)
+ parser.add_argument('-c', '--task',
+ help='The task to virtually speedup.',
+ metavar='',
+ type=str,
+ required=True)
+ parser.add_argument('-e', '--experiment-duration',
+ help='The performance experiment duration. Defaults to 4 seconds.',
+ metavar='',
+ type=float,
+ default=4)
+    parser.add_argument('-w', '--warmup-time',
+                        help='Amount of time to wait before the first performance experiment. Defaults to the minimum of 100 milliseconds.',
+ metavar='',
+ type=float,
+ default=0.1)
+ parser.add_argument('prog',
+ type=str,
+ nargs='*',
+ help='Path to the python script and its arguments.')
+ args = parser.parse_args()
+
+ sys.argv = args.prog
+ try:
+ with open(args.prog[0], 'r', encoding='utf-8') as fp:
+ code = compile(fp.read(), args.prog[0], "exec")
+ Nemesis(args.task,
+ args.speedup,
+ args.experiment_duration,
+ args.warmup_time,
+ args.interval).start()
+ exec(code, the_globals)
+ Nemesis.stop()
+ except Exception:
+ traceback.print_exc()
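
nemesis.py exec()s the target script in-process, so the target's asyncio.run() creates a CausalEventLoop via the policy installed when causal_event_loop is imported, and the -c/--task argument is matched against Task.get_name() inside that loop. A hypothetical target program (the file and task names below are invented for illustration):

import asyncio

async def worker(delay, rounds=2000):
    for _ in range(rounds):
        await asyncio.sleep(delay)

async def main():
    # The names given here are what nemesis matches via Task.get_name().
    fast = asyncio.create_task(worker(0.001), name="fast-worker")
    slow = asyncio.create_task(worker(0.005), name="slow-worker")
    await asyncio.gather(fast, slow)

asyncio.run(main())

A script like this could then be run with something along the lines of ./nemesis.py -c slow-worker -s 2.0 -- demo.py, where -s appears to act as the virtual speedup applied to the named task (non-matching callbacks are delayed in _run_once to simulate it).
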
diff --git a/nemesis/nemesis.py.bkup b/nemesis/nemesis.py.bkup
new file mode 100755
index 0000000..b2f41f4
--- /dev/null
+++ b/nemesis/nemesis.py.bkup
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+Commentary:
+Code:
+'''
+
+import argparse
+import os
+import signal
+import subprocess
+import sys
+import threading
+import time
+from _remote_debugging import FrameInfo, RemoteUnwinder
+
+import json
+
+class Nemesis(object):
+
+ # the process id of the target program
+ pid = 0
+ # the (ideal) interval between samples
+ signal_interval = 0.0
+    # the timestamp at which the last sample was taken
+ last_sample = None
+
+ @staticmethod
+ def __init__(pid, signal_interval=0.01):
+ Nemesis.pid = pid
+ Nemesis.signal_interval = signal_interval
+
+ @staticmethod
+ def start():
+ Nemesis.last_sample = time.perf_counter()
+ signal.signal(signal.SIGALRM,
+ Nemesis._signal_handler)
+ signal.setitimer(signal.ITIMER_REAL,
+ Nemesis.signal_interval,
+ Nemesis.signal_interval)
+
+ @staticmethod
+ def stop():
+ signal.setitimer(signal.ITIMER_REAL, 0)
+
+ @staticmethod
+ def _signal_handler(sig, frame):
+ sample = Nemesis._get_all_awaited_by()
+ Nemesis._tally_coroutines(sample)
+
+ @staticmethod
+ def _format_stack_entry(elem: str|FrameInfo) -> str:
+ if not isinstance(elem, str):
+ if elem.lineno == 0 and elem.filename == "":
+ return f"{elem.funcname}"
+ else:
+ return f"{elem.funcname} {elem.filename}:{elem.lineno}"
+ return elem
+
+ @staticmethod
+ def _index(result):
+ id2name, awaits, task_stacks = {}, [], {}
+ for awaited_info in result:
+ for task_info in awaited_info.awaited_by:
+ task_id = task_info.task_id
+ task_name = task_info.task_name
+ id2name[task_id] = task_name
+
+ # Store the internal coroutine stack for this task
+ if task_info.coroutine_stack:
+ for coro_info in task_info.coroutine_stack:
+ call_stack = coro_info.call_stack
+ internal_stack = [Nemesis._format_stack_entry(frame) for frame in call_stack]
+ task_stacks[task_id] = internal_stack
+
+ # Add the awaited_by relationships (external dependencies)
+ if task_info.awaited_by:
+ for coro_info in task_info.awaited_by:
+ call_stack = coro_info.call_stack
+ parent_task_id = coro_info.task_name
+ stack = [Nemesis._format_stack_entry(frame) for frame in call_stack]
+ awaits.append((parent_task_id, stack, task_id))
+ return id2name, awaits, task_stacks
+
+ @staticmethod
+ def _get_all_awaited_by():
+ unwinder = RemoteUnwinder(Nemesis.pid)
+ return unwinder.get_all_awaited_by()
+
+ def _tally_coroutines(sample):
+ id2name, awaits, task_stacks = Nemesis._index(sample)
+ print(id2name)
+ print(awaits)
+ print(task_stacks)
+ print('--')
+ # for tid, tasks in sample:
+ # print('---')
+ # print(f'tid: {tid}')
+ # for awaited_info in sample:
+ # for task_info in awaited_info.awaited_by:
+ # print(f' task_id: {task_info.task_id}')
+ # print(f' name: {task_info.task_name}')
+ # if task_info.coroutine_stack:
+ # print(f' stack:')
+ # for coro_info in task_info.coroutine_stack:
+ # print(f' {coro_info.call_stack}')
+ # if task_info.awaited_by:
+ # print(f' parents:')
+ # for coro_info in task_info.awaited_by:
+ # print(f' {coro_info.task_name}')
+ # print(f' {coro_info.call_stack}')
+ # print(f'')
+
+
+if __name__ == "__main__":
+ def run_process(script_path, script_args):
+ if not os.path.isfile(script_path):
+ print(f"Script {script_path} does not exist.")
+ sys.exit(1)
+
+ try:
+ process = subprocess.Popen(['python3', script_path] + script_args)
+ print(f"Executed: {script_path} with {script_args} (pid {process.pid})")
+ return process.pid
+ except Exception as e:
+ print(f"Error starting script: {e}")
+ sys.exit(1)
+
+ parser = argparse.ArgumentParser(
+ usage="%(prog)s [args] -- script [args]"
+ )
+
+ parser.add_argument("-i", "--interval",
+ help="The minimum amount of time inbetween samples in seconds.",
+ metavar="",
+ type=float,
+ default=0.01)
+ parser.add_argument("-t", "--total-time",
+ help="The total amount of time to monitor the target process.",
+ metavar="",
+ type=float,
+ default=10)
+ parser.add_argument("-p", "--pid",
+ help="The pid of the target python process.",
+ metavar="",
+ type=int)
+ parser.add_argument("prog",
+ type=str,
+ nargs='*',
+ help="Path to the python script to run and its arguments.")
+
+ args = parser.parse_args()
+
+ if args.prog:
+ pid = run_process(args.prog[0], args.prog[1:])
+ # wait for process to start
+ time.sleep(0.5)
+ elif args.pid is not None:
+ pid = args.pid
+ else:
+ print("No valid arguments provided. Use -p for PID or -- followed by the script path.")
+ sys.exit(1)
+
+ Nemesis(pid, args.interval).start()
+ # stop the profiler after args.total_time
+ threading.Timer(args.total_time, Nemesis.stop).start()
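
The .bkup version takes the out-of-process route instead: it samples a target pid with the private _remote_debugging module (the RemoteUnwinder / awaited-by machinery that appears to ship with CPython 3.14's asyncio tooling). A stripped-down sketch of that primitive, using only the attribute names already seen above:

import sys
# Private CPython 3.14+ module; the same import nemesis.py.bkup relies on.
from _remote_debugging import RemoteUnwinder

pid = int(sys.argv[1])                     # pid of a running asyncio program
unwinder = RemoteUnwinder(pid)
for awaited_info in unwinder.get_all_awaited_by():
    for task_info in awaited_info.awaited_by:
        print(task_info.task_id, task_info.task_name)
        for coro_info in (task_info.coroutine_stack or []):
            print("    ", coro_info.call_stack)
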