#!/usr/bin/env python3
'''
Aergia -- a sampling profiler that reports asyncio idle time.

Copyright:

  This program is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see <https://www.gnu.org/licenses/>.

Commentary:

  Aergia is a sampling based profiler based off of SCALENE by Emery
  Berger (https://github.com/plasma-umass/scalene).  It is not
  particularly informative, but unlike SCALENE or other sampling-based
  profilers I could find, reports the wall-time each asyncio await
  call spends idling.

  The goal behind Aergia is to eventually have these features, or
  similar, merged into SCALENE.

Code:
'''

from collections import defaultdict
from types import FrameType
from typing import cast, List, Tuple
import argparse
import asyncio
import atexit
import os
import selectors
import signal
import sys
import time
import traceback


class ReplacementEpollSelector(selectors.EpollSelector):
    '''
    Provides a replacement for selectors.EpollSelector that
    periodically wakes up to accept signals.
    '''

    def select(
            self, timeout=None
    ) -> List[Tuple[selectors.SelectorKey, int]]:
        '''Wait for ready file objects in slices of the switch interval.

        TIMEOUT follows the selectors contract: a value <= 0 polls
        without blocking and a positive value bounds the wait.  With
        None the call returns after a single slice, possibly with an
        empty list, instead of blocking indefinitely.
        '''
        if timeout is not None and timeout <= 0:
            # A non-positive timeout is a plain poll; never sleep for
            # a slice here.
            return super().select(0)
        switch_interval = sys.getswitchinterval()
        if timeout is None:
            return super().select(switch_interval)
        start_time = time.perf_counter()
        interval = min(timeout, switch_interval)
        while True:
            selected = super().select(interval)
            if selected:
                return selected
            if time.perf_counter() - start_time >= timeout:
                return []


# Event loops created after this point use the periodically-waking
# selector above.
selectors.DefaultSelector = ReplacementEpollSelector


