import sys
import argparse
import threading
import traceback
import signal
import asyncio
import time
from typing import cast
from types import FrameType
from collections import defaultdict

# NOTE(review): never referenced by name in this file; presumably imported
# for its import-time side effects -- confirm before removing.
import replacement_epoll_selector

# Namespace the profiled script is exec'd in (see main()); it mimics the
# globals of a freshly started `__main__` module.
the_globals = {
    '__name__': '__main__',
    '__doc__': None,
    '__package__': None,
    '__loader__': globals()['__loader__'],
    '__spec__': None,
    '__annotations__': {},
    '__builtins__': globals()['__builtins__'],
    '__file__': None,
    '__cached__': None,
}


def parse_arguments():
    '''Parse CLI args.

    Returns an argparse namespace with `async_off` (True unless -a is
    given, i.e. True means async profiling stays ON), `script` (path of
    the script to profile) and `s_args` (arguments for that script).
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--async_off',
                        action='store_false',
                        help='Turn off experimental async profiling.',
                        default=True)
    parser.add_argument('script', help='A python script to run')
    parser.add_argument('s_args', nargs=argparse.REMAINDER,
                        help='python script args')
    return parser.parse_args()


class Aergia(object):
    '''A stripped-down version of SCALENE which tallies active lines
    during execution.

    All state lives on the class; instantiating it installs the SIGPROF
    handler and arms the profiling timer.
    '''

    # a key-value pair where keys represent frame metadata (see
    # Aergia.frame_to_string) and values represent number of times
    # sampled.
    cpu_samples = defaultdict(int)
    cpu_samples_c = defaultdict(float)

    # number of times samples have been collected
    total_cpu_samples = 0

    # the time, in seconds, between samples
    signal_interval = 0.01

    # the timestamp recorded at the last signal
    last_signal_time = 0.0

    # if we should try to profile asynchronous code. Used to observe
    # effectiveness of the implementation.
    profile_async = True

    def __init__(self):
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         self.signal_interval,
                         self.signal_interval)
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def gettime():
        '''get the wallclock time'''
        return time.perf_counter()

    @staticmethod
    def start(profile_async):
        '''Record whether asynchronous tasks should be sampled too.'''
        Aergia.profile_async = profile_async

    @staticmethod
    def stop():
        '''Turn off profiling signals and print the collected profile.'''
        Aergia.disable_signals()
        Aergia.exit_handler()

    @staticmethod
    def exit_handler():
        '''Pretty-print profiling information.'''
        # If we've collected any samples, dump them.
        print("CPU usage (Python):")
        if Aergia.total_cpu_samples > 0:
            for key in Aergia.sort_samples(Aergia.cpu_samples):
                print(f"{key} : "
                      f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % "
                      f"({Aergia.cpu_samples[key]:.1f} total samples)")
            print("CPU usage (Native):")
            for key in Aergia.sort_samples(Aergia.cpu_samples_c):
                print(f"{key} : "
                      f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % "
                      f"({Aergia.cpu_samples_c[key]:.1f} total samples)")
        else:
            print("(did not run long enough to profile)")

    @staticmethod
    def disable_signals():
        '''Stop sampling: ignore the profiling signals and disarm the timer.'''
        # signal.signal() takes a *signal number*. The timer armed in
        # __init__ (ITIMER_PROF) delivers SIGPROF; passing the timer
        # constant here would address an unrelated signal instead.
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        # Kept from the original implementation; no SIGVTALRM timer is
        # armed anywhere in this file.
        signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
        signal.setitimer(signal.ITIMER_PROF, 0)

    @staticmethod
    def cpu_signal_handler(sig, frame):
        '''SIGPROF handler: tally one sample for every tracked frame.

        `sig` is unused; `frame` is the frame that was interrupted.
        '''
        elapsed_since_last_signal = Aergia.gettime() - \
            Aergia.last_signal_time
        # Wallclock time beyond the expected interval, expressed in units
        # of the interval; accumulated in the "Native" table.
        c_time_norm = (elapsed_since_last_signal - Aergia.signal_interval) / \
            Aergia.signal_interval

        for tracked in Aergia.compute_frames_to_record(frame):
            key = Aergia.frame_to_string(tracked)
            Aergia.cpu_samples[key] += 1
            Aergia.cpu_samples_c[key] += c_time_norm

        Aergia.total_cpu_samples += elapsed_since_last_signal / \
            Aergia.signal_interval
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def compute_frames_to_record(this_frame):
        '''Collects all stack frames that Aergia actually processes.

        Candidates are the interrupted frame, the current frame of every
        live thread and the frames of asyncio tasks. Each candidate is
        walked back up its stack until a frame accepted by
        Aergia.should_trace is found; candidates with no such frame are
        dropped.
        '''
        # Snapshot once instead of rebuilding the mapping per thread.
        current_frames = sys._current_frames()
        frames = [this_frame]
        frames += [current_frames.get(t.ident, None)
                   for t in threading.enumerate()]
        frames += Aergia.get_async_frames()
        frames = Aergia.filter_duplicated_frames(frames)

        # Process all the frames to remove ones we aren't going to track.
        new_frames = []
        for frame in frames:
            if frame is None:
                continue
            fname = frame.f_code.co_filename
            # Record samples only for files we care about.
            if not fname and frame.f_back is not None:
                # 'eval/compile' gives no f_code.co_filename. We have
                # to look back into the outer frame in order to check
                # the co_filename.
                fname = frame.f_back.f_code.co_filename
            while not Aergia.should_trace(fname):
                # Walk the stack backwards until we hit a frame that
                # IS one we should trace (if there is one). i.e., if
                # it's in the code being profiled, and it is just
                # calling stuff deep in libraries.
                # f_back is None at the bottom of the stack; the cast
                # only keeps type checkers quiet.
                frame = cast(FrameType, frame.f_back)
                if frame is None:
                    break
                fname = frame.f_code.co_filename
            if frame is not None:
                new_frames.append(frame)
        return new_frames

    @staticmethod
    def frame_to_string(frame):
        '''Pretty-prints a frame as a function/file name and a line number.
        Additionally used as a key for tallying lines.'''
        co = frame.f_code
        func_name = co.co_name
        line_no = frame.f_lineno
        filename = co.co_filename
        return filename + '\t' + func_name + '\t' + str(line_no)

    @staticmethod
    def get_async_frames():
        '''Obtains the stack frames of all asyncio tasks of the running loop.

        Returns an empty list when async profiling is disabled or no loop
        is available.
        '''
        if not (Aergia.profile_async and Aergia.is_event_loop_running()):
            return []
        try:
            tasks = asyncio.all_tasks()
        except RuntimeError:
            # A loop is set for this thread but is not currently running;
            # raising from inside the signal handler would leak into the
            # profiled program.
            return []
        frames = []
        for task in tasks:
            # cr_frame is missing for non-coroutine awaitables and None
            # once the coroutine has finished.
            coro_frame = getattr(task.get_coro(), 'cr_frame', None)
            if coro_frame is not None:
                frames.append(coro_frame)
        return frames

    @staticmethod
    def should_trace(filename):
        '''Returns FALSE if filename is uninteresting to the user.'''
        if not filename:
            # Nothing to attribute the sample to.
            return False
        # FIXME Assume GuixSD. Makes filtering easy
        if '/gnu/store' in filename:
            return False
        if filename.startswith('<'):
            return False
        if 'aergia.py' in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        '''Returns TRUE if there is an event loop running.
        This is what `asyncio.get_event_loop()' did, before it was
        deprecated in 3.12'''
        # NOTE: relies on private attributes of the default loop policy.
        return asyncio.get_event_loop_policy()._local._loop is not None

    @staticmethod
    def sort_samples(sample_dict):
        '''Returns SAMPLE_DICT in descending order by number of samples.'''
        return dict(sorted(sample_dict.items(),
                           key=lambda item: item[1],
                           reverse=True))

    @staticmethod
    def filter_duplicated_frames(frames) -> list:
        '''Returns the distinct, non-None frames of FRAMES, in first-seen
        order. A warning is written to stderr when a duplicate is found.'''
        seen = set()
        unique = []
        dup = []
        for f in frames:
            if f is None:
                # Threads without a frame and finished coroutines show up
                # as None; they are not real duplicates of one another.
                continue
            if f in seen:
                dup.append(f)
            else:
                seen.add(f)
                unique.append(f)
        # TODO we probably have one because get_async_frames returns the
        # currently executing task. Would be an easy fix in that method.
        # if there's more than one, I cannot explain it.
        assert len(
            dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}"
        if dup:
            print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr)
        return unique


def main():
    '''Profile the script named on the command line and print the report.'''
    args = parse_arguments()
    sys.argv = [args.script] + args.s_args
    try:
        with open(args.script, 'rb') as fp:
            code = compile(fp.read(), args.script, "exec")
        Aergia().start(args.async_off)
        try:
            exec(code, the_globals)
        finally:
            # Always disarm the timer and report, even when the script
            # raises or calls sys.exit().
            Aergia.stop()
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()