#!/usr/bin/env python3
'''
_/_/ _/ _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/ _/_/_/_/ _/_/_/_/ _/_/ _/ _/
_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/
_/ _/_/

Copyright:

  This program is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see <https://www.gnu.org/licenses/>.

Commentary:

  Aergia is a sampling based profiler based off of SCALENE by Emery
  Berger (https://github.com/plasma-umass/scalene).

  It is not particularly informative, but unlike SCALENE or other
  sampling-based profilers I could find, reports the wall-time each
  asyncio await call spends idling.

  The goal behind Aergia is to eventually have these features, or
  similar, merged into SCALENE.

Code:
'''

from collections import defaultdict
from types import FrameType
from typing import cast, List, Tuple
import argparse
import asyncio
import selectors
import signal
import sys
import time
import traceback


class ReplacementEpollSelector(selectors.EpollSelector):
    '''Provides a replacement for selectors.EpollSelector that
    periodically wakes up to accept signals.

    Python-level signal handlers only run between bytecodes, so a
    selector blocked for a long time would delay them.  Waiting in
    slices of at most `sys.getswitchinterval()' seconds keeps the
    profiler's handler responsive.
    '''

    def select(
            self, timeout=None
    ) -> List[Tuple[selectors.SelectorKey, int]]:
        '''Wait for ready file objects, in short slices.

        TIMEOUT has the same meaning as in the base class, with one
        difference: when it is None this method returns after a single
        slice (possibly with an empty list) instead of blocking until
        something is ready.

        Returns the list of (key, events) pairs reported by the base
        class, or an empty list once TIMEOUT has elapsed.
        '''
        # A zero or negative timeout is a non-blocking poll; there is
        # nothing to slice up, so do not wait at all.
        if timeout is not None and timeout <= 0:
            return super().select(0)

        switch_interval = sys.getswitchinterval()
        if timeout is None:
            return super().select(switch_interval)

        start_time = time.perf_counter()
        remaining = timeout
        while True:
            # Never wait longer than what is left of the caller's
            # timeout, so the total wait does not overshoot it.
            selected = super().select(min(remaining, switch_interval))
            if selected:
                return selected
            remaining = timeout - (time.perf_counter() - start_time)
            if remaining <= 0:
                return []


# Event loops created after this point pick up the replacement selector.
selectors.DefaultSelector = ReplacementEpollSelector


