diff options
Diffstat (limited to 'aergia.py')
| -rw-r--r-- | aergia.py | 315 |
1 files changed, 0 insertions, 315 deletions
diff --git a/aergia.py b/aergia.py deleted file mode 100644 index 64567a2..0000000 --- a/aergia.py +++ /dev/null @@ -1,315 +0,0 @@ -''' - _/_/ _/ - _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/ - _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/ - _/ _/ _/ _/ _/ _/ _/ _/ _/ - _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/ - _/ - _/_/ -Copyright: - - This program is free software: you can redistribute it - and/or modify it under the terms of the GNU General - Public License as published by the Free Software - Foundation, either version 3 of the License, or (at your - option) any later version. - - This program is distributed in the hope that it will be - useful, but WITHOUT ANY WARRANTY; without even the implied - warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR - PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public - License along with this program. If not, see - <https://www.gnu.org/licenses/>. - - -Commentary: - - Aergia is a sampling based profiler based off of SCALENE - (https://github.com/plasma-umass/scalene). - - It is not particularly informative, but unlike SCALENE - or other sampling-based profilers I could find, reports - the wall-time each asyncio await call spends idling. - - The goal behind Aergia is to eventually have these - features, or similar, merged into SCALENE. - - -Code: -''' - -from collections import defaultdict -from types import FrameType -from typing import cast, List, Tuple -import argparse -import asyncio -import selectors -import signal -import sys -import threading -import time -import traceback - - -class ReplacementEpollSelector(selectors.EpollSelector): - ''' - Provides a replacement for selectors.PollSelector that - periodically wakes up to accept signals. - ''' - - def select( - self, timeout=None - ) -> List[Tuple[selectors.SelectorKey, int]]: - start_time = time.perf_counter() - if not timeout or timeout < 0: - interval = sys.getswitchinterval() - else: - interval = min(timeout, sys.getswitchinterval()) - while True: - selected = super().select(interval) - if selected or timeout == 0 or not timeout: - return selected - end_time = time.perf_counter() - if end_time - start_time >= timeout: - return [] - - -selectors.DefaultSelector = ReplacementEpollSelector - - -class Aergia(object): - - # a key-value pair where keys represent frame metadata (see - # Aergia.frame_to_string) and values represent number of times - # sampled. - cpu_samples = defaultdict(lambda: 0) - cpu_samples_c = defaultdict(lambda: 0) - # number of times samples have been collected - total_cpu_samples = 0 - - # the time, in seconds, between samples - signal_interval = 0.01 - # the timestamp recorded last signal - last_signal_time = 0.0 - - # if we should try to profile asynchronous code. Used to observe - # effectiveness of the implementation. - profile_async = True - - def __init__(self): - signal.signal(signal.SIGPROF, - self.cpu_signal_handler) - signal.setitimer(signal.ITIMER_PROF, - self.signal_interval, - self.signal_interval) - Aergia.last_signal_time = Aergia.gettime() - - @staticmethod - def gettime(): - '''get the wallclock time''' - return time.perf_counter() - - @staticmethod - def start(profile_async): - Aergia.profile_async = profile_async - - @staticmethod - def stop(): - '''Turn off profiling signals''' - Aergia.disable_signals() - Aergia.exit_handler() - - @staticmethod - def exit_handler(): - '''Pretty-print profiling information.''' - # If we've collected any samples, dump them. - print("CPU usage (Python):") - if Aergia.total_cpu_samples > 0: - for key in Aergia.sort_samples(Aergia.cpu_samples): - print(f"{key} : " - f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % " - f"({Aergia.cpu_samples[key]:.1f} total samples)") - print("CPU usage (Native):") - for key in Aergia.sort_samples(Aergia.cpu_samples_c): - print(f"{key} : " - f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % " - f"({Aergia.cpu_samples_c[key]:.1f} total samples)") - else: - print("(did not run long enough to profile)") - - @staticmethod - def disable_signals(): - signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) - signal.signal(signal.SIGVTALRM, signal.SIG_IGN) - signal.setitimer(signal.ITIMER_PROF, 0) - - @staticmethod - def cpu_signal_handler(sig, frame): - elapsed_since_last_signal = Aergia.gettime() - \ - Aergia.last_signal_time - c_time_norm = (elapsed_since_last_signal - - Aergia.signal_interval) / \ - Aergia.signal_interval - - keys = Aergia.compute_frames_to_record(frame) - for key in keys: - Aergia.cpu_samples[Aergia.frame_to_string(key)] += 1 - Aergia.cpu_samples_c[Aergia.frame_to_string( - key)] += c_time_norm - Aergia.total_cpu_samples += elapsed_since_last_signal / \ - Aergia.signal_interval - Aergia.last_signal_time = Aergia.gettime() - - @staticmethod - def compute_frames_to_record(this_frame): - '''Collects all stack frames that Aergia actually processes.''' - frames = [this_frame] - frames += [sys._current_frames().get(t.ident, None) - for t in threading.enumerate()] - frames += Aergia.get_async_frames() - - frames = Aergia.filter_duplicated_frames(frames) - # Process all the frames to remove ones we aren't going to track. - new_frames = [] - for frame in frames: - if frame is None: - continue - fname = frame.f_code.co_filename - # Record samples only for files we care about. - if (len(fname)) == 0: - # 'eval/compile' gives no f_code.co_filename. We have - # to look back into the outer frame in order to check - # the co_filename. - fname = frame.f_back.f_code.co_filename - while not Aergia.should_trace(fname): - # Walk the stack backwards until we hit a frame that - # IS one we should trace (if there is one). i.e., if - # it's in the code being profiled, and it is just - # calling stuff deep in libraries. - if frame: - frame = cast(FrameType, frame.f_back) - else: - break - if frame: - fname = frame.f_code.co_filename - if frame: - new_frames.append(frame) - return new_frames - - @staticmethod - def frame_to_string(frame): - '''Pretty-prints a frame as a function/file name and a line number. - Additionally used a key for tallying lines.''' - co = frame.f_code - func_name = co.co_name - line_no = frame.f_lineno - filename = co.co_filename - return filename + '\t' + func_name + '\t' + str(line_no) - - @staticmethod - def get_async_frames(): - '''Obtains the stack frames of all currently executing tasks.''' - if Aergia.is_event_loop_running() and Aergia.profile_async: - return [task.get_coro().cr_frame for task in asyncio.all_tasks()] - return [] - - @staticmethod - def should_trace(filename): - '''Returns FALSE if filename is uninteresting to the user.''' - # FIXME Assume GuixSD. Makes filtering easy - if '/gnu/store' in filename: - return False - if filename[0] == '<': - return False - if 'aergia.py' in filename: - return False - return True - - @staticmethod - def is_event_loop_running() -> bool: - '''Returns TRUE if there is an exent loop running. This is what - `asyncio.get_event_loop()' did, before it was deprecated in 3.12''' - return asyncio.get_event_loop_policy()._local._loop is not None - - @staticmethod - def sort_samples(sample_dict): - '''Returns SAMPLE_DICT in descending order by number of samples.''' - return {k: v for k, v in sorted(sample_dict.items(), - key=lambda item: item[1], - reverse=True)} - - @staticmethod - def filter_duplicated_frames(frames) -> bool: - s = set() - dup = [] - for f in frames: - if f in s: - dup.append(f) - else: - s.add(f) - # TODO we probably have one because given get_async_frames returns the - # currently executing task. Would be an easy fix in that method. - # if there's more than one, I cannot explain it. - assert len( - dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}" - if len(dup) != 0: - print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr) - return list(s) - - -the_globals = { - '__name__': '__main__', - '__doc__': None, - '__package__': None, - '__loader__': globals()['__loader__'], - '__spec__': None, - '__annotations__': {}, - '__builtins__': globals()['__builtins__'], - '__file__': None, - '__cached__': None, -} - - -def parse_arguments(): - '''Parse CLI args''' - parser = argparse.ArgumentParser( - usage='%(prog)s [args] script [args]' - ) - - parser.add_argument('-a', '--async_off', - action='store_false', - help='Turn off experimental async profiling.', - default=True) - - parser.add_argument('-i', '--interval', - help='The minimum amount of time inbetween \ - samples in seconds.', - metavar='', - default=0.01) - parser.add_argument('script', help='A python script to run.') - parser.add_argument('s_args', nargs=argparse.REMAINDER, - help='python script args') - - return parser.parse_args() - - -def main(): - args = parse_arguments() - - sys.argv = [args.script] + args.s_args - try: - with open(args.script, 'rb') as fp: - code = compile(fp.read(), args.script, "exec") - Aergia().start(args.async_off) - exec(code, the_globals) - Aergia().stop() - except Exception: - traceback.print_exc() - - -if __name__ == "__main__": - main() - -# aergia.py ends here |
