summaryrefslogtreecommitdiff
path: root/aergia.py
diff options
context:
space:
mode:
Diffstat (limited to 'aergia.py')
-rw-r--r--aergia.py315
1 files changed, 0 insertions, 315 deletions
diff --git a/aergia.py b/aergia.py
deleted file mode 100644
index 64567a2..0000000
--- a/aergia.py
+++ /dev/null
@@ -1,315 +0,0 @@
-'''
- _/_/ _/
- _/ _/ _/_/ _/ _/_/ _/_/_/ _/_/_/
- _/_/_/_/ _/_/_/_/ _/_/ _/ _/ _/ _/ _/
- _/ _/ _/ _/ _/ _/ _/ _/ _/
- _/ _/ _/_/_/ _/ _/_/_/ _/ _/_/_/
- _/
- _/_/
-Copyright:
-
- This program is free software: you can redistribute it
- and/or modify it under the terms of the GNU General
- Public License as published by the Free Software
- Foundation, either version 3 of the License, or (at your
- option) any later version.
-
- This program is distributed in the hope that it will be
- useful, but WITHOUT ANY WARRANTY; without even the implied
- warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- PURPOSE. See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public
- License along with this program. If not, see
- <https://www.gnu.org/licenses/>.
-
-
-Commentary:
-
- Aergia is a sampling based profiler based off of SCALENE
- (https://github.com/plasma-umass/scalene).
-
- It is not particularly informative, but unlike SCALENE
- or other sampling-based profilers I could find, reports
- the wall-time each asyncio await call spends idling.
-
- The goal behind Aergia is to eventually have these
- features, or similar, merged into SCALENE.
-
-
-Code:
-'''
-
-from collections import defaultdict
-from types import FrameType
-from typing import cast, List, Tuple
-import argparse
-import asyncio
-import selectors
-import signal
-import sys
-import threading
-import time
-import traceback
-
-
-class ReplacementEpollSelector(selectors.EpollSelector):
- '''
- Provides a replacement for selectors.PollSelector that
- periodically wakes up to accept signals.
- '''
-
- def select(
- self, timeout=None
- ) -> List[Tuple[selectors.SelectorKey, int]]:
- start_time = time.perf_counter()
- if not timeout or timeout < 0:
- interval = sys.getswitchinterval()
- else:
- interval = min(timeout, sys.getswitchinterval())
- while True:
- selected = super().select(interval)
- if selected or timeout == 0 or not timeout:
- return selected
- end_time = time.perf_counter()
- if end_time - start_time >= timeout:
- return []
-
-
-selectors.DefaultSelector = ReplacementEpollSelector
-
-
-class Aergia(object):
-
- # a key-value pair where keys represent frame metadata (see
- # Aergia.frame_to_string) and values represent number of times
- # sampled.
- cpu_samples = defaultdict(lambda: 0)
- cpu_samples_c = defaultdict(lambda: 0)
- # number of times samples have been collected
- total_cpu_samples = 0
-
- # the time, in seconds, between samples
- signal_interval = 0.01
- # the timestamp recorded last signal
- last_signal_time = 0.0
-
- # if we should try to profile asynchronous code. Used to observe
- # effectiveness of the implementation.
- profile_async = True
-
- def __init__(self):
- signal.signal(signal.SIGPROF,
- self.cpu_signal_handler)
- signal.setitimer(signal.ITIMER_PROF,
- self.signal_interval,
- self.signal_interval)
- Aergia.last_signal_time = Aergia.gettime()
-
- @staticmethod
- def gettime():
- '''get the wallclock time'''
- return time.perf_counter()
-
- @staticmethod
- def start(profile_async):
- Aergia.profile_async = profile_async
-
- @staticmethod
- def stop():
- '''Turn off profiling signals'''
- Aergia.disable_signals()
- Aergia.exit_handler()
-
- @staticmethod
- def exit_handler():
- '''Pretty-print profiling information.'''
- # If we've collected any samples, dump them.
- print("CPU usage (Python):")
- if Aergia.total_cpu_samples > 0:
- for key in Aergia.sort_samples(Aergia.cpu_samples):
- print(f"{key} : "
- f"{Aergia.cpu_samples[key] * 100 / Aergia.total_cpu_samples:.3f} % "
- f"({Aergia.cpu_samples[key]:.1f} total samples)")
- print("CPU usage (Native):")
- for key in Aergia.sort_samples(Aergia.cpu_samples_c):
- print(f"{key} : "
- f"{Aergia.cpu_samples_c[key] * 100 / Aergia.total_cpu_samples:.3f} % "
- f"({Aergia.cpu_samples_c[key]:.1f} total samples)")
- else:
- print("(did not run long enough to profile)")
-
- @staticmethod
- def disable_signals():
- signal.signal(signal.ITIMER_PROF, signal.SIG_IGN)
- signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
- signal.setitimer(signal.ITIMER_PROF, 0)
-
- @staticmethod
- def cpu_signal_handler(sig, frame):
- elapsed_since_last_signal = Aergia.gettime() - \
- Aergia.last_signal_time
- c_time_norm = (elapsed_since_last_signal -
- Aergia.signal_interval) / \
- Aergia.signal_interval
-
- keys = Aergia.compute_frames_to_record(frame)
- for key in keys:
- Aergia.cpu_samples[Aergia.frame_to_string(key)] += 1
- Aergia.cpu_samples_c[Aergia.frame_to_string(
- key)] += c_time_norm
- Aergia.total_cpu_samples += elapsed_since_last_signal / \
- Aergia.signal_interval
- Aergia.last_signal_time = Aergia.gettime()
-
- @staticmethod
- def compute_frames_to_record(this_frame):
- '''Collects all stack frames that Aergia actually processes.'''
- frames = [this_frame]
- frames += [sys._current_frames().get(t.ident, None)
- for t in threading.enumerate()]
- frames += Aergia.get_async_frames()
-
- frames = Aergia.filter_duplicated_frames(frames)
- # Process all the frames to remove ones we aren't going to track.
- new_frames = []
- for frame in frames:
- if frame is None:
- continue
- fname = frame.f_code.co_filename
- # Record samples only for files we care about.
- if (len(fname)) == 0:
- # 'eval/compile' gives no f_code.co_filename. We have
- # to look back into the outer frame in order to check
- # the co_filename.
- fname = frame.f_back.f_code.co_filename
- while not Aergia.should_trace(fname):
- # Walk the stack backwards until we hit a frame that
- # IS one we should trace (if there is one). i.e., if
- # it's in the code being profiled, and it is just
- # calling stuff deep in libraries.
- if frame:
- frame = cast(FrameType, frame.f_back)
- else:
- break
- if frame:
- fname = frame.f_code.co_filename
- if frame:
- new_frames.append(frame)
- return new_frames
-
- @staticmethod
- def frame_to_string(frame):
- '''Pretty-prints a frame as a function/file name and a line number.
- Additionally used a key for tallying lines.'''
- co = frame.f_code
- func_name = co.co_name
- line_no = frame.f_lineno
- filename = co.co_filename
- return filename + '\t' + func_name + '\t' + str(line_no)
-
- @staticmethod
- def get_async_frames():
- '''Obtains the stack frames of all currently executing tasks.'''
- if Aergia.is_event_loop_running() and Aergia.profile_async:
- return [task.get_coro().cr_frame for task in asyncio.all_tasks()]
- return []
-
- @staticmethod
- def should_trace(filename):
- '''Returns FALSE if filename is uninteresting to the user.'''
- # FIXME Assume GuixSD. Makes filtering easy
- if '/gnu/store' in filename:
- return False
- if filename[0] == '<':
- return False
- if 'aergia.py' in filename:
- return False
- return True
-
- @staticmethod
- def is_event_loop_running() -> bool:
- '''Returns TRUE if there is an exent loop running. This is what
- `asyncio.get_event_loop()' did, before it was deprecated in 3.12'''
- return asyncio.get_event_loop_policy()._local._loop is not None
-
- @staticmethod
- def sort_samples(sample_dict):
- '''Returns SAMPLE_DICT in descending order by number of samples.'''
- return {k: v for k, v in sorted(sample_dict.items(),
- key=lambda item: item[1],
- reverse=True)}
-
- @staticmethod
- def filter_duplicated_frames(frames) -> bool:
- s = set()
- dup = []
- for f in frames:
- if f in s:
- dup.append(f)
- else:
- s.add(f)
- # TODO we probably have one because given get_async_frames returns the
- # currently executing task. Would be an easy fix in that method.
- # if there's more than one, I cannot explain it.
- assert len(
- dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}"
- if len(dup) != 0:
- print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr)
- return list(s)
-
-
-the_globals = {
- '__name__': '__main__',
- '__doc__': None,
- '__package__': None,
- '__loader__': globals()['__loader__'],
- '__spec__': None,
- '__annotations__': {},
- '__builtins__': globals()['__builtins__'],
- '__file__': None,
- '__cached__': None,
-}
-
-
-def parse_arguments():
- '''Parse CLI args'''
- parser = argparse.ArgumentParser(
- usage='%(prog)s [args] script [args]'
- )
-
- parser.add_argument('-a', '--async_off',
- action='store_false',
- help='Turn off experimental async profiling.',
- default=True)
-
- parser.add_argument('-i', '--interval',
- help='The minimum amount of time inbetween \
- samples in seconds.',
- metavar='',
- default=0.01)
- parser.add_argument('script', help='A python script to run.')
- parser.add_argument('s_args', nargs=argparse.REMAINDER,
- help='python script args')
-
- return parser.parse_args()
-
-
-def main():
- args = parse_arguments()
-
- sys.argv = [args.script] + args.s_args
- try:
- with open(args.script, 'rb') as fp:
- code = compile(fp.read(), args.script, "exec")
- Aergia().start(args.async_off)
- exec(code, the_globals)
- Aergia().stop()
- except Exception:
- traceback.print_exc()
-
-
-if __name__ == "__main__":
- main()
-
-# aergia.py ends here