#!/usr/bin/env python3 ''' _/ _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/ _/_/ _/_/_/ _/_/_/ _/ _/ _/ _/_/_/_/ _/ _/ _/ _/_/_/_/ _/_/ _/ _/_/ _/ _/_/ _/ _/ _/ _/ _/ _/_/ _/ _/_/ _/ _/ _/_/_/ _/ _/ _/ _/_/_/ _/_/_/ _/ _/_/_/ Copyright 2025 bdunahu Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Commentary: Code: ''' import argparse import os import signal import subprocess import sys import threading import time from _remote_debugging import RemoteUnwinder import json class Nemesis(object): # the process id of the target program pid = 0 # the (ideal) interval between samples signal_interval = 0.0 # the timestamp which the last sample was taken last_sample = None @staticmethod def __init__(pid, signal_interval=0.01): Nemesis.pid = pid Nemesis.signal_interval = signal_interval @staticmethod def start(): Nemesis.last_sample = time.perf_counter() signal.signal(signal.SIGALRM, Nemesis._signal_handler) signal.setitimer(signal.ITIMER_REAL, Nemesis.signal_interval, Nemesis.signal_interval) @staticmethod def stop(): signal.setitimer(signal.ITIMER_REAL, 0) @staticmethod def _signal_handler(sig, frame): sample = Nemesis._get_all_awaited_by() Nemesis._tally_coroutines(sample) @staticmethod def _format_stack_entry(elem: str|FrameInfo) -> str: if not isinstance(elem, str): if elem.lineno == 0 and elem.filename == "": return f"{elem.funcname}" else: return f"{elem.funcname} {elem.filename}:{elem.lineno}" return elem @staticmethod def _index(result): id2name, awaits, task_stacks = {}, [], {} for awaited_info in result: for task_info in awaited_info.awaited_by: task_id = task_info.task_id task_name = task_info.task_name id2name[task_id] = task_name # Store the internal coroutine stack for this task if task_info.coroutine_stack: for coro_info in task_info.coroutine_stack: call_stack = coro_info.call_stack internal_stack = [Nemesis._format_stack_entry(frame) for frame in call_stack] task_stacks[task_id] = internal_stack # Add the awaited_by relationships (external dependencies) if task_info.awaited_by: for coro_info in task_info.awaited_by: call_stack = coro_info.call_stack parent_task_id = coro_info.task_name stack = [Nemesis._format_stack_entry(frame) for frame in call_stack] awaits.append((parent_task_id, stack, task_id)) return id2name, awaits, task_stacks @staticmethod def _get_all_awaited_by(): unwinder = RemoteUnwinder(Nemesis.pid) return unwinder.get_all_awaited_by() def _tally_coroutines(sample): id2name, awaits, task_stacks = Nemesis._index(sample) print(id2name) print(awaits) print(task_stacks) print('--') # for tid, tasks in sample: # print('---') # print(f'tid: {tid}') # for awaited_info in sample: # for task_info in awaited_info.awaited_by: # print(f' task_id: {task_info.task_id}') # print(f' name: {task_info.task_name}') # if task_info.coroutine_stack: # print(f' stack:') # for coro_info in task_info.coroutine_stack: # print(f' {coro_info.call_stack}') # if task_info.awaited_by: # print(f' parents:') # for coro_info in task_info.awaited_by: # print(f' {coro_info.task_name}') # print(f' {coro_info.call_stack}') # print(f'') if __name__ == "__main__": def run_process(script_path, script_args): if not os.path.isfile(script_path): print(f"Script {script_path} does not exist.") sys.exit(1) try: process = subprocess.Popen(['python3', script_path] + script_args) print(f"Executed: {script_path} with {script_args} (pid {process.pid})") return process.pid except Exception as e: print(f"Error starting script: {e}") sys.exit(1) parser = argparse.ArgumentParser( usage="%(prog)s [args] -- script [args]" ) parser.add_argument("-i", "--interval", help="The minimum amount of time inbetween samples in seconds.", metavar="", type=float, default=0.01) parser.add_argument("-t", "--total-time", help="The total amount of time to monitor the target process.", metavar="", type=float, default=10) parser.add_argument("-p", "--pid", help="The pid of the target python process.", metavar="", type=int) parser.add_argument("prog", type=str, nargs='*', help="Path to the python script to run and its arguments.") args = parser.parse_args() if args.prog: pid = run_process(args.prog[0], args.prog[1:]) # wait for process to start time.sleep(0.5) elif args.pid is not None: pid = args.pid else: print("No valid arguments provided. Use -p for PID or -- followed by the script path.") sys.exit(1) Nemesis(pid, args.interval).start() # stop the profiler after args.total_time threading.Timer(args.total_time, Nemesis.stop).start()