#!/usr/bin/env python3 ''' _/_/ _/ _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/ _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/ _/ _/_/ Copyright: This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . Commentary: Aergia is a sampling based profiler based off of SCALENE by Emery Berger (https://github.com/plasma-umass/scalene). It is not particularly informative, but unlike SCALENE or other sampling-based profilers I could find, reports the wall-time each asyncio await call spends idling. The goal behind Aergia is to eventually have these features, or similar, merged into SCALENE. Code: ''' from collections import defaultdict from types import FrameType from typing import cast, List, Optional, Tuple import argparse import asyncio import selectors import signal import sys import time import threading import traceback orig_thread_join = threading.Thread.join def thread_join_replacement( self: threading.Thread, timeout: Optional[float] = None ) -> None: ''' We replace threading.Thread.join with this method which always periodically yields. ''' start_time = time.perf_counter() interval = sys.getswitchinterval() while self.is_alive(): orig_thread_join(self, interval) # If a timeout was specified, check to see if it's expired. if timeout is not None: end_time = time.perf_counter() if end_time - start_time >= timeout: return None return None threading.Thread.join = thread_join_replacement class ReplacementEpollSelector(selectors.EpollSelector): ''' Provides a replacement for selectors.PollSelector that periodically wakes up to accept signals. ''' def select( self, timeout=None ) -> List[Tuple[selectors.SelectorKey, int]]: start_time = time.perf_counter() if not timeout or timeout < 0: interval = sys.getswitchinterval() else: interval = min(timeout, sys.getswitchinterval()) while True: selected = super().select(interval) if selected or timeout == 0 or not timeout: return selected end_time = time.perf_counter() if end_time - start_time >= timeout: return [] selectors.DefaultSelector = ReplacementEpollSelector class Aergia(object): # a key-value pair where keys represent frame metadata (see # Aergia.frame_to_string) and values represent number of times # sampled. samples = defaultdict(lambda: [0, 0]) # number of times samples have been collected total_samples = 0 # the timestamp recorded last signal last_signal_time = 0.0 signal_interval = 0 def __init__(self, signal_interval): Aergia.signal_interval = signal_interval @staticmethod def start(): signal.signal(signal.SIGPROF, Aergia.cpu_signal_handler) signal.setitimer(signal.ITIMER_PROF, Aergia.signal_interval, Aergia.signal_interval) Aergia.last_signal_time = Aergia.gettime() def stop(): Aergia.disable_signals() Aergia.print_samples() @staticmethod def print_samples(): '''Pretty-print profiling results.''' if Aergia.total_samples > 0: print("FILE\tFUNC\tPERC\tACTUAL+CALCULATED=SECONDS") for key in Aergia.sort_samples(Aergia.samples): Aergia.print_sample(key) else: print("No samples were gathered. If you are using concurrency, " "this is likely a bug and you may run the profiler again.") @staticmethod def print_sample(key): '''Pretty-print a single sample.''' sig_intv = Aergia.signal_interval value = Aergia.samples[key] tot = Aergia.sum_sample(value) print(f"{key} :\t{tot * 100 / Aergia.total_samples:.3f}%" f"\t({value[0]:.3f} + {value[1]:.3f} =" f" {tot*sig_intv:.6f} seconds)") @staticmethod def disable_signals(): signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) signal.setitimer(signal.ITIMER_PROF, 0) @staticmethod def cpu_signal_handler(sig, frame): sig_intv = Aergia.signal_interval elapsed_since_last_signal = Aergia.gettime() - \ Aergia.last_signal_time c_time_norm = (elapsed_since_last_signal - sig_intv) / \ sig_intv keys = Aergia.compute_frames_to_record() for key in keys: Aergia.samples[Aergia.frame_to_string(key)][0] += 1 Aergia.samples[Aergia.frame_to_string(key)][1] += c_time_norm Aergia.total_samples += elapsed_since_last_signal / \ sig_intv Aergia.last_signal_time = Aergia.gettime() @staticmethod def compute_frames_to_record(): '''Collects all stack frames that Aergia actually processes.''' frames = Aergia.get_frames_from_runners(Aergia.get_event_loops()) # Process all the frames to remove ones we aren't going to track. new_frames = [] for frame in frames: if frame is None: continue fname = frame.f_code.co_filename # Record samples only for files we care about. if (len(fname)) == 0: # 'eval/compile' gives no f_code.co_filename. We have # to look back into the outer frame in order to check # the co_filename. fname = frame.f_back.f_code.co_filename while not Aergia.should_trace(fname): # Walk the stack backwards until we hit a frame that # IS one we should trace (if there is one). i.e., if # it's in the code being profiled, and it is just # calling stuff deep in libraries. if frame: frame = cast(FrameType, frame.f_back) else: break if frame: fname = frame.f_code.co_filename if frame: new_frames.append(frame) return new_frames @staticmethod def get_event_loops(): runners = [] for t in threading.enumerate(): frame = sys._current_frames().get(t.ident) if not frame: continue runner = Aergia.walk_back_until_runner(frame) if runner: runners.append(runner) return runners @staticmethod def walk_back_until_runner(frame): while frame: r = Aergia.find_runner_in_locals(frame.f_locals) if r: return r frame = frame.f_back return None @staticmethod def find_runner_in_locals(locals_dict): '''Given a dictionary of local variables for a stack frame, retrieves the asyncio runner object, if there is one.''' for val in locals_dict.values(): if type(val).__name__ == 'Runner' and \ val.__class__.__module__ == 'asyncio.runners': return val return None @staticmethod def get_frames_from_runners(runners): '''Given RUNNERS, returns a flat list of tasks.''' return [ task for runner in runners for task in Aergia.get_idle_task_frames(runner._loop) ] @staticmethod def frame_to_string(frame): '''Pretty-prints a frame as a function/file name and a line number. Additionally used as a key for tallying lines.''' co = frame.f_code func_name = co.co_name line_no = frame.f_lineno filename = co.co_filename return filename + ':' + str(line_no) + '\t' + func_name @staticmethod def should_trace(filename): '''Returns FALSE if filename is uninteresting to the user.''' # FIXME Assume GuixSD. Makes filtering easy if '/gnu/store' in filename: return False if 'site-packages' in filename: return False if filename[0] == '<': return False if 'aergia' in filename: return False return True @staticmethod def sort_samples(sample_dict): '''Returns SAMPLE_DICT in descending order by number of samples.''' return {k: v for k, v in sorted(sample_dict.items(), key=lambda item: Aergia.sum_sample( item[1]), reverse=True)} @staticmethod def get_idle_task_frames(loop): '''Given an asyncio event loop, returns the list of idle task frames. A task is considered 'idle' if it is not currently executing.''' curr_task = asyncio.current_task(loop) return [ task.get_coro().cr_frame for task in asyncio.all_tasks(loop) if task is not curr_task ] @staticmethod def sum_sample(sample): return sample[0] + sample[1] @staticmethod def gettime(): '''returns the wallclock time''' return time.time() the_globals = { '__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': globals()['__loader__'], '__spec__': None, '__annotations__': {}, '__builtins__': globals()['__builtins__'], '__file__': None, '__cached__': None, } def parse_arguments(): '''Parse CLI args''' parser = argparse.ArgumentParser( usage='%(prog)s [args] script [args]' ) parser.add_argument('-i', '--interval', help='The minimum amount of time inbetween \ samples in seconds.', metavar='', default=0.01) parser.add_argument('script', help='A python script to run.') parser.add_argument('s_args', nargs=argparse.REMAINDER, help='python script args') return parser.parse_args() def main(): args = parse_arguments() sys.argv = [args.script] + args.s_args try: with open(args.script, 'rb') as fp: code = compile(fp.read(), args.script, "exec") Aergia(args.interval).start() exec(code, the_globals) Aergia.stop() except Exception: traceback.print_exc() if __name__ == "__main__": main()