"""A minimal sampling CPU profiler driven by ``ITIMER_PROF`` / ``SIGPROF``.

Usage: ``python3 mini-scalene.py file.py {args ...}``
"""
import sys
import os
import threading
import traceback
import runpy
import atexit
import signal
from collections import defaultdict


class mini_scalene:
    """Sampling profiler that attributes CPU-timer ticks to source lines.

    All bookkeeping lives in class attributes, so there is effectively one
    profiler per process. Constructing an instance installs the ``SIGPROF``
    handler and arms the interval timer; ``start()`` registers the report
    that is printed at interpreter exit.
    """

    # Maps "filename<TAB>function<TAB>line" -> number of samples seen there.
    cpu_samples = defaultdict(int)
    # Number of profiling signals handled (one per timer tick, not per frame).
    total_cpu_samples = 0
    # Sampling period in seconds of process CPU time.
    signal_interval = 0.01

    def __init__(self):
        """Install the SIGPROF handler and start the profiling timer."""
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(
            signal.ITIMER_PROF, self.signal_interval, self.signal_interval
        )

    @staticmethod
    def start():
        """Arrange for the profile report to be printed at interpreter exit."""
        atexit.register(mini_scalene.exit_handler)

    @staticmethod
    def exit_handler():
        """Stop sampling and print the collected profile to stdout."""
        # Turn off the profiling signals. The signal to ignore is SIGPROF;
        # ITIMER_PROF is a *timer* id, not a signal number (its value is the
        # same as SIGINT on Linux), so it must only be passed to setitimer.
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
        signal.setitimer(signal.ITIMER_PROF, 0)
        # If we've collected any samples, dump them.
        print("CPU usage:")
        total = mini_scalene.total_cpu_samples
        if total > 0:
            # Sort the samples in descending order by number of samples.
            ranked = sorted(
                mini_scalene.cpu_samples.items(),
                key=lambda item: item[1],
                reverse=True,
            )
            mini_scalene.cpu_samples = dict(ranked)
            for key, count in ranked:
                print(f"{key} : {count * 100 / total}% ({count} total samples)")
        else:
            print("(did not run long enough to profile)")

    @staticmethod
    def cpu_signal_handler(sig, frame):
        """Record one sample for every traceable frame that is currently live.

        Args:
            sig: signal number delivered by the OS (unused).
            frame: the frame that was executing when the signal arrived.
        """
        for live_frame in mini_scalene.compute_frames_to_record(frame):
            mini_scalene.cpu_samples[mini_scalene.frame_to_string(live_frame)] += 1
        # One tick per signal, regardless of how many frames were recorded.
        mini_scalene.total_cpu_samples += 1

    @staticmethod
    def compute_frames_to_record(this_frame):
        """Collects all stack frames that Scalene actually processes.

        Args:
            this_frame: the interrupted frame passed to the signal handler.

        Returns:
            A list containing ``this_frame`` and the current frame of every
            live thread, minus frames whose file ``should_trace`` rejects.
        """
        # Take a single snapshot instead of one per thread.
        current = sys._current_frames()
        frames = [this_frame]
        frames += [current.get(t.ident) for t in threading.enumerate()]
        # Process all the frames to remove ones we aren't going to track.
        new_frames = []
        for frame in frames:
            if frame is None:
                continue
            fname = frame.f_code.co_filename
            # Record samples only for files we care about.
            if not fname:
                # 'eval/compile' gives no f_code.co_filename. We have
                # to look back into the outer frame in order to check
                # the co_filename.
                if frame.f_back is None:
                    # No outer frame to attribute this to; skip it rather
                    # than raise inside a signal handler.
                    continue
                fname = frame.f_back.f_code.co_filename
            if not mini_scalene.should_trace(fname):
                continue
            new_frames.append(frame)
        return new_frames

    @staticmethod
    def frame_to_string(frame):
        """Return the tab-separated ``filename``, ``function``, ``line`` key."""
        co = frame.f_code
        return co.co_filename + '\t' + co.co_name + '\t' + str(frame.f_lineno)

    @staticmethod
    def should_trace(filename):
        """Return True if samples in ``filename`` should be recorded."""
        # Don't trace the profiler itself.
        if 'mini-scalene.py' in filename:
            return False
        # Don't trace Python builtins: frozen modules (e.g. the importlib
        # bootstrap) report a pseudo-filename of the form "<frozen NAME>".
        if '<frozen ' in filename:
            return False
        return True


def main():
    """Profile the script named on the command line and report at exit."""
    if len(sys.argv) < 2:
        # Explicit check rather than assert, which is stripped under -O.
        sys.exit("(Usage): python3 mini-scalene.py file.py {args ...}")
    script = sys.argv[1]
    # Let the profiled program see itself as argv[0] followed by its own
    # arguments, as promised by the usage string.
    sys.argv = sys.argv[1:]
    mini_scalene().start()
    try:
        runpy.run_path(script, run_name="__main__")
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()