class Aergia:
    '''Sampling profiler that attributes idle wall-time to asyncio tasks.

    All state is kept on the class itself; instantiating it only
    records the sampling interval.
    '''

    # a key-value pair where keys represent frame metadata (see
    # Aergia.frame_to_string) and values are two-element lists:
    # [number of times sampled, extra wall-time beyond one interval
    # (measured in intervals)].
    samples = defaultdict(lambda: [0, 0])

    # number of times samples have been collected (fractional, since it
    # is accumulated as elapsed time divided by the interval)
    total_samples = 0

    # the timestamp recorded at the last signal
    last_signal_time = 0.0

    # seconds between two timer signals
    signal_interval = 0

    def __init__(self, signal_interval):
        '''Record SIGNAL_INTERVAL (seconds) as the sampling interval.'''
        Aergia.signal_interval = signal_interval

    @staticmethod
    def start():
        '''Install the SIGPROF handler and arm the profiling timer.'''
        signal.signal(signal.SIGPROF, Aergia.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         Aergia.signal_interval,
                         Aergia.signal_interval)
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def stop():
        '''Stop sampling and print the collected results.'''
        Aergia.disable_signals()
        Aergia.print_samples()

    @staticmethod
    def print_samples():
        '''Pretty-print profiling results.'''
        if Aergia.total_samples > 0:
            print("FILE\tFUNC\tPERC\tACTUAL+CALCULATED=SECONDS")
            for key in Aergia.sort_samples(Aergia.samples):
                Aergia.print_sample(key)
        else:
            print("No samples were gathered. If you are using concurrency, "
                  "this is likely a bug and you may run the profiler again.")

    @staticmethod
    def print_sample(key):
        '''Pretty-print a single sample.'''
        sig_intv = Aergia.signal_interval
        value = Aergia.samples[key]
        tot = Aergia.sum_sample(value)
        print(f"{key} :\t{tot * 100 / Aergia.total_samples:.3f}%"
              f"\t({value[0]:.3f} + {value[1]:.3f} ="
              f" {tot*sig_intv:.6f} seconds)")

    @staticmethod
    def disable_signals():
        '''Disarm the profiling timer and ignore any late SIGPROF.'''
        # Stop the timer first so no new signal is generated, then
        # ignore SIGPROF itself.  ITIMER_PROF identifies the timer and
        # must not be used as a signal number.
        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, signal.SIG_IGN)

    @staticmethod
    def cpu_signal_handler(sig, frame):
        '''Handle one SIGPROF: credit every idle task with a sample.

        SIG and FRAME are supplied by the signal machinery and are not
        used; the frames that get recorded are those of the idle tasks.
        '''
        sig_intv = Aergia.signal_interval
        elapsed_since_last_signal = Aergia.gettime() - \
            Aergia.last_signal_time
        # Wall-time that passed beyond the one expected interval,
        # expressed in intervals.
        c_time_norm = (elapsed_since_last_signal - sig_intv) / \
            sig_intv

        for recorded in Aergia.compute_frames_to_record():
            sample = Aergia.samples[Aergia.frame_to_string(recorded)]
            sample[0] += 1
            sample[1] += c_time_norm

        Aergia.total_samples += elapsed_since_last_signal / \
            sig_intv
        Aergia.last_signal_time = Aergia.gettime()

    @staticmethod
    def compute_frames_to_record():
        '''Collects all stack frames that Aergia actually processes.

        Returns a list of frames, one per idle task that has a frame
        belonging to code the user cares about (see
        Aergia.should_trace).  The list is empty when no event loop is
        running.
        '''
        if not Aergia.is_event_loop_running():
            return []

        new_frames = []
        for frame in Aergia.get_current_idle_tasks():
            if frame is None:
                continue
            fname = frame.f_code.co_filename
            if not fname and frame.f_back is not None:
                # 'eval/compile' gives no f_code.co_filename.  We have
                # to look back into the outer frame in order to check
                # the co_filename.
                fname = frame.f_back.f_code.co_filename
            # Walk the stack backwards until we hit a frame that IS one
            # we should trace (if there is one).  i.e., if it's in the
            # code being profiled, and it is just calling stuff deep in
            # libraries.
            while frame is not None and not Aergia.should_trace(fname):
                frame = cast(FrameType, frame.f_back)
                if frame is not None:
                    fname = frame.f_code.co_filename
            if frame is not None:
                new_frames.append(frame)
        return new_frames

    @staticmethod
    def get_current_idle_tasks():
        '''Obtains the frames of all tasks other than the current one.

        Entries may be None when a task's coroutine exposes no frame;
        callers are expected to skip those.
        '''
        curr_task = asyncio.current_task()
        return [
            # Not every awaitable wrapped by a task has `cr_frame'.
            getattr(task.get_coro(), 'cr_frame', None)
            for task in asyncio.all_tasks()
            if task is not curr_task
        ]

    @staticmethod
    def frame_to_string(frame):
        '''Pretty-prints a frame as a function/file name and a line number.

        Additionally used as a key for tallying lines.'''
        co = frame.f_code
        func_name = co.co_name
        line_no = frame.f_lineno
        filename = co.co_filename
        return filename + ':' + str(line_no) + '\t' + func_name

    @staticmethod
    def should_trace(filename):
        '''Returns FALSE if filename is uninteresting to the user.'''
        # An empty name or a pseudo-file such as '<string>' cannot be
        # attributed to user source.
        if not filename or filename.startswith('<'):
            return False
        # FIXME Assume GuixSD. Makes filtering easy
        if '/gnu/store' in filename:
            return False
        if 'site-packages' in filename:
            return False
        if 'aergia' in filename:
            return False
        return True

    @staticmethod
    def is_event_loop_running() -> bool:
        '''Returns TRUE if there is an event loop running in this thread.'''
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # Raised by asyncio when no loop is currently running.
            return False
        return True

    @staticmethod
    def sort_samples(sample_dict):
        '''Returns SAMPLE_DICT in descending order by number of samples.'''
        return {k: v for k, v in sorted(sample_dict.items(),
                                        key=lambda item: Aergia.sum_sample(
                                            item[1]),
                                        reverse=True)}

    @staticmethod
    def sum_sample(sample):
        '''Returns the sum of both components of SAMPLE.'''
        return sample[0] + sample[1]

    @staticmethod
    def gettime():
        '''returns the wallclock time'''
        return time.time()


# Namespace handed to the profiled script so that it runs as if it
# were the main module.
the_globals = {
    '__name__': '__main__',
    '__doc__': None,
    '__package__': None,
    '__loader__': globals()['__loader__'],
    '__spec__': None,
    '__annotations__': {},
    '__builtins__': globals()['__builtins__'],
    '__file__': None,
    '__cached__': None,
}


def parse_arguments():
    '''Parse CLI args'''
    parser = argparse.ArgumentParser(
        usage='%(prog)s [args] script [args]'
    )

    parser.add_argument('-i', '--interval',
                        help='The minimum amount of time inbetween '
                        'samples in seconds.',
                        metavar='',
                        # Without a type, a value given on the command
                        # line would stay a string and be rejected by
                        # signal.setitimer.
                        type=float,
                        default=0.01)
    parser.add_argument('script',
                        help='A python script to run.')
    parser.add_argument('s_args',
                        nargs=argparse.REMAINDER,
                        help='python script args')

    return parser.parse_args()


def main():
    '''Compile the target script and run it under the profiler.'''
    args = parse_arguments()
    sys.argv = [args.script] + args.s_args

    try:
        with open(args.script, 'rb') as fp:
            code = compile(fp.read(), args.script, "exec")
    except Exception:
        # Nothing has been started yet, so there is nothing to stop.
        traceback.print_exc()
        return

    Aergia(args.interval).start()
    try:
        # NOTE: this executes the user-supplied script by design.
        exec(code, the_globals)
    except Exception:
        traceback.print_exc()
    finally:
        # Always disarm the timer and report, even when the script
        # fails or exits through sys.exit().
        Aergia.stop()


if __name__ == "__main__":
    main()