class Aergia(object):
    '''Signal-driven sampler that attributes wall-time to idle tasks.'''

    # a key-value pair where keys represent frame metadata (see
    # Aergia.frame_to_string) and values represent number of times
    # sampled, as an [actual, calculated] pair.
    samples = defaultdict(lambda: [0, 0])

    # number of times samples have been collected
    total_samples = 0

    # the timestamp recorded last signal
    last_signal_time = 0.0

    def __init__(self, signal_interval):
        '''Install the SIGPROF handler and arm the profiling timer to
        fire every SIGNAL_INTERVAL seconds.'''
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         signal_interval,
                         signal_interval)

    @staticmethod
    def start():
        '''Register the exit report and start the wall clock.'''
        atexit.register(Aergia.exit_handler)
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def exit_handler():
        '''Print the report, then stop sampling.'''
        # The report reads the interval from the live timer, so it has
        # to be printed before the timer is disarmed.
        Aergia.print_samples()
        Aergia.disable_signals()

    @staticmethod
    def print_samples():
        '''Pretty-print profiling results.'''
        if Aergia.total_samples > 0:
            print("FILE\tFUNC\tPERC\tACTUAL+CALCULATED=SECONDS")
            for key in Aergia.sort_samples(Aergia.samples):
                Aergia.print_sample(key)
        else:
            print("No samples were gathered. If you are using concurrency, "
                  "this is likely a bug and you may run the profiler again.")

    @staticmethod
    def print_sample(key):
        '''Pretty-print a single sample.'''
        sig_intv = Aergia.get_sampling_interval()
        value = Aergia.samples[key]
        tot = Aergia.sum_sample(value)
        print(f"{key} :\t{tot * 100 / Aergia.total_samples:.3f}%"
              f"\t({value[0]:.3f} + {value[1]:.3f} ="
              f" {tot*sig_intv:.6f} seconds)")

    @staticmethod
    def disable_signals():
        '''Ignore further SIGPROF deliveries and disarm the timer.'''
        # signal.signal() takes a signal number.  ITIMER_PROF is a
        # timer id and must only be passed to setitimer().
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        signal.setitimer(signal.ITIMER_PROF, 0)

    @staticmethod
    def cpu_signal_handler(sig, frame):
        '''SIGPROF handler: credit every idle task with the wall-time
        that passed since the previous signal.

        SIG and FRAME are supplied by the signal machinery and are not
        used.
        '''
        sig_intv = Aergia.get_sampling_interval()
        if sig_intv <= 0:
            # The timer is disarmed; there is no interval to
            # normalise against.
            return
        elapsed_since_last_signal = Aergia.gettime() - \
            Aergia.last_signal_time
        # Wall-time beyond one interval, expressed in intervals.
        c_time_norm = (elapsed_since_last_signal - sig_intv) / sig_intv
        for idle_frame in Aergia.compute_frames_to_record():
            sample = Aergia.samples[Aergia.frame_to_string(idle_frame)]
            # One actual sample plus the calculated remainder, so the
            # per-key tally uses the same unit as total_samples.
            sample[0] += 1
            sample[1] += c_time_norm
        Aergia.total_samples += elapsed_since_last_signal / sig_intv
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def compute_frames_to_record():
        '''Collects all stack frames that Aergia actually processes.'''
        if not Aergia.is_event_loop_running():
            return []
        new_frames = []
        for frame in Aergia.get_current_idle_tasks():
            if frame is None:
                continue
            fname = frame.f_code.co_filename
            if not fname and frame.f_back is not None:
                # 'eval/compile' gives no f_code.co_filename.  We have
                # to look back into the outer frame in order to check
                # the co_filename.
                fname = frame.f_back.f_code.co_filename
            # Walk the stack backwards until we hit a frame that IS
            # one we should trace (if there is one), i.e. code being
            # profiled that is just calling stuff deep in libraries.
            while frame is not None and not Aergia.should_trace(fname):
                frame = cast(FrameType, frame.f_back)
                if frame is not None:
                    fname = frame.f_code.co_filename
            if frame is not None:
                new_frames.append(frame)
        return new_frames

    @staticmethod
    def get_current_idle_tasks():
        '''Obtains the stack of frames of all currently idle tasks.

        Entries are None for tasks whose coroutine has no frame.
        '''
        curr_task = asyncio.current_task()
        return [
            # Not every awaitable wrapped by a task has `cr_frame'.
            getattr(task.get_coro(), 'cr_frame', None)
            for task in asyncio.all_tasks()
            if task is not curr_task
        ]

    @staticmethod
    def frame_to_string(frame):
        '''Pretty-prints a frame as a function/file name and a line
        number.  Additionally used as a key for tallying lines.'''
        co = frame.f_code
        return co.co_filename + ':' + str(frame.f_lineno) + '\t' + co.co_name

    @staticmethod
    def should_trace(filename):
        '''Returns FALSE if filename is uninteresting to the user.'''
        if not filename:
            # Nothing to attribute the sample to.
            return False
        # FIXME Assume GuixSD.  Makes filtering easy
        if '/gnu/store' in filename:
            return False
        if 'site-packages' in filename:
            return False
        if filename.startswith('<'):
            return False
        if 'aergia' in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        '''Returns TRUE if there is an event loop running in the
        current thread.'''
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    @staticmethod
    def sort_samples(sample_dict):
        '''Returns SAMPLE_DICT in descending order by number of samples.'''
        return {k: v for k, v in sorted(sample_dict.items(),
                                        key=lambda item:
                                        Aergia.sum_sample(item[1]),
                                        reverse=True)}

    @staticmethod
    def sum_sample(sample):
        '''Returns the actual and calculated parts of SAMPLE added up.'''
        return sample[0] + sample[1]

    @staticmethod
    def gettime():
        '''returns the wallclock time'''
        return time.time()

    @staticmethod
    def get_sampling_interval():
        '''returns the current sampling interval'''
        return signal.getitimer(signal.ITIMER_PROF)[-1]


# Namespace the profiled script is executed in, so that it sees itself
# as `__main__'.
the_globals = {
    '__name__': '__main__',
    '__doc__': None,
    '__package__': None,
    '__loader__': globals()['__loader__'],
    '__spec__': None,
    '__annotations__': {},
    '__builtins__': globals()['__builtins__'],
    '__file__': None,
    '__cached__': None,
}


def parse_arguments(argv=None):
    '''Parse CLI args.

    ARGV is the list of arguments to parse; None means sys.argv[1:].
    '''
    parser = argparse.ArgumentParser(
        usage='%(prog)s [args] script [args]'
    )
    parser.add_argument('-i', '--interval',
                        help='The minimum amount of time inbetween '
                        'samples in seconds.',
                        metavar='',
                        # setitimer() needs a number, not the raw
                        # command-line string.
                        type=float,
                        default=0.01)
    parser.add_argument('script', help='A python script to run.')
    parser.add_argument('s_args', nargs=argparse.REMAINDER,
                        help='python script args')
    return parser.parse_args(argv)


def main():
    '''Run the script named on the command line under the profiler.'''
    args = parse_arguments()
    sys.argv = [args.script] + args.s_args
    # Mirror what `python script.py' provides: a usable __file__ and
    # the script's own directory at the front of the import path.
    script_path = os.path.abspath(args.script)
    the_globals['__file__'] = script_path
    sys.path.insert(0, os.path.dirname(script_path))
    try:
        with open(args.script, 'rb') as fp:
            code = compile(fp.read(), args.script, "exec")
        Aergia(args.interval).start()
        exec(code, the_globals)
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()