import sys
import argparse
import threading
import traceback
import runpy
import atexit
import signal
import asyncio
from typing import List, Optional, cast
from types import FrameType
from collections import defaultdict


def parse_arguments(argv=None):
    '''Parse the profiler's command-line arguments.

    argv -- optional list of argument strings; when None (the default)
            argparse reads sys.argv[1:], exactly as before.

    Returns the argparse namespace with three attributes:
      async_off -- True unless -a/--async_off was given (see note below)
      script    -- path of the python script to profile
      s_args    -- list of remaining arguments, forwarded to the script
    '''
    parser = argparse.ArgumentParser()
    # NOTE: despite its name, `async_off` holds True while async profiling
    # is ENABLED: the flag uses store_false with default=True, so passing
    # -a flips the value to False.
    parser.add_argument('-a', '--async_off',
                        action='store_false',
                        help='Turn off experimental async profiling.',
                        default=True)
    parser.add_argument('script', help='A python script to run')
    parser.add_argument('s_args', nargs=argparse.REMAINDER,
                        help='python script args')
    return parser.parse_args(argv)


class mini_scalene:
    '''A stripped-down version of SCALENE which tallies active lines
    during execution.

    Creating an instance installs a SIGPROF handler and arms a repeating
    ITIMER_PROF timer. Every timer tick records one sample for each frame
    returned by compute_frames_to_record. All state is kept on the class,
    so the static methods can be used without an instance.
    '''

    # a key-value pair where keys represent frame metadata (see
    # mini_scalene.frame_to_string) and values represent number of times
    # sampled.
    cpu_samples = defaultdict(int)
    # number of timer ticks handled so far (one per signal, not per frame)
    total_cpu_samples = 0

    # the time, in seconds, between samples
    signal_interval = 0.01

    # if we should try to profile asynchronous code. Used to observe
    # effectiveness of the implementation.
    profile_async = True

    def __init__(self):
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         self.signal_interval, self.signal_interval)

    @staticmethod
    def start(profile_async):
        '''Record whether async profiling is wanted and arrange for the
        report to be printed when the interpreter exits.'''
        mini_scalene.profile_async = profile_async
        atexit.register(mini_scalene.exit_handler)

    @staticmethod
    def exit_handler():
        '''Turn off our profiling signals and pretty-print profiling
        information.'''
        mini_scalene.disable_signals()
        print("CPU usage:")
        total = mini_scalene.total_cpu_samples
        if total > 0:
            # Sort the samples in descending order by number of samples.
            # The sorted view is local: cpu_samples itself stays a
            # defaultdict so it never raises KeyError on a new key.
            ranked = sorted(mini_scalene.cpu_samples.items(),
                            key=lambda item: item[1], reverse=True)
            for key, count in ranked:
                print(f"{key} : {count * 100 / total}%"
                      f" ({count} total samples)")
        else:
            print("(did not run long enough to profile)")

    @staticmethod
    def disable_signals():
        '''Stop the sampling timer and ignore any further SIGPROF.'''
        # Disarm the timer first so no new ticks are generated.
        signal.setitimer(signal.ITIMER_PROF, 0)
        # signal.signal() takes a *signal number*. ITIMER_PROF is a timer
        # id, not a signal; passing it here used to reconfigure an
        # unrelated signal instead of SIGPROF.
        signal.signal(signal.SIGPROF, signal.SIG_IGN)

    @staticmethod
    def cpu_signal_handler(sig, frame):
        '''SIGPROF handler: tally one sample per frame worth recording.

        sig   -- the signal number (unused)
        frame -- the frame that was executing when the signal arrived
        '''
        for sampled in mini_scalene.compute_frames_to_record(frame):
            mini_scalene.cpu_samples[
                mini_scalene.frame_to_string(sampled)] += 1
        mini_scalene.total_cpu_samples += 1

    @staticmethod
    def compute_frames_to_record(
            this_frame: Optional[FrameType]) -> List[FrameType]:
        '''Collects all stack frames that Scalene actually processes.

        Candidates are the interrupted frame, the current frame of every
        other thread, and the frames of asyncio tasks (when enabled).
        Each candidate is walked back to the nearest frame that
        should_trace accepts; candidates with no such frame are dropped.
        Frames reached more than once are returned only once.
        '''
        # Python runs signal handlers in the main thread, so `this_frame`
        # already IS the main thread's sample. The main-thread entry of
        # sys._current_frames() is the profiler's own frame, which would
        # walk back to `this_frame` and count it twice -- skip it.
        main_thread = threading.main_thread()
        thread_frames = sys._current_frames()  # one snapshot for all threads
        candidates = [this_frame]
        candidates += [thread_frames.get(t.ident)
                       for t in threading.enumerate()
                       if t is not main_thread]
        candidates += mini_scalene.get_async_frames()

        resolved = [mini_scalene._resolve_traceable_frame(f)
                    for f in candidates if f is not None]
        # De-duplicate AFTER resolving: e.g. library code called from the
        # running coroutine walks back to the very frame that the task
        # list reports as well.
        return mini_scalene.filter_duplicated_frames(resolved)

    @staticmethod
    def _resolve_traceable_frame(
            frame: FrameType) -> Optional[FrameType]:
        '''Return `frame`, or its nearest caller, that should be traced;
        None when nothing on the stack qualifies.'''
        fname = frame.f_code.co_filename
        if not fname and frame.f_back is not None:
            # 'eval/compile' gives no f_code.co_filename. We have to look
            # back into the outer frame in order to check the co_filename.
            fname = frame.f_back.f_code.co_filename
        # Walk the stack backwards until we hit a frame that IS one we
        # should trace (if there is one). i.e., if it's in the code being
        # profiled, and it is just calling stuff deep in libraries.
        while frame is not None and not mini_scalene.should_trace(fname):
            frame = frame.f_back
            if frame is not None:
                fname = frame.f_code.co_filename
        return frame

    @staticmethod
    def frame_to_string(frame):
        '''Pretty-prints a frame as "filename<TAB>function<TAB>line".
        Additionally used as the key for tallying lines.'''
        code = frame.f_code
        return f"{code.co_filename}\t{code.co_name}\t{frame.f_lineno}"

    @staticmethod
    def get_async_frames():
        '''Obtains the stack frames of all tasks of the running event loop.

        Returns an empty list when async profiling is disabled or no
        event loop is running in the calling thread. Tasks whose
        coroutine exposes no frame (no `cr_frame` attribute, or a
        `cr_frame` of None) are skipped.
        '''
        if not (mini_scalene.profile_async
                and mini_scalene.is_event_loop_running()):
            return []
        frames = []
        for task in asyncio.all_tasks():
            coro_frame = getattr(task.get_coro(), 'cr_frame', None)
            if coro_frame is not None:
                frames.append(coro_frame)
        return frames

    @staticmethod
    def should_trace(filename):
        '''Returns FALSE if filename is uninteresting to the user.'''
        # An empty name carries no information; rejecting it also avoids
        # indexing into an empty string.
        if not filename:
            return False
        # Pseudo-files such as '<string>' or '<frozen importlib._bootstrap>'.
        if filename.startswith('<'):
            return False
        # FIXME Assume GuixSD. Makes filtering easy
        if '/gnu/store' in filename:
            return False
        # Never profile the profiler itself.
        if 'mini-scalene.py' in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        '''Returns TRUE if an event loop is running in the calling thread.

        This is exactly the precondition of `asyncio.all_tasks()' when
        called without a loop, which raises RuntimeError otherwise.
        '''
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    @staticmethod
    def filter_duplicated_frames(frames) -> list:
        '''Return `frames` without None entries and without repeats,
        keeping first-seen order.

        A repeat is expected whenever the interrupted frame is also the
        frame of the currently executing task, so it is dropped silently.
        This runs inside the signal handler, where raising or printing on
        every sample would disturb the program being profiled.
        '''
        seen = set()
        unique = []
        for f in frames:
            if f is None or f in seen:
                continue
            seen.add(f)
            unique.append(f)
        return unique


def main():
    '''Parse the command line, start profiling and run the target script
    as if it were __main__. Exceptions raised by the script are printed
    rather than propagated, so the exit-time report is still produced.'''
    args = parse_arguments()
    mini_scalene().start(args.async_off)
    # Make the script see its own name and arguments, not the profiler's.
    sys.argv = [args.script] + args.s_args
    try:
        runpy.run_path(args.script, run_name="__main__")
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()