summaryrefslogtreecommitdiff
path: root/mini-scalene.py
diff options
context:
space:
mode:
Diffstat (limited to 'mini-scalene.py')
-rw-r--r--mini-scalene.py137
1 files changed, 95 insertions, 42 deletions
diff --git a/mini-scalene.py b/mini-scalene.py
index e593c94..4883cff 100644
--- a/mini-scalene.py
+++ b/mini-scalene.py
@@ -1,55 +1,108 @@
-import runpy
-import os
import sys
-import time
+import os
import threading
-
-
-# in milliseconds
-SAMPLE_INTERVAL = 10
-
-
-def die(message):
- print(message, file=sys.stderr)
- # use 'os' over 'sys', since we are calling from a child thread
- os._exit(1)
-
-
-def get_main_thread_id():
- for thread_id, frame in sys._current_frames().items():
- if threading.main_thread().ident == thread_id:
- return thread_id
- die("[mini-scalene] Couldn't find main thread--likely an unimplemented feature.")
-
-
-def sample_running_function():
- main_thread_id = get_main_thread_id()
-
- frame = sys._current_frames()[main_thread_id]
- code = frame.f_code
- print(f"[mini-scalene] Currently executing: {code.co_name} in {code.co_filename}:{frame.f_lineno}")
-
-
-def sampling_loop():
- while True:
- sample_running_function()
- time.sleep(SAMPLE_INTERVAL / 1000.0)
+import traceback
+import runpy
+import atexit
+import signal
+from collections import defaultdict
+
+
+class mini_scalene:
+ cpu_samples = defaultdict(lambda: 0)
+ total_cpu_samples = 0
+ signal_interval = 0.01
+
+ def __init__(self):
+ signal.signal(signal.SIGPROF, self.cpu_signal_handler)
+ signal.setitimer(signal.ITIMER_PROF,
+ self.signal_interval, self.signal_interval)
+
+ @staticmethod
+ def start():
+ atexit.register(mini_scalene.exit_handler)
+
+ @staticmethod
+ def exit_handler():
+ # Turn off the profiling signals.
+ signal.signal(signal.ITIMER_PROF, signal.SIG_IGN)
+ signal.signal(signal.SIGVTALRM, signal.SIG_IGN)
+ signal.setitimer(signal.ITIMER_PROF, 0)
+ # If we've collected any samples, dump them.
+ print("CPU usage:")
+ if mini_scalene.total_cpu_samples > 0:
+ # Sort the samples in descending order by number of samples.
+ mini_scalene.cpu_samples = {k: v for k, v in sorted(
+ mini_scalene.cpu_samples.items(), key=lambda item: item[1], reverse=True)}
+ for key in mini_scalene.cpu_samples:
+ print(key + " : " + str(mini_scalene.cpu_samples[key] * 100 / mini_scalene.total_cpu_samples) + "%" + " (" + str(
+ mini_scalene.cpu_samples[key]) + " total samples)")
+ else:
+ print("(did not run long enough to profile)")
+
+ @staticmethod
+ def cpu_signal_handler(sig, frame):
+ keys = mini_scalene.compute_frames_to_record(frame)
+ for key in keys:
+ mini_scalene.cpu_samples[mini_scalene.frame_to_string(key)] += 1
+ mini_scalene.total_cpu_samples += 1
+ return
+
+ @staticmethod
+ def compute_frames_to_record(this_frame):
+ """Collects all stack frames that Scalene actually processes."""
+ frames = [this_frame]
+ frames += [sys._current_frames().get(t.ident, None)
+ for t in threading.enumerate()]
+ # Process all the frames to remove ones we aren't going to track.
+ new_frames = []
+ for frame in frames:
+ if frame is None:
+ continue
+ fname = frame.f_code.co_filename
+ # Record samples only for files we care about.
+ if (len(fname)) == 0:
+ # 'eval/compile' gives no f_code.co_filename. We have
+ # to look back into the outer frame in order to check
+ # the co_filename.
+ fname = frame.f_back.f_code.co_filename
+ if not mini_scalene.should_trace(fname):
+ continue
+ new_frames.append(frame)
+ return new_frames
+
+ @staticmethod
+ def frame_to_string(frame):
+ co = frame.f_code
+ func_name = co.co_name
+ line_no = frame.f_lineno
+ filename = co.co_filename
+ return filename + '\t' + func_name + '\t' + str(line_no)
+
+ @staticmethod
+ def should_trace(filename):
+ # Don't trace the profiler itself.
+ if 'mini-scalene.py' in filename:
+ return False
+ # Don't trace Python builtins.
+ if '<frozen importlib._bootstrap>' in filename:
+ return False
+ if '<frozen importlib._bootstrap_external>' in filename:
+ return False
+ return True
def main():
- if len(sys.argv) < 2:
- die("[mini-scalene] (Usage): python3 mini-scalene.py file.py {args ...}")
+ assert len(
+ sys.argv) >= 2, "(Usage): python3 mini-scalene.py file.py {args ...}"
script = sys.argv[1]
- sys.argv = sys.argv[1:]
-
- t = threading.Thread(target=sampling_loop, daemon=True)
- t.start()
+ mini_scalene().start()
try:
runpy.run_path(script, run_name="__main__")
- except Exception as e:
- die(f"[mini-scalene] Script died unexpectedly: {e}")
+ except Exception:
+ traceback.print_exc()
if __name__ == "__main__":