blob: e593c94591b2f324492689a6373dbd373c81bdd9 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
import runpy
import os
import sys
import time
import threading
# in milliseconds
SAMPLE_INTERVAL = 10
def die(message):
print(message, file=sys.stderr)
# use 'os' over 'sys', since we are calling from a child thread
os._exit(1)
def get_main_thread_id():
for thread_id, frame in sys._current_frames().items():
if threading.main_thread().ident == thread_id:
return thread_id
die("[mini-scalene] Couldn't find main thread--likely an unimplemented feature.")
def sample_running_function():
main_thread_id = get_main_thread_id()
frame = sys._current_frames()[main_thread_id]
code = frame.f_code
print(f"[mini-scalene] Currently executing: {code.co_name} in {code.co_filename}:{frame.f_lineno}")
def sampling_loop():
while True:
sample_running_function()
time.sleep(SAMPLE_INTERVAL / 1000.0)
def main():
if len(sys.argv) < 2:
die("[mini-scalene] (Usage): python3 mini-scalene.py file.py {args ...}")
script = sys.argv[1]
sys.argv = sys.argv[1:]
t = threading.Thread(target=sampling_loop, daemon=True)
t.start()
try:
runpy.run_path(script, run_name="__main__")
except Exception as e:
die(f"[mini-scalene] Script died unexpectedly: {e}")
if __name__ == "__main__":
main()
|