1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
import sys
import threading
import traceback
import runpy
import atexit
import signal
from typing import cast
from types import FrameType
from collections import defaultdict
class mini_scalene:
    """Minimal sampling CPU profiler driven by the ``ITIMER_PROF`` timer.

    Creating an instance installs ``cpu_signal_handler`` for ``SIGPROF`` and
    arms the interval timer; ``start()`` registers ``exit_handler`` so the
    report is printed when the interpreter exits.  All counters live on the
    class, so every instance shares them.
    """

    # Sample count per "filename<TAB>function<TAB>line" key.
    cpu_samples = defaultdict(int)
    # Number of timer signals handled so far.
    total_cpu_samples = 0
    # Timer period in seconds (used for both the first and repeat interval).
    signal_interval = 0.01

    def __init__(self):
        """Install the SIGPROF handler and arm the profiling timer."""
        signal.signal(signal.SIGPROF, self.cpu_signal_handler)
        signal.setitimer(signal.ITIMER_PROF,
                         self.signal_interval, self.signal_interval)

    @staticmethod
    def start():
        """Register ``exit_handler`` to run at interpreter exit."""
        atexit.register(mini_scalene.exit_handler)

    @staticmethod
    def exit_handler():
        """Stop sampling and print the collected profile to stdout."""
        # Disarm the timer first, then ignore SIGPROF so a signal that is
        # already pending cannot interrupt the report.  Note that the signal
        # is SIGPROF; ITIMER_PROF is a timer id, not a signal number.
        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, signal.SIG_IGN)
        signal.signal(signal.SIGVTALRM, signal.SIG_IGN)

        print("CPU usage:")
        total = mini_scalene.total_cpu_samples
        if total <= 0:
            print("(did not run long enough to profile)")
            return

        # Rank into a local list (most samples first) rather than rebinding
        # the class attribute, so ``cpu_samples`` stays a defaultdict.
        ranked = sorted(mini_scalene.cpu_samples.items(),
                        key=lambda item: item[1], reverse=True)
        for key, count in ranked:
            print(f"{key} : {count * 100 / total}% ({count} total samples)")

    @staticmethod
    def cpu_signal_handler(sig, frame):
        """Signal handler: add one sample for every frame worth recording.

        Args:
            sig: Signal number (unused).
            frame: Frame that was interrupted by the signal, or ``None``.
        """
        for sampled in mini_scalene.compute_frames_to_record(frame):
            mini_scalene.cpu_samples[mini_scalene.frame_to_string(sampled)] += 1
        mini_scalene.total_cpu_samples += 1

    @staticmethod
    def compute_frames_to_record(this_frame):
        """Collect the stack frames that should receive a sample.

        Args:
            this_frame: Frame the calling thread was executing when it was
                interrupted (may be ``None``).

        Returns:
            A list holding, for ``this_frame`` and for each other live
            thread, the innermost frame whose file passes ``should_trace``.
            Stacks without such a frame contribute nothing.
        """
        # The calling thread is already represented by ``this_frame``; its
        # live stack is the profiler itself and would walk back to that same
        # frame, counting the interrupted line twice.
        own_ident = threading.get_ident()
        snapshot = sys._current_frames()
        candidates = [this_frame]
        candidates += [snapshot.get(t.ident) for t in threading.enumerate()
                       if t.ident != own_ident]

        recorded = []
        for frame in candidates:
            if frame is None:
                continue
            fname = frame.f_code.co_filename
            if not fname:
                # Code without a file name (e.g. from eval/compile): decide
                # based on the file of the calling frame instead.
                outer = frame.f_back
                if outer is None:
                    # No caller either, so there is no file to attribute to.
                    continue
                fname = outer.f_code.co_filename
            # Walk outwards until we reach a frame in code being profiled,
            # i.e. attribute time spent deep in libraries to the traced
            # line that called into them.
            while frame is not None and not mini_scalene.should_trace(fname):
                frame = frame.f_back
                if frame is not None:
                    fname = frame.f_code.co_filename
            if frame is not None:
                recorded.append(frame)
        return recorded

    @staticmethod
    def frame_to_string(frame):
        """Return the ``filename<TAB>function<TAB>line`` key for *frame*."""
        code = frame.f_code
        return code.co_filename + '\t' + code.co_name + '\t' + str(frame.f_lineno)

    @staticmethod
    def should_trace(filename):
        """Return True if samples located in *filename* should be recorded."""
        # We're assuming Guix System, where everything under /gnu/store is
        # installed software rather than the program being profiled.  The
        # profiler's own file and the import machinery are skipped as well.
        excluded = (
            '/gnu/store',
            'mini-scalene.py',
            '<frozen importlib._bootstrap>',
            '<frozen importlib._bootstrap_external>',
        )
        return not any(marker in filename for marker in excluded)
def main():
    """Profile the script named as the first command-line argument.

    Starts the profiler, then runs the script as ``__main__``.  An exception
    raised by the script is printed as a traceback instead of propagating,
    so the profile report registered for interpreter exit is still produced.

    Raises:
        SystemExit: If no script path was given on the command line.
    """
    # Validate explicitly: an ``assert`` would be stripped under ``python -O``.
    if len(sys.argv) < 2:
        sys.exit("(Usage): python3 mini-scalene.py file.py {args ...}")
    # Drop the profiler's own entry so the script sees its own arguments;
    # runpy.run_path only rewrites sys.argv[0], which would otherwise leave
    # the script path duplicated at index 1.
    sys.argv = sys.argv[1:]
    script = sys.argv[0]
    mini_scalene().start()
    try:
        runpy.run_path(script, run_name="__main__")
    except Exception:
        traceback.print_exc()
# Run the profiler only when this file is executed as a script.
if __name__ == "__main__":
    main()
|