import sys
import threading
import traceback
import runpy
import atexit
import signal
import asyncio
import inspect
from typing import cast
from types import FrameType
from collections import defaultdict


class mini_scalene:
    """A minimal sampling CPU profiler driven by the ITIMER_PROF timer.

    Constructing an instance installs a SIGPROF handler and arms the
    profiling timer. Each time the signal fires, one sample is attributed
    to every traced stack frame (main thread, other threads, and asyncio
    tasks). ``start()`` registers an exit hook that prints the results.
    """

    # Sample count per "filename<TAB>function<TAB>line" key.
    cpu_samples = defaultdict(int)
    # Total number of samples recorded across all keys.
    total_cpu_samples = 0
    # Initial delay and repeat interval, in seconds, of the profiling timer.
    signal_interval = 0.01

    def __init__(self):
        """Install the SIGPROF handler and arm the profiling timer."""
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(
            signal.ITIMER_PROF, self.signal_interval, self.signal_interval
        )

    @staticmethod
    def start():
        """Register the report printer to run at interpreter exit."""
        atexit.register(mini_scalene.exit_handler)

    @staticmethod
    def exit_handler():
        """Stop sampling and print the per-line CPU usage report."""
        # Disarm the timer first so no further SIGPROF is generated, then
        # ignore the signals themselves. Note that the handler must be
        # installed on SIGPROF: ITIMER_PROF is a timer id, not a signal
        # number, and passing it to signal.signal() would target whatever
        # signal shares its numeric value.
        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
        # If we've collected any samples, dump them.
        print("CPU usage:")
        total = mini_scalene.total_cpu_samples
        if total > 0:
            # Report in descending order by number of samples.
            ranked = sorted(
                mini_scalene.cpu_samples.items(),
                key=lambda item: item[1],
                reverse=True,
            )
            for key, count in ranked:
                print(f"{key} : {count * 100 / total}% ({count} total samples)")
        else:
            print("(did not run long enough to profile)")

    @staticmethod
    def cpu_signal_handler(sig, frame):
        """Record one sample for every traced frame.

        Args:
            sig: The signal number delivered (unused).
            frame: The frame that was executing when the signal arrived.
        """
        for traced in mini_scalene.compute_frames_to_record(frame):
            mini_scalene.cpu_samples[mini_scalene.frame_to_string(traced)] += 1
            mini_scalene.total_cpu_samples += 1

    @staticmethod
    def compute_frames_to_record(this_frame):
        """Collect the stack frames that should receive a sample.

        Args:
            this_frame: The interrupted frame passed to the signal handler
                (may be None).

        Returns:
            A list with, for each candidate stack, the innermost frame
            whose file passes ``should_trace``. Stacks with no such frame
            contribute nothing.
        """
        # Python-level signal handlers run in the main thread, so
        # ``this_frame`` already represents the main thread. Skip it when
        # enumerating threads; otherwise it would be counted twice.
        current = sys._current_frames()
        main_ident = threading.main_thread().ident
        frames = [this_frame]
        frames += [
            current.get(t.ident)
            for t in threading.enumerate()
            if t.ident != main_ident
        ]
        frames += mini_scalene.get_async_frames()

        # Process all the frames to remove ones we aren't going to track.
        new_frames = []
        for frame in frames:
            # Walk the stack outwards until we hit a frame that IS one we
            # should trace (if there is one), i.e. code being profiled
            # that is just calling stuff deep in libraries.
            while frame is not None:
                fname = frame.f_code.co_filename
                if not fname and frame.f_back is not None:
                    # 'eval/compile' gives no f_code.co_filename. Look at
                    # the outer frame to decide whether to record it.
                    fname = frame.f_back.f_code.co_filename
                if mini_scalene.should_trace(fname):
                    new_frames.append(frame)
                    break
                frame = frame.f_back
        return new_frames

    @staticmethod
    def frame_to_string(frame):
        """Return the ``filename<TAB>function<TAB>line`` key for a frame."""
        code = frame.f_code
        return f"{code.co_filename}\t{code.co_name}\t{frame.f_lineno}"

    @staticmethod
    def get_async_frames():
        """Return the coroutine frames of all tasks on the running loop.

        Returns an empty list when no event loop is running. Entries may
        be None for coroutines without a live frame; callers filter them.
        """
        if not mini_scalene.is_event_loop_running():
            return []
        # Not every awaitable wrapped by a task exposes ``cr_frame``.
        return [
            getattr(task.get_coro(), "cr_frame", None)
            for task in asyncio.all_tasks()
        ]

    @staticmethod
    def should_trace(filename):
        """Return True if samples in ``filename`` should be recorded."""
        # Empty names and pseudo-filenames such as "<string>" or
        # "<frozen importlib._bootstrap>" are not user source files.
        if not filename or filename.startswith("<"):
            return False
        # We're assuming Guix System. That makes it easy: everything
        # installed (interpreter, libraries) lives under the store.
        if "/gnu/store" in filename:
            return False
        # Never profile the profiler itself.
        if "mini-scalene.py" in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        """Return True if an asyncio event loop is running in this thread."""
        # get_running_loop() is the public way to ask this, and it matches
        # the precondition of asyncio.all_tasks(), which raises
        # RuntimeError when no loop is running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True


def main():
    """Profile the script named on the command line and print a report."""
    # Explicit check rather than ``assert``: asserts vanish under -O.
    if len(sys.argv) < 2:
        sys.exit("(Usage): python3 mini-scalene.py file.py {args ...}")
    script = sys.argv[1]
    # Drop the profiler's own name so the profiled script sees the
    # argument list it would get when run directly.
    sys.argv = sys.argv[1:]
    mini_scalene().start()
    try:
        runpy.run_path(script, run_name="__main__")
    except Exception:
        # Report the failure but still let the exit hook print the profile.
        traceback.print_exc()


if __name__ == "__main__":
    main()