summaryrefslogtreecommitdiff
path: root/aergia
diff options
context:
space:
mode:
Diffstat (limited to 'aergia')
-rwxr-xr-xaergia300
1 files changed, 300 insertions, 0 deletions
diff --git a/aergia b/aergia
new file mode 100755
index 0000000..4578d24
--- /dev/null
+++ b/aergia
@@ -0,0 +1,300 @@
+#!/usr/bin/env python3
+'''
+ _/_/ _/
+ _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/
+ _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/
+ _/ _/ _/ _/ _/ _/ _/ _/ _/
+ _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/
+ _/
+ _/_/
+Copyright:
+
+ This program is free software: you can redistribute it
+ and/or modify it under the terms of the GNU General
+ Public License as published by the Free Software
+ Foundation, either version 3 of the License, or (at your
+ option) any later version.
+
+ This program is distributed in the hope that it will be
+ useful, but WITHOUT ANY WARRANTY; without even the implied
+ warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public
+ License along with this program. If not, see
+ <https://www.gnu.org/licenses/>.
+
+
+Commentary:
+
+ Aergia is a sampling based profiler based off of SCALENE
+ by Emery Berger (https://github.com/plasma-umass/scalene).
+
+ It is not particularly informative, but unlike SCALENE
+ or other sampling-based profilers I could find, reports
+ the wall-time each asyncio await call spends idling.
+
+ The goal behind Aergia is to eventually have these features,
+ or similar, merged into SCALENE.
+
+
+Code:
+'''
+
+from collections import defaultdict
+from types import FrameType
+from typing import cast, List, Tuple
+import argparse
+import asyncio
+import selectors
+import signal
+import sys
+import threading
+import time
+import traceback
+
+
+class ReplacementEpollSelector(selectors.EpollSelector):
+ '''
+ Provides a replacement for selectors.PollSelector that
+ periodically wakes up to accept signals.
+ '''
+
+ def select(
+ self, timeout=None
+ ) -> List[Tuple[selectors.SelectorKey, int]]:
+ start_time = time.perf_counter()
+ if not timeout or timeout < 0:
+ interval = sys.getswitchinterval()
+ else:
+ interval = min(timeout, sys.getswitchinterval())
+ while True:
+ selected = super().select(interval)
+ if selected or timeout == 0 or not timeout:
+ return selected
+ end_time = time.perf_counter()
+ if end_time - start_time >= timeout:
+ return []
+
+
+selectors.DefaultSelector = ReplacementEpollSelector
+
+
+class Aergia(object):
+
+ # a key-value pair where keys represent frame metadata (see
+ # Aergia.frame_to_string) and values represent number of times
+ # sampled.
+ cpu_samples = defaultdict(lambda: 0)
+ cpu_samples_c = defaultdict(lambda: 0)
+ # number of times samples have been collected
+ total_cpu_samples = 0
+
+ # the time, in seconds, between samples
+ signal_interval = 0.01
+ # the timestamp recorded last signal
+ last_signal_time = 0.0
+
+
+ def __init__(self):
+ signal.signal(signal.SIGPROF,
+ self.cpu_signal_handler)
+ signal.setitimer(signal.ITIMER_PROF,
+ self.signal_interval,
+ self.signal_interval)
+
+ @staticmethod
+ def gettime():
+ '''get the wallclock time'''
+ return time.time()
+
+ @staticmethod
+ def start(profile_async):
+ Aergia.last_signal_time = Aergia.gettime()
+
+ @staticmethod
+ def stop():
+ '''Turn off profiling signals'''
+ Aergia.disable_signals()
+ Aergia.exit_handler()
+
+ @staticmethod
+ def exit_handler():
+ '''Pretty-print profiling information.'''
+ # If we've collected any samples, dump them.
+ print("CPU usage (Python):")
+ if Aergia.total_cpu_samples > 0:
+ for key in Aergia.sort_samples(Aergia.cpu_samples):
+ print(f"{key} : "
+ f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % "
+ f"({Aergia.cpu_samples[key]:.1f} total samples)")
+ print("CPU usage (Native):")
+ for key in Aergia.sort_samples(Aergia.cpu_samples_c):
+ print(f"{key} : "
+ f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % "
+ f"({Aergia.cpu_samples_c[key]:.1f} total samples)")
+ else:
+ print("(Bug) The program did not run long enough to profile, or a one-time error occured.")
+
+ @staticmethod
+ def disable_signals():
+ signal.signal(signal.ITIMER_PROF, signal.SIG_IGN)
+ signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
+ signal.setitimer(signal.ITIMER_PROF, 0)
+
+ @staticmethod
+ def cpu_signal_handler(sig, frame):
+ elapsed_since_last_signal = Aergia.gettime() - \
+ Aergia.last_signal_time
+ c_time_norm = (elapsed_since_last_signal -
+ Aergia.signal_interval) / \
+ Aergia.signal_interval
+
+ keys = Aergia.compute_frames_to_record()
+ for key in keys:
+ Aergia.cpu_samples[Aergia.frame_to_string(key)] += 1
+ Aergia.cpu_samples_c[Aergia.frame_to_string(
+ key)] += c_time_norm
+ Aergia.total_cpu_samples += elapsed_since_last_signal / \
+ Aergia.signal_interval
+ Aergia.last_signal_time = Aergia.gettime()
+
+ @staticmethod
+ def compute_frames_to_record():
+ '''Collects all stack frames that Aergia actually processes.'''
+ if Aergia.is_event_loop_running():
+ frames = [task.get_coro().cr_frame for task in asyncio.all_tasks()]
+
+ # Process all the frames to remove ones we aren't going to track.
+ new_frames = []
+ for frame in frames:
+ if frame is None:
+ continue
+ fname = frame.f_code.co_filename
+ # Record samples only for files we care about.
+ if (len(fname)) == 0:
+ # 'eval/compile' gives no f_code.co_filename. We have
+ # to look back into the outer frame in order to check
+ # the co_filename.
+ fname = frame.f_back.f_code.co_filename
+ while not Aergia.should_trace(fname):
+ # Walk the stack backwards until we hit a frame that
+ # IS one we should trace (if there is one). i.e., if
+ # it's in the code being profiled, and it is just
+ # calling stuff deep in libraries.
+ if frame:
+ frame = cast(FrameType, frame.f_back)
+ else:
+ break
+ if frame:
+ fname = frame.f_code.co_filename
+ if frame:
+ new_frames.append(frame)
+ return new_frames
+
+ @staticmethod
+ def frame_to_string(frame):
+ '''Pretty-prints a frame as a function/file name and a line number.
+ Additionally used a key for tallying lines.'''
+ co = frame.f_code
+ func_name = co.co_name
+ line_no = frame.f_lineno
+ filename = co.co_filename
+ return filename + '\t' + func_name + '\t' + str(line_no)
+
+ @staticmethod
+ def should_trace(filename):
+ '''Returns FALSE if filename is uninteresting to the user.'''
+ # FIXME Assume GuixSD. Makes filtering easy
+ if 'site-packages' in filename:
+ return False
+ if filename[0] == '<':
+ return False
+ if 'aergia' in filename:
+ return False
+ return True
+
+ @staticmethod
+ def is_event_loop_running() -> bool:
+ '''Returns TRUE if there is an exent loop running. This is what
+ `asyncio.get_event_loop()' did, before it was deprecated in 3.12'''
+ return asyncio.get_event_loop_policy()._local._loop is not None
+
+ @staticmethod
+ def sort_samples(sample_dict):
+ '''Returns SAMPLE_DICT in descending order by number of samples.'''
+ return {k: v for k, v in sorted(sample_dict.items(),
+ key=lambda item: item[1],
+ reverse=True)}
+
+ @staticmethod
+ def filter_duplicated_frames(frames) -> bool:
+ s = set()
+ dup = []
+ for f in frames:
+ if f in s:
+ dup.append(f)
+ else:
+ s.add(f)
+ # TODO we probably have one because given get_async_frames returns the
+ # currently executing task. Would be an easy fix in that method.
+ # if there's more than one, I cannot explain it.
+ assert len(
+ dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}"
+ if len(dup) != 0:
+ print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr)
+ return list(s)
+
+
+the_globals = {
+ '__name__': '__main__',
+ '__doc__': None,
+ '__package__': None,
+ '__loader__': globals()['__loader__'],
+ '__spec__': None,
+ '__annotations__': {},
+ '__builtins__': globals()['__builtins__'],
+ '__file__': None,
+ '__cached__': None,
+}
+
+
+def parse_arguments():
+ '''Parse CLI args'''
+ parser = argparse.ArgumentParser(
+ usage='%(prog)s [args] script [args]'
+ )
+
+ parser.add_argument('-a', '--async_off',
+ action='store_false',
+ help='Turn off experimental async profiling.',
+ default=True)
+
+ parser.add_argument('-i', '--interval',
+ help='The minimum amount of time inbetween \
+ samples in seconds.',
+ metavar='',
+ default=0.01)
+ parser.add_argument('script', help='A python script to run.')
+ parser.add_argument('s_args', nargs=argparse.REMAINDER,
+ help='python script args')
+
+ return parser.parse_args()
+
+
+def main():
+ args = parse_arguments()
+
+ sys.argv = [args.script] + args.s_args
+ try:
+ with open(args.script, 'rb') as fp:
+ code = compile(fp.read(), args.script, "exec")
+ Aergia().start(args.async_off)
+ exec(code, the_globals)
+ Aergia().stop()
+ except Exception:
+ traceback.print_exc()
+
+
+if __name__ == "__main__":
+ main()