''' _/_/ _/ _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/ _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/ _/ _/_/ Copyright: This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Commentary: Aergia is a sampling based profiler based off of SCALENE (https://github.com/plasma-umass/scalene). It is not particularly informative, but unlike SCALENE or other sampling-based profilers I could find, reports the wall-time each asyncio await call spends idling. The goal behind Aergia is to eventually have these features, or similar, merged into SCALENE. Code: ''' from collections import defaultdict from types import FrameType from typing import cast, List, Tuple import argparse import asyncio import selectors import signal import sys import threading import time import traceback class ReplacementEpollSelector(selectors.EpollSelector): ''' Provides a replacement for selectors.PollSelector that periodically wakes up to accept signals. ''' def select( self, timeout=None ) -> List[Tuple[selectors.SelectorKey, int]]: start_time = time.perf_counter() if not timeout or timeout < 0: interval = sys.getswitchinterval() else: interval = min(timeout, sys.getswitchinterval()) while True: selected = super().select(interval) if selected or timeout == 0 or not timeout: return selected end_time = time.perf_counter() if end_time - start_time >= timeout: return [] selectors.DefaultSelector = ReplacementEpollSelector class Aergia(object): # a key-value pair where keys represent frame metadata (see # Aergia.frame_to_string) and values represent number of times # sampled. cpu_samples = defaultdict(lambda: 0) cpu_samples_c = defaultdict(lambda: 0) # number of times samples have been collected total_cpu_samples = 0 # the time, in seconds, between samples signal_interval = 0.01 # the timestamp recorded last signal last_signal_time = 0.0 # if we should try to profile asynchronous code. Used to observe # effectiveness of the implementation. profile_async = True def __init__(self): signal.signal(signal.SIGPROF, self.cpu_signal_handler) signal.setitimer(signal.ITIMER_PROF, self.signal_interval, self.signal_interval) Aergia.last_signal_time = Aergia.gettime() @staticmethod def gettime(): '''get the wallclock time''' return time.perf_counter() @staticmethod def start(profile_async): Aergia.profile_async = profile_async @staticmethod def stop(): '''Turn off profiling signals''' Aergia.disable_signals() Aergia.exit_handler() @staticmethod def exit_handler(): '''Pretty-print profiling information.''' # If we've collected any samples, dump them. print("CPU usage (Python):") if Aergia.total_cpu_samples > 0: for key in Aergia.sort_samples(Aergia.cpu_samples): print(f"{key} : " f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % " f"({Aergia.cpu_samples[key]:.1f} total samples)") print("CPU usage (Native):") for key in Aergia.sort_samples(Aergia.cpu_samples_c): print(f"{key} : " f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % " f"({Aergia.cpu_samples_c[key]:.1f} total samples)") else: print("(did not run long enough to profile)") @staticmethod def disable_signals(): signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) signal.signal(signal.SIGVTALRM, signal.SIG_IGN) signal.setitimer(signal.ITIMER_PROF, 0) @staticmethod def cpu_signal_handler(sig, frame): elapsed_since_last_signal = Aergia.gettime() - \ Aergia.last_signal_time c_time_norm = (elapsed_since_last_signal - Aergia.signal_interval) / \ Aergia.signal_interval keys = Aergia.compute_frames_to_record(frame) for key in keys: Aergia.cpu_samples[Aergia.frame_to_string(key)] += 1 Aergia.cpu_samples_c[Aergia.frame_to_string( key)] += c_time_norm Aergia.total_cpu_samples += elapsed_since_last_signal / \ Aergia.signal_interval Aergia.last_signal_time = Aergia.gettime() @staticmethod def compute_frames_to_record(this_frame): '''Collects all stack frames that Aergia actually processes.''' frames = [this_frame] frames += [sys._current_frames().get(t.ident, None) for t in threading.enumerate()] frames += Aergia.get_async_frames() frames = Aergia.filter_duplicated_frames(frames) # Process all the frames to remove ones we aren't going to track. new_frames = [] for frame in frames: if frame is None: continue fname = frame.f_code.co_filename # Record samples only for files we care about. if (len(fname)) == 0: # 'eval/compile' gives no f_code.co_filename. We have # to look back into the outer frame in order to check # the co_filename. fname = frame.f_back.f_code.co_filename while not Aergia.should_trace(fname): # Walk the stack backwards until we hit a frame that # IS one we should trace (if there is one). i.e., if # it's in the code being profiled, and it is just # calling stuff deep in libraries. if frame: frame = cast(FrameType, frame.f_back) else: break if frame: fname = frame.f_code.co_filename if frame: new_frames.append(frame) return new_frames @staticmethod def frame_to_string(frame): '''Pretty-prints a frame as a function/file name and a line number. Additionally used a key for tallying lines.''' co = frame.f_code func_name = co.co_name line_no = frame.f_lineno filename = co.co_filename return filename + '\t' + func_name + '\t' + str(line_no) @staticmethod def get_async_frames(): '''Obtains the stack frames of all currently executing tasks.''' if Aergia.is_event_loop_running() and Aergia.profile_async: return [task.get_coro().cr_frame for task in asyncio.all_tasks()] return [] @staticmethod def should_trace(filename): '''Returns FALSE if filename is uninteresting to the user.''' # FIXME Assume GuixSD. Makes filtering easy if '/gnu/store' in filename: return False if filename[0] == '<': return False if 'aergia.py' in filename: return False return True @staticmethod def is_event_loop_running() -> bool: '''Returns TRUE if there is an exent loop running. This is what `asyncio.get_event_loop()' did, before it was deprecated in 3.12''' return asyncio.get_event_loop_policy()._local._loop is not None @staticmethod def sort_samples(sample_dict): '''Returns SAMPLE_DICT in descending order by number of samples.''' return {k: v for k, v in sorted(sample_dict.items(), key=lambda item: item[1], reverse=True)} @staticmethod def filter_duplicated_frames(frames) -> bool: s = set() dup = [] for f in frames: if f in s: dup.append(f) else: s.add(f) # TODO we probably have one because given get_async_frames returns the # currently executing task. Would be an easy fix in that method. # if there's more than one, I cannot explain it. assert len( dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}" if len(dup) != 0: print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr) return list(s) the_globals = { '__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': globals()['__loader__'], '__spec__': None, '__annotations__': {}, '__builtins__': globals()['__builtins__'], '__file__': None, '__cached__': None, } def parse_arguments(): '''Parse CLI args''' parser = argparse.ArgumentParser( usage='%(prog)s [args] script [args]' ) parser.add_argument('-a', '--async_off', action='store_false', help='Turn off experimental async profiling.', default=True) parser.add_argument('-i', '--interval', help='The minimum amount of time inbetween \ samples in seconds.', metavar='', default=0.01) parser.add_argument('script', help='A python script to run.') parser.add_argument('s_args', nargs=argparse.REMAINDER, help='python script args') return parser.parse_args() def main(): args = parse_arguments() sys.argv = [args.script] + args.s_args try: with open(args.script, 'rb') as fp: code = compile(fp.read(), args.script, "exec") Aergia().start(args.async_off) exec(code, the_globals) Aergia().stop() except Exception: traceback.print_exc() if __name__ == "__main__": main() # aergia.py ends here