Diffstat (limited to 'nemesis')
-rw-r--r--  nemesis/__init__.py            |   0
-rw-r--r--  nemesis/causal_event_loop.py   | 159
-rw-r--r--  nemesis/experiment.py          |  92
-rwxr-xr-x  nemesis/nemesis.py             | 212
-rwxr-xr-x  nemesis/nemesis.py.bkup        | 186
5 files changed, 649 insertions, 0 deletions
diff --git a/nemesis/__init__.py b/nemesis/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/nemesis/__init__.py
diff --git a/nemesis/causal_event_loop.py b/nemesis/causal_event_loop.py
new file mode 100644
index 0000000..4320655
--- /dev/null
+++ b/nemesis/causal_event_loop.py
@@ -0,0 +1,159 @@
+import asyncio
+import collections
+import heapq
+import time
+from asyncio.log import logger
+from asyncio.base_events import _format_handle
+
+_MIN_SCHEDULED_TIMER_HANDLES = 100
+_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
+MAXIMUM_SELECT_TIMEOUT = 24 * 3600
+
+class CausalEventLoop(asyncio.SelectorEventLoop):
+    '''
+    Internal time in this event loop may run faster or slower than objective time.
+    Control loop time dilation vs. objective time by setting the ``time_dilation`` attribute
+    (dilation == X means that loop time is running X times faster than objective time).
+    '''
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._last_objective_time = None
+        self._last_subjective_time = None
+        self._processing = collections.deque()
+        self._time_dilation = 1.0
+        self._task_to_optimize = None
+
+        _select = self._selector.select
+        def select(timeout: float):
+            return _select(timeout / self._time_dilation)
+        self._selector.select = select
+
+    def _set_experiment(self, task, dilation):
+        self._task_to_optimize = task
+        self._time_dilation = dilation
+
+    def _matches_task(self, handle):
+        cb = handle._callback
+        if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
+            task = cb.__self__
+            if task.get_name() == self._task_to_optimize:
+                return True
+        return False
+
+    def _update_ready(self, sampling=False):
+        """
+        Polls the IO selector, schedules resulting callbacks, and schedules
+        'call_later' callbacks.
+
+        This logic was separated out of `run_once` so that the list of `ready`
+        tasks may be updated more frequently than once per iteration.
+
+        If SAMPLING is true, the timeout passed to the selector will always be 0.
+        """
+        sched_count = len(self._scheduled)
+        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
+            self._timer_cancelled_count / sched_count >
+                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
+            # Remove delayed calls that were cancelled if their number
+            # is too high
+            new_scheduled = []
+            for handle in self._scheduled:
+                if handle._cancelled:
+                    handle._scheduled = False
+                else:
+                    new_scheduled.append(handle)
+
+            heapq.heapify(new_scheduled)
+            self._scheduled = new_scheduled
+            self._timer_cancelled_count = 0
+        else:
+            # Remove delayed calls that were cancelled from head of queue.
+            while self._scheduled and self._scheduled[0]._cancelled:
+                self._timer_cancelled_count -= 1
+                handle = heapq.heappop(self._scheduled)
+                handle._scheduled = False
+
+        timeout = None
+        if sampling or self._ready or self._stopping:
+            timeout = 0
+        elif self._scheduled:
+            # Compute the desired timeout.
+            timeout = self._scheduled[0]._when - self.time()
+            if timeout > MAXIMUM_SELECT_TIMEOUT:
+                timeout = MAXIMUM_SELECT_TIMEOUT
+            elif timeout < 0:
+                timeout = 0
+
+        event_list = self._selector.select(timeout)
+        self._process_events(event_list)
+        # Needed to break cycles when an exception occurs.
+        event_list = None
+
+        # Handle 'later' callbacks that are ready.
+        end_time = self.time() + self._clock_resolution
+        while self._scheduled:
+            handle = self._scheduled[0]
+            if handle._when >= end_time:
+                break
+            handle = heapq.heappop(self._scheduled)
+            handle._scheduled = False
+            self._ready.append(handle)
+
+    def _run_once(self):
+        """Run one full iteration of the event loop.
+
+        This calls all currently ready callbacks.
+        """
+        self._update_ready()
+        self._processing.extend(self._ready)
+        # This is the only place where callbacks are actually *called*.
+        # All other places just add them to ready.
+        # Note: We run all currently scheduled callbacks, but not any
+        # callbacks scheduled by callbacks run this time around --
+        # they will be run the next time (after another I/O poll).
+        # Use an idiom that is thread-safe without using locks.
+        ntodo = len(self._processing)
+        for i in range(ntodo):
+            handle = self._processing.popleft()
+            try:
+                self._ready.remove(handle)
+            except ValueError:
+                pass
+            if handle._cancelled:
+                continue
+            try:
+                self._current_handle = handle
+                t0 = super().time()
+                handle._run()
+                dt = super().time() - t0
+                if self._debug and dt >= self.slow_callback_duration:
+                    logger.warning('Executing %s took %.3f seconds',
+                                   _format_handle(handle), dt * 1/self._time_dilation)
+                if not self._matches_task(handle):
+                    t0 = super().time()
+                    # insert a delay
+                    while super().time() - t0 < dt * (1/self._time_dilation - 1):
+                        time.sleep(0.001)
+            finally:
+                self._current_handle = None
+                handle = None  # Needed to break cycles when an exception occurs.
+
+    def time(self):
+        obj = super().time()
+        if self._last_objective_time is None:
+            self._last_objective_time = self._last_subjective_time = obj
+            return obj
+        else:
+            sub = self._last_subjective_time + (obj - self._last_objective_time) * self._time_dilation
+            self._last_subjective_time = sub
+            self._last_objective_time = obj
+            return sub
+
+
+class CausalEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
+    def new_event_loop(self):
+        return CausalEventLoop()
+
+
+asyncio.set_event_loop_policy(CausalEventLoopPolicy())
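CausalEventLoop dilates loop-internal time relative to wall-clock time and inserts a compensating delay after callbacks that do not belong to the selected task. A minimal smoke test of the dilated clock could look like the sketch below; it is an illustration only, not part of this patch, it assumes the files above are importable as the nemesis package, and it pokes the private _time_dilation attribute directly.

# Illustration only, not part of this patch.
# Importing the module also installs CausalEventLoopPolicy globally
# (the module does this at import time, as shown above).
import asyncio
import time

from nemesis.causal_event_loop import CausalEventLoop

async def nap():
    await asyncio.sleep(1.0)   # one second of *loop* time

loop = CausalEventLoop()
loop._time_dilation = 2.0      # loop time advances twice as fast as wall-clock time
start = time.monotonic()
loop.run_until_complete(nap())
# With dilation 2.0 the sleep should complete in roughly half a wall-clock second.
print(f"wall-clock elapsed: {time.monotonic() - start:.2f}s")
loop.close()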
diff --git a/nemesis/experiment.py b/nemesis/experiment.py
new file mode 100644
index 0000000..87a0103
--- /dev/null
+++ b/nemesis/experiment.py
@@ -0,0 +1,92 @@
+from collections import defaultdict, namedtuple
+import asyncio
+import sys
+import threading
+from causal_event_loop import CausalEventLoop
+
+class BadLoopTypeException(Exception):
+    pass
+
+class Experiment:
+
+    # the selected task for this experiment
+    _task = None
+    # the selected speedup for this experiment
+    _speedup = None
+    # event loops participating in this this experiment
+    loops = []
+    # a key-value pair where keys represent a handle and values
+    # represent number of times sampled.
+    _samples = None
+
+    def __init__(self, task, speedup):
+        self._task = task
+        self._speedup = speedup
+        self._samples = defaultdict(lambda: 0)
+
+        self._set_loops()
+
+    def get_task(self):
+        return self._task
+
+    def get_speedup(self):
+        return self._speedup
+
+    def get_results(self):
+        ret = f"Results for {self._task} at {self._speedup} times speedup:\n"
+        if len(self._samples) > 0:
+            ret += (f" {'HANDLE':<30} {'LOOP':<10} {'SEC':<10}")
+            for key in self._sort_samples(self._samples):
+                ret += f"\n {self._get_sample_str(key)}"
+        else:
+            ret += " No samples were gathered. (This is odd!)"
+        return ret
+
+    def get_loops(self):
+        return [l for l in self._loops if l.is_running()]
+
+    def add_handles(self, handles, loop, time):
+        for h in handles:
+            self._samples[h.__str__()] += time
+
+    def _set_loops(self):
+        self._loops = self._get_event_loops()
+        for l in self._loops:
+            if not isinstance(l, CausalEventLoop):
+                raise BadLoopTypeException("Nemesis requires a custom event loop to insert slowdowns. It does not work on programs which change the event loop policy.")
+            l._set_experiment(self._task, 1.0 / self._speedup)
+
+    def _get_event_loops(self):
+        '''Returns each thread's event loop, if it exists.'''
+        loops = []
+        for t in threading.enumerate():
+            frame = sys._current_frames().get(t.ident)
+            if frame:
+                loop = self._walk_back_until_loop(frame)
+                if loop and loop not in loops:
+                    loops.append(loop)
+        return loops
+
+    def _walk_back_until_loop(self, frame):
+        '''Walks back the callstack until we are in a method named '_run_once'.
+        If this is ever true, we assume we are in an Asyncio event loop method,
+        and check to see if the 'self' variable is indeed and instance of
+        AbstractEventLoop. Return this variable if true.'''
+        while frame:
+            if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
+                loop = frame.f_locals['self']
+                if isinstance(loop, asyncio.AbstractEventLoop):
+                    return loop
+            else:
+                frame = frame.f_back
+        return None
+
+    def _get_sample_str(self, key):
+        value = self._samples[key] / self._speedup
+        return f"{key:29} {value:10}"
+
+    def _sort_samples(self, sample_dict):
+        '''Returns SAMPLE_DICT in descending order by number of samples.'''
+        return {k: v for k, v in sorted(sample_dict.items(),
+                                        key=lambda item: item[1],
+                                        reverse=True)}
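experiment.py discovers running event loops by walking each thread's live call stack until it reaches a frame whose code object is named '_run_once' and whose 'self' is an event loop. The standalone sketch below illustrates that discovery technique; it is not part of the patch and is simplified relative to _walk_back_until_loop (it always advances to the parent frame).

# Illustration only, not part of this patch.
import asyncio
import sys
import threading

def find_loop(thread_ident):
    """Walk a thread's live frames looking for an event loop's _run_once frame."""
    frame = sys._current_frames().get(thread_ident)
    while frame:
        if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals:
            candidate = frame.f_locals['self']
            if isinstance(candidate, asyncio.AbstractEventLoop):
                return candidate
        frame = frame.f_back
    return None

async def main():
    main_ident = threading.main_thread().ident
    # Inspect the main thread from a worker thread while the loop waits on select().
    found = await asyncio.to_thread(find_loop, main_ident)
    print(found is asyncio.get_running_loop())   # expected: True

asyncio.run(main())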
diff --git a/nemesis/nemesis.py b/nemesis/nemesis.py
new file mode 100755
index 0000000..eaf41f3
--- /dev/null
+++ b/nemesis/nemesis.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+Commentary:
+Code:
+'''
+
+import argparse
+import asyncio
+import signal
+import sys
+import traceback
+import time
+import types
+import os
+from experiment import Experiment
+from asyncio.base_events import _format_handle
+
+
+class Nemesis(object):
+
+    # the (ideal) interval between samples
+    signal_interval = 0.0
+    # the timestamp which the last sample was taken
+    last_sample = None
+    # the current experiment being run
+    curr_experiment = None
+    # results from previous experiments, represented as strings
+    results = []
+    # The duration of each performance experiment
+    e_duration = None
+    # The number of seconds remaining in this performance experiment.
+    r_duration = None
+
+    # temp
+    task = None
+    dilation = 1.0
+
+    @staticmethod
+    def __init__(task, speedup, e_duration, w_time, signal_interval=0.01):
+        os.environ["PYTHONASYNCIODEBUG"] = "1"
+        Nemesis.signal_interval = signal_interval
+        Nemesis.e_duration = e_duration
+        Nemesis.r_duration = w_time
+        # temporary
+        Nemesis.task = task
+        Nemesis.speedup = speedup
+
+    @staticmethod
+    def start():
+        Nemesis.last_sample = time.perf_counter()
+        signal.signal(signal.SIGALRM,
+                      Nemesis._signal_handler)
+        signal.setitimer(signal.ITIMER_REAL,
+                         Nemesis.signal_interval,
+                         Nemesis.signal_interval)
+
+    @staticmethod
+    def stop():
+        signal.setitimer(signal.ITIMER_REAL, 0)
+        Nemesis._stop_experiment()
+        for r in Nemesis.results:
+            print()
+            print(r)
+
+    @staticmethod
+    def _start_experiment():
+        Nemesis.r_duration = Nemesis.e_duration
+        Nemesis.curr_experiment = Experiment(Nemesis.task, Nemesis.speedup)
+
+    @staticmethod
+    def _stop_experiment():
+        if Nemesis.curr_experiment is not None:
+            print(f'finished running {Nemesis.curr_experiment.get_task()} with speedup {Nemesis.curr_experiment.get_speedup()}')
+            Nemesis.results.append(Nemesis.curr_experiment.get_results())
+            del Nemesis.curr_experiment
+
+    @staticmethod
+    def _signal_handler(sig, frame):
+        curr_sample = time.perf_counter()
+        passed_time = curr_sample - Nemesis.last_sample
+        Nemesis.last_sample = curr_sample
+        if Nemesis.curr_experiment:
+            loops = Nemesis.curr_experiment.get_loops()
+            # print(loops)
+            for loop in loops:
+                loop._update_ready(True)
+                handles = Nemesis._get_waiting_handles(loop)
+                Nemesis.curr_experiment.add_handles(handles, loop, passed_time)
+
+        Nemesis.r_duration -= passed_time
+        if (Nemesis.r_duration <= 0):
+            Nemesis._stop_experiment()
+            Nemesis._start_experiment()
+
+    def _get_waiting_handles(loop):
+        handles = []
+        for handle in loop._ready:
+            if (fmt_handle := _format_handle(handle)) is not None \
+               and fmt_handle not in handles:
+                # no duplicates
+                handles.append(fmt_handle)
+        return handles
+
+    @staticmethod
+    def _should_trace(filename):
+        '''Returns FALSE if filename is uninteresting to the user.
+        Don't depend on this. It's good enough for testing.'''
+        # FIXME Assume GuixSD. Makes filtering easy
+        if not filename:
+            return False
+        if '/gnu/store' in filename:
+            return False
+        if '/usr/local/lib/python' in filename:
+            return False
+        if 'site-packages' in filename:
+            return False
+        if 'propcache' in filename:
+            return False
+        if '.pyx' in filename:
+            return False
+        if filename[0] == '<':
+            return False
+        if 'nemesis' in filename:
+            return False
+        return True
+
+
+the_globals = {
+    '__name__': '__main__',
+    '__doc__': None,
+    '__package__': None,
+    '__loader__': globals()['__loader__'],
+    '__spec__': None,
+    '__annotations__': {},
+    '__builtins__': globals()['__builtins__'],
+    '__file__': None,
+    '__cached__': None,
+}
+
+
+if __name__ == "__main__":
+    # parses CLI arguments and facilitates profiler runtime.
+    parser = argparse.ArgumentParser(
+        usage='%(prog)s [args] -- prog'
+    )
+
+    parser.add_argument('-i', '--interval',
+                        help='The minimum amount of time inbetween \
+                        samples in seconds.',
+                        metavar='',
+                        type=float,
+                        default=0.01)
+    parser.add_argument('-s', '--speedup',
+                        help='The amount of virtual speedup.',
+                        metavar='',
+                        type=float,
+                        default=0.5)
+    parser.add_argument('-c', '--task',
+                        help='The task to virtually speedup.',
+                        metavar='',
+                        type=str,
+                        required=True)
+    parser.add_argument('-e', '--experiment-duration',
+                        help='The performance experiment duration. Defaults to 4 seconds.',
+                        metavar='',
+                        type=float,
+                        default=4)
+    parser.add_argument('-w', '--warmup-time',
+                        help='Amount of time to wait until the first performance experiment. Default is the minimum time of 100 milliseconds',
+                        metavar='',
+                        type=float,
+                        default=0.1)
+    parser.add_argument('prog',
+                        type=str,
+                        nargs='*',
+                        help='Path to the python script and its arguments.')
+    args = parser.parse_args()
+
+    sys.argv = args.prog
+    try:
+        with open(args.prog[0], 'r', encoding='utf-8') as fp:
+            code = compile(fp.read(), args.prog[0], "exec")
+        Nemesis(args.task,
+                args.speedup,
+                args.experiment_duration,
+                args.warmup_time,
+                args.interval).start()
+        exec(code, the_globals)
+        Nemesis.stop()
+    except Exception:
+        traceback.print_exc()
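Given the argument parser above, a profiling run might be launched like this (the task name and target script are hypothetical):

    ./nemesis.py -c "Task-2" -s 0.5 -e 4 -w 0.1 -- examples/server.py --port 8080

The target script runs in-process via exec(), and because nemesis.py imports experiment (which in turn imports causal_event_loop), CausalEventLoopPolicy is installed before the target creates its event loop; a target that installs its own event loop policy will trigger BadLoopTypeException when the first experiment starts.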
diff --git a/nemesis/nemesis.py.bkup b/nemesis/nemesis.py.bkup
new file mode 100755
index 0000000..b2f41f4
--- /dev/null
+++ b/nemesis/nemesis.py.bkup
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+'''
+ _/ _/ _/
+ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/
+ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/
+ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/
+ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/
+
+
+Copyright 2025 bdunahu
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+Commentary:
+Code:
+'''
+
+import argparse
+import os
+import signal
+import subprocess
+import sys
+import threading
+import time
+from _remote_debugging import RemoteUnwinder
+
+import json
+
+class Nemesis(object):
+
+    # the process id of the target program
+    pid = 0
+    # the (ideal) interval between samples
+    signal_interval = 0.0
+    # the timestamp which the last sample was taken
+    last_sample = None
+
+    @staticmethod
+    def __init__(pid, signal_interval=0.01):
+        Nemesis.pid = pid
+        Nemesis.signal_interval = signal_interval
+
+    @staticmethod
+    def start():
+        Nemesis.last_sample = time.perf_counter()
+        signal.signal(signal.SIGALRM,
+                      Nemesis._signal_handler)
+        signal.setitimer(signal.ITIMER_REAL,
+                         Nemesis.signal_interval,
+                         Nemesis.signal_interval)
+
+    @staticmethod
+    def stop():
+        signal.setitimer(signal.ITIMER_REAL, 0)
+
+    @staticmethod
+    def _signal_handler(sig, frame):
+        sample = Nemesis._get_all_awaited_by()
+        Nemesis._tally_coroutines(sample)
+
+    @staticmethod
+    def _format_stack_entry(elem: str|FrameInfo) -> str:
+        if not isinstance(elem, str):
+            if elem.lineno == 0 and elem.filename == "":
+                return f"{elem.funcname}"
+            else:
+                return f"{elem.funcname} {elem.filename}:{elem.lineno}"
+        return elem
+
+    @staticmethod
+    def _index(result):
+        id2name, awaits, task_stacks = {}, [], {}
+        for awaited_info in result:
+            for task_info in awaited_info.awaited_by:
+                task_id = task_info.task_id
+                task_name = task_info.task_name
+                id2name[task_id] = task_name
+
+                # Store the internal coroutine stack for this task
+                if task_info.coroutine_stack:
+                    for coro_info in task_info.coroutine_stack:
+                        call_stack = coro_info.call_stack
+                        internal_stack = [Nemesis._format_stack_entry(frame) for frame in call_stack]
+                        task_stacks[task_id] = internal_stack
+
+                # Add the awaited_by relationships (external dependencies)
+                if task_info.awaited_by:
+                    for coro_info in task_info.awaited_by:
+                        call_stack = coro_info.call_stack
+                        parent_task_id = coro_info.task_name
+                        stack = [Nemesis._format_stack_entry(frame) for frame in call_stack]
+                        awaits.append((parent_task_id, stack, task_id))
+        return id2name, awaits, task_stacks
+
+    @staticmethod
+    def _get_all_awaited_by():
+        unwinder = RemoteUnwinder(Nemesis.pid)
+        return unwinder.get_all_awaited_by()
+
+    def _tally_coroutines(sample):
+        id2name, awaits, task_stacks = Nemesis._index(sample)
+        print(id2name)
+        print(awaits)
+        print(task_stacks)
+        print('--')
+        # for tid, tasks in sample:
+        #     print('---')
+        #     print(f'tid: {tid}')
+        # for awaited_info in sample:
+        #     for task_info in awaited_info.awaited_by:
+        #         print(f'  task_id: {task_info.task_id}')
+        #         print(f'  name: {task_info.task_name}')
+        #         if task_info.coroutine_stack:
+        #             print(f'  stack:')
+        #             for coro_info in task_info.coroutine_stack:
+        #                 print(f'    {coro_info.call_stack}')
+        #         if task_info.awaited_by:
+        #             print(f'  parents:')
+        #             for coro_info in task_info.awaited_by:
+        #                 print(f'    {coro_info.task_name}')
+        #                 print(f'    {coro_info.call_stack}')
+        #         print(f'')
+
+
+if __name__ == "__main__":
+    def run_process(script_path, script_args):
+        if not os.path.isfile(script_path):
+            print(f"Script {script_path} does not exist.")
+            sys.exit(1)
+
+        try:
+            process = subprocess.Popen(['python3', script_path] + script_args)
+            print(f"Executed: {script_path} with {script_args} (pid {process.pid})")
+            return process.pid
+        except Exception as e:
+            print(f"Error starting script: {e}")
+            sys.exit(1)
+
+    parser = argparse.ArgumentParser(
+        usage="%(prog)s [args] -- script [args]"
+    )
+
+    parser.add_argument("-i", "--interval",
+                        help="The minimum amount of time inbetween samples in seconds.",
+                        metavar="",
+                        type=float,
+                        default=0.01)
+    parser.add_argument("-t", "--total-time",
+                        help="The total amount of time to monitor the target process.",
+                        metavar="",
+                        type=float,
+                        default=10)
+    parser.add_argument("-p", "--pid",
+                        help="The pid of the target python process.",
+                        metavar="",
+                        type=int)
+    parser.add_argument("prog",
+                        type=str,
+                        nargs='*',
+                        help="Path to the python script to run and its arguments.")
+
+    args = parser.parse_args()
+
+    if args.prog:
+        pid = run_process(args.prog[0], args.prog[1:])
+        # wait for process to start
+        time.sleep(0.5)
+    elif args.pid is not None:
+        pid = args.pid
+    else:
+        print("No valid arguments provided. Use -p for PID or -- followed by the script path.")
+        sys.exit(1)
+
+    Nemesis(pid, args.interval).start()
+    # stop the profiler after args.total_time
+    threading.Timer(args.total_time, Nemesis.stop).start()
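The retained prototype nemesis.py.bkup takes the opposite approach: it samples a separate Python process from the outside through RemoteUnwinder rather than instrumenting the event loop in-process. Per its argument parser, an invocation would look roughly like one of the following (the script path and PID are hypothetical):

    ./nemesis.py.bkup -i 0.01 -t 10 -- examples/server.py --port 8080
    ./nemesis.py.bkup -p 12345 -t 10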