'''A stripped-down, Scalene-style sampling CPU profiler.

Usage: ``python mini-scalene.py [-a] script [script args...]``

A SIGPROF interval timer periodically interrupts the profiled script; on each
tick the line currently executing in every thread (and, optionally, in every
asyncio task) is tallied.  The tallies are printed when the interpreter exits.
'''
import argparse
import asyncio
import atexit
import os
import signal
import sys
import threading
import time
import traceback
from collections import defaultdict
from types import FrameType
from typing import List, Optional, cast

# Namespace handed to exec() so the profiled script believes it is __main__.
the_globals = {
    '__name__': '__main__',
    '__doc__': None,
    '__package__': None,
    '__loader__': globals()['__loader__'],
    '__spec__': None,
    '__annotations__': {},
    '__builtins__': globals()['__builtins__'],
    '__file__': None,
    '__cached__': None,
}


def parse_arguments():
    '''Parse CLI args.

    Returns the argparse namespace with `script', `s_args' and `async_off'.
    NOTE: despite its name, `async_off' is True when async profiling is ON
    (the flag uses store_false with a default of True).
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--async_off',
                        action='store_false',
                        help='Turn off experimental async profiling.',
                        default=True)
    parser.add_argument('script', help='A python script to run')
    parser.add_argument('s_args', nargs=argparse.REMAINDER,
                        help='python script args')
    return parser.parse_args()


class mini_scalene:
    '''A stripped-down version of SCALENE which tallies active lines during
    execution.

    All state lives on the class; instantiating it merely installs the
    SIGPROF handler and arms the interval timer.
    '''

    # a key-value pair where keys represent frame metadata (see
    # mini_scalene.frame_to_string) and values represent number of times
    # sampled.
    cpu_samples = defaultdict(int)
    cpu_samples_c = defaultdict(float)

    # number of times samples have been collected
    total_cpu_samples = 0

    # the time, in seconds, between samples
    signal_interval = 0.01

    # the timestamp recorded last signal
    last_signal_time = 0.0

    # if we should try to profile asynchronous code. Used to observe
    # effectiveness of the implementation.
    profile_async = True

    def __init__(self):
        '''Install the SIGPROF handler and start the sampling timer.'''
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         self.signal_interval,
                         self.signal_interval)
        mini_scalene.last_signal_time = mini_scalene.gettime()

    @staticmethod
    def gettime():
        '''get the wallclock time'''
        return time.perf_counter()

    @staticmethod
    def start(profile_async):
        '''Record whether to sample asyncio tasks and schedule the report
        to be printed at interpreter exit.'''
        mini_scalene.profile_async = profile_async
        atexit.register(mini_scalene.exit_handler)

    @staticmethod
    def exit_handler():
        '''Turn off profiling signals & pretty-print profiling information.'''
        mini_scalene.disable_signals()
        total = mini_scalene.total_cpu_samples
        print("CPU usage (Python):")
        # If we've collected any samples, dump them.
        if total <= 0:
            print("(did not run long enough to profile)")
            return
        mini_scalene._print_samples(mini_scalene.cpu_samples, total)
        print("CPU usage (Native):")
        mini_scalene._print_samples(mini_scalene.cpu_samples_c, total)

    @staticmethod
    def _print_samples(samples, total):
        '''Print one report line per key of SAMPLES, heaviest first, as a
        percentage of TOTAL.'''
        for key, count in mini_scalene.sort_samples(samples).items():
            print(f"{key} : "
                  f"{count * 100 / total:.3f} % "
                  f"({count:.1f} total samples)")

    @staticmethod
    def disable_signals():
        '''Stop sampling: ignore the profiling signals and disarm the timer.'''
        # The handler is keyed by the *signal* (SIGPROF).  ITIMER_PROF is a
        # timer id whose numeric value coincides with SIGINT, so passing it
        # to signal.signal() would make the process ignore Ctrl-C instead.
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
        signal.setitimer(signal.ITIMER_PROF, 0)

    @staticmethod
    def cpu_signal_handler(sig, frame):
        '''SIGPROF handler: tally one sample for every frame of interest.

        The Python tally grows by one per recorded frame.  The native tally
        grows by how much longer than one interval the signal took to arrive,
        expressed in units of the interval.
        '''
        interval = mini_scalene.signal_interval
        elapsed_since_last_signal = mini_scalene.gettime() - \
            mini_scalene.last_signal_time
        c_time_norm = (elapsed_since_last_signal - interval) / interval

        for sampled in mini_scalene.compute_frames_to_record(frame):
            key = mini_scalene.frame_to_string(sampled)
            mini_scalene.cpu_samples[key] += 1
            mini_scalene.cpu_samples_c[key] += c_time_norm

        mini_scalene.total_cpu_samples += elapsed_since_last_signal / interval
        mini_scalene.last_signal_time = mini_scalene.gettime()

    @staticmethod
    def compute_frames_to_record(
            this_frame: Optional[FrameType]) -> List[FrameType]:
        '''Collects all stack frames that Scalene actually processes.

        THIS_FRAME is the frame that was interrupted by the signal.  The
        result holds, at most once each, the innermost traceable frame for
        THIS_FRAME, for every other thread and for every asyncio task.
        '''
        # Signal handlers run in the main thread, whose interrupted frame is
        # THIS_FRAME.  Its entry in sys._current_frames() is the profiler's
        # own handler frame, so it is skipped rather than sampled again.
        main_thread = threading.main_thread()
        current = sys._current_frames()  # one snapshot for all threads
        candidates = [this_frame]
        candidates += [current.get(t.ident) for t in threading.enumerate()
                       if t is not main_thread]
        candidates += mini_scalene.get_async_frames()
        candidates = mini_scalene.filter_duplicated_frames(candidates)

        # Process all the frames to remove ones we aren't going to track.
        # Two candidates may resolve to the same traced frame once library
        # frames are skipped; record such a frame only once per sample.
        recorded = []
        seen = set()
        for candidate in candidates:
            frame = mini_scalene._nearest_traced_frame(candidate)
            if frame is None or frame in seen:
                continue
            seen.add(frame)
            recorded.append(frame)
        return recorded

    @staticmethod
    def _nearest_traced_frame(
            frame: Optional[FrameType]) -> Optional[FrameType]:
        '''Return FRAME, or its closest caller, that lives in a file we
        trace; None if there is no such frame.'''
        if frame is None:
            return None
        fname = frame.f_code.co_filename
        if not fname and frame.f_back is not None:
            # 'eval/compile' gives no f_code.co_filename.  We have to look
            # back into the outer frame in order to check the co_filename.
            fname = frame.f_back.f_code.co_filename
        # Walk the stack backwards until we hit a frame that IS one we
        # should trace (if there is one). i.e., if it's in the code being
        # profiled, and it is just calling stuff deep in libraries.
        while frame is not None and not mini_scalene.should_trace(fname):
            frame = frame.f_back
            if frame is not None:
                fname = frame.f_code.co_filename
        return frame

    @staticmethod
    def frame_to_string(frame):
        '''Pretty-prints a frame as a function/file name and a line number.
        Additionally used a key for tallying lines.'''
        code = frame.f_code
        return f"{code.co_filename}\t{code.co_name}\t{frame.f_lineno}"

    @staticmethod
    def get_async_frames():
        '''Obtains the stack frames of all currently executing tasks.

        Returns an empty list when async profiling is disabled or no event
        loop is running in the calling thread.  Entries may be None for
        coroutines that expose no frame.
        '''
        if not (mini_scalene.profile_async
                and mini_scalene.is_event_loop_running()):
            return []
        return [getattr(task.get_coro(), 'cr_frame', None)
                for task in asyncio.all_tasks()]

    @staticmethod
    def should_trace(filename):
        '''Returns FALSE if filename is uninteresting to the user.'''
        # Empty names and pseudo-files such as '<string>' or
        # '<frozen importlib._bootstrap>' are never user code.
        if not filename or filename.startswith('<'):
            return False
        # FIXME Assume GuixSD. Makes filtering easy
        if '/gnu/store' in filename:
            return False
        # Do not profile the profiler itself.
        if 'mini-scalene.py' in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        '''Returns TRUE if there is an event loop running in this thread.'''
        # asyncio.all_tasks() needs a *running* loop, so that is what is
        # tested for, via the public API rather than policy internals.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    @staticmethod
    def sort_samples(sample_dict):
        '''Returns SAMPLE_DICT in descending order by number of samples.'''
        return dict(sorted(sample_dict.items(),
                           key=lambda item: item[1],
                           reverse=True))

    @staticmethod
    def filter_duplicated_frames(frames) -> list:
        '''Returns FRAMES without duplicates and without None entries,
        keeping first-seen order.

        Warns on stderr when a duplicate is found, and asserts that at most
        one duplicate occurs.
        '''
        seen = set()
        unique = []
        dup = []
        for f in frames:
            if f is None:
                # "no frame available" is not a frame, hence not a duplicate
                continue
            if f in seen:
                dup.append(f)
            else:
                seen.add(f)
                unique.append(f)
        # TODO we probably have one because given get_async_frames returns the
        # currently executing task. Would be an easy fix in that method.
        # if there's more than one, I cannot explain it.
        assert len(dup) < 2, \
            f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}"
        if dup:
            print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr)
        return unique


def main():
    '''Compile the script named on the command line and run it under the
    profiler, printing (not propagating) any exception it raises.'''
    args = parse_arguments()
    sys.argv = [args.script] + args.s_args
    # Let the profiled script locate itself, as it could when run directly.
    the_globals['__file__'] = os.path.abspath(args.script)
    try:
        with open(args.script, 'rb') as fp:
            code = compile(fp.read(), args.script, "exec")
        mini_scalene().start(args.async_off)
        exec(code, the_globals)
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()