diff options
Diffstat (limited to 'aergia')
| -rwxr-xr-x | aergia | 300 |
1 files changed, 300 insertions, 0 deletions
@@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +''' + _/_/ _/ + _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/ + _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/ + _/ _/ _/ _/ _/ _/ _/ _/ _/ + _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/ + _/ + _/_/ +Copyright: + + This program is free software: you can redistribute it + and/or modify it under the terms of the GNU General + Public License as published by the Free Software + Foundation, either version 3 of the License, or (at your + option) any later version. + + This program is distributed in the hope that it will be + useful, but WITHOUT ANY WARRANTY; without even the implied + warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public + License along with this program. If not, see + <https://www.gnu.org/licenses/>. + + +Commentary: + + Aergia is a sampling based profiler based off of SCALENE + by Emery Berger (https://github.com/plasma-umass/scalene). + + It is not particularly informative, but unlike SCALENE + or other sampling-based profilers I could find, reports + the wall-time each asyncio await call spends idling. + + The goal behind Aergia is to eventually have these features, + or similar, merged into SCALENE. + + +Code: +''' + +from collections import defaultdict +from types import FrameType +from typing import cast, List, Tuple +import argparse +import asyncio +import selectors +import signal +import sys +import threading +import time +import traceback + + +class ReplacementEpollSelector(selectors.EpollSelector): + ''' + Provides a replacement for selectors.PollSelector that + periodically wakes up to accept signals. + ''' + + def select( + self, timeout=None + ) -> List[Tuple[selectors.SelectorKey, int]]: + start_time = time.perf_counter() + if not timeout or timeout < 0: + interval = sys.getswitchinterval() + else: + interval = min(timeout, sys.getswitchinterval()) + while True: + selected = super().select(interval) + if selected or timeout == 0 or not timeout: + return selected + end_time = time.perf_counter() + if end_time - start_time >= timeout: + return [] + + +selectors.DefaultSelector = ReplacementEpollSelector + + +class Aergia(object): + + # a key-value pair where keys represent frame metadata (see + # Aergia.frame_to_string) and values represent number of times + # sampled. + cpu_samples = defaultdict(lambda: 0) + cpu_samples_c = defaultdict(lambda: 0) + # number of times samples have been collected + total_cpu_samples = 0 + + # the time, in seconds, between samples + signal_interval = 0.01 + # the timestamp recorded last signal + last_signal_time = 0.0 + + + def __init__(self): + signal.signal(signal.SIGPROF, + self.cpu_signal_handler) + signal.setitimer(signal.ITIMER_PROF, + self.signal_interval, + self.signal_interval) + + @staticmethod + def gettime(): + '''get the wallclock time''' + return time.time() + + @staticmethod + def start(profile_async): + Aergia.last_signal_time = Aergia.gettime() + + @staticmethod + def stop(): + '''Turn off profiling signals''' + Aergia.disable_signals() + Aergia.exit_handler() + + @staticmethod + def exit_handler(): + '''Pretty-print profiling information.''' + # If we've collected any samples, dump them. + print("CPU usage (Python):") + if Aergia.total_cpu_samples > 0: + for key in Aergia.sort_samples(Aergia.cpu_samples): + print(f"{key} : " + f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % " + f"({Aergia.cpu_samples[key]:.1f} total samples)") + print("CPU usage (Native):") + for key in Aergia.sort_samples(Aergia.cpu_samples_c): + print(f"{key} : " + f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % " + f"({Aergia.cpu_samples_c[key]:.1f} total samples)") + else: + print("(Bug) The program did not run long enough to profile, or a one-time error occured.") + + @staticmethod + def disable_signals(): + signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) + signal.signal(signal.SIGVTALRM, signal.SIG_IGN) + signal.setitimer(signal.ITIMER_PROF, 0) + + @staticmethod + def cpu_signal_handler(sig, frame): + elapsed_since_last_signal = Aergia.gettime() - \ + Aergia.last_signal_time + c_time_norm = (elapsed_since_last_signal - + Aergia.signal_interval) / \ + Aergia.signal_interval + + keys = Aergia.compute_frames_to_record() + for key in keys: + Aergia.cpu_samples[Aergia.frame_to_string(key)] += 1 + Aergia.cpu_samples_c[Aergia.frame_to_string( + key)] += c_time_norm + Aergia.total_cpu_samples += elapsed_since_last_signal / \ + Aergia.signal_interval + Aergia.last_signal_time = Aergia.gettime() + + @staticmethod + def compute_frames_to_record(): + '''Collects all stack frames that Aergia actually processes.''' + if Aergia.is_event_loop_running(): + frames = [task.get_coro().cr_frame for task in asyncio.all_tasks()] + + # Process all the frames to remove ones we aren't going to track. + new_frames = [] + for frame in frames: + if frame is None: + continue + fname = frame.f_code.co_filename + # Record samples only for files we care about. + if (len(fname)) == 0: + # 'eval/compile' gives no f_code.co_filename. We have + # to look back into the outer frame in order to check + # the co_filename. + fname = frame.f_back.f_code.co_filename + while not Aergia.should_trace(fname): + # Walk the stack backwards until we hit a frame that + # IS one we should trace (if there is one). i.e., if + # it's in the code being profiled, and it is just + # calling stuff deep in libraries. + if frame: + frame = cast(FrameType, frame.f_back) + else: + break + if frame: + fname = frame.f_code.co_filename + if frame: + new_frames.append(frame) + return new_frames + + @staticmethod + def frame_to_string(frame): + '''Pretty-prints a frame as a function/file name and a line number. + Additionally used a key for tallying lines.''' + co = frame.f_code + func_name = co.co_name + line_no = frame.f_lineno + filename = co.co_filename + return filename + '\t' + func_name + '\t' + str(line_no) + + @staticmethod + def should_trace(filename): + '''Returns FALSE if filename is uninteresting to the user.''' + # FIXME Assume GuixSD. Makes filtering easy + if 'site-packages' in filename: + return False + if filename[0] == '<': + return False + if 'aergia' in filename: + return False + return True + + @staticmethod + def is_event_loop_running() -> bool: + '''Returns TRUE if there is an exent loop running. This is what + `asyncio.get_event_loop()' did, before it was deprecated in 3.12''' + return asyncio.get_event_loop_policy()._local._loop is not None + + @staticmethod + def sort_samples(sample_dict): + '''Returns SAMPLE_DICT in descending order by number of samples.''' + return {k: v for k, v in sorted(sample_dict.items(), + key=lambda item: item[1], + reverse=True)} + + @staticmethod + def filter_duplicated_frames(frames) -> bool: + s = set() + dup = [] + for f in frames: + if f in s: + dup.append(f) + else: + s.add(f) + # TODO we probably have one because given get_async_frames returns the + # currently executing task. Would be an easy fix in that method. + # if there's more than one, I cannot explain it. + assert len( + dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}" + if len(dup) != 0: + print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr) + return list(s) + + +the_globals = { + '__name__': '__main__', + '__doc__': None, + '__package__': None, + '__loader__': globals()['__loader__'], + '__spec__': None, + '__annotations__': {}, + '__builtins__': globals()['__builtins__'], + '__file__': None, + '__cached__': None, +} + + +def parse_arguments(): + '''Parse CLI args''' + parser = argparse.ArgumentParser( + usage='%(prog)s [args] script [args]' + ) + + parser.add_argument('-a', '--async_off', + action='store_false', + help='Turn off experimental async profiling.', + default=True) + + parser.add_argument('-i', '--interval', + help='The minimum amount of time inbetween \ + samples in seconds.', + metavar='', + default=0.01) + parser.add_argument('script', help='A python script to run.') + parser.add_argument('s_args', nargs=argparse.REMAINDER, + help='python script args') + + return parser.parse_args() + + +def main(): + args = parse_arguments() + + sys.argv = [args.script] + args.s_args + try: + with open(args.script, 'rb') as fp: + code = compile(fp.read(), args.script, "exec") + Aergia().start(args.async_off) + exec(code, the_globals) + Aergia().stop() + except Exception: + traceback.print_exc() + + +if __name__ == "__main__": + main() |
