diff options
| -rw-r--r-- | __init__.py | 0 | ||||
| -rw-r--r-- | __pycache__/mini_scalene.cpython-311.pyc | bin | 0 -> 13865 bytes | |||
| -rw-r--r-- | __pycache__/replacement_poll_selector.cpython-311.pyc | bin | 0 -> 2546 bytes | |||
| -rw-r--r-- | __pycache__/sitecustomize.cpython-311.pyc | bin | 0 -> 417 bytes | |||
| -rw-r--r-- | mini_scalene.py (renamed from mini-scalene.py) | 86 | ||||
| -rw-r--r-- | replacement_poll_selector.py | 41 | ||||
| -rw-r--r-- | tests/simult.py | 19 |
7 files changed, 104 insertions, 42 deletions
diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/__init__.py diff --git a/__pycache__/mini_scalene.cpython-311.pyc b/__pycache__/mini_scalene.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..24b7268 --- /dev/null +++ b/__pycache__/mini_scalene.cpython-311.pyc diff --git a/__pycache__/replacement_poll_selector.cpython-311.pyc b/__pycache__/replacement_poll_selector.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..b346241 --- /dev/null +++ b/__pycache__/replacement_poll_selector.cpython-311.pyc diff --git a/__pycache__/sitecustomize.cpython-311.pyc b/__pycache__/sitecustomize.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..4f5ff5a --- /dev/null +++ b/__pycache__/sitecustomize.cpython-311.pyc diff --git a/mini-scalene.py b/mini_scalene.py index e957bb0..569d7c0 100644 --- a/mini-scalene.py +++ b/mini_scalene.py @@ -1,13 +1,16 @@ import sys import argparse -import os import threading import traceback import atexit import signal import asyncio import time -from typing import cast +from typing import ( + Any, + Callable, + cast, +) from types import FrameType from collections import defaultdict @@ -39,12 +42,12 @@ def parse_arguments(): return parser.parse_args() -class mini_scalene(object): +class MiniScalene(object): '''A stripped-down version of SCALENE which tallies active lines during execution.''' # a key-value pair where keys represent frame metadata (see - # mini_scalene.frame_to_string) and values represent number of times + # MiniScalene.frame_to_string) and values represent number of times # sampled. cpu_samples = defaultdict(lambda: 0) cpu_samples_c = defaultdict(lambda: 0) @@ -61,12 +64,13 @@ class mini_scalene(object): profile_async = True def __init__(self): + import replacement_poll_selector signal.signal(signal.SIGPROF, self.cpu_signal_handler) signal.setitimer(signal.ITIMER_PROF, self.signal_interval, self.signal_interval) - mini_scalene.last_signal_time = mini_scalene.gettime() + MiniScalene.last_signal_time = MiniScalene.gettime() @staticmethod def gettime(): @@ -75,25 +79,25 @@ class mini_scalene(object): @staticmethod def start(profile_async): - mini_scalene.profile_async = profile_async - atexit.register(mini_scalene.exit_handler) + MiniScalene.profile_async = profile_async + atexit.register(MiniScalene.exit_handler) @staticmethod def exit_handler(): '''Turn off profiling signals & pretty-print profiling information.''' - mini_scalene.disable_signals() + MiniScalene.disable_signals() # If we've collected any samples, dump them. print("CPU usage (Python):") - if mini_scalene.total_cpu_samples > 0: - for key in mini_scalene.sort_samples(mini_scalene.cpu_samples): + if MiniScalene.total_cpu_samples > 0: + for key in MiniScalene.sort_samples(MiniScalene.cpu_samples): print(f"{key} : " - f"{mini_scalene.cpu_samples[key] * 100 / mini_scalene.total_cpu_samples:.3f} % " - f"({mini_scalene.cpu_samples[key]:.1f} total samples)") + f"{MiniScalene.cpu_samples[key] * 100 / MiniScalene.total_cpu_samples:.3f} % " + f"({MiniScalene.cpu_samples[key]:.1f} total samples)") print("CPU usage (Native):") - for key in mini_scalene.sort_samples(mini_scalene.cpu_samples_c): + for key in MiniScalene.sort_samples(MiniScalene.cpu_samples_c): print(f"{key} : " - f"{mini_scalene.cpu_samples_c[key] * 100 / mini_scalene.total_cpu_samples:.3f} % " - f"({mini_scalene.cpu_samples_c[key]:.1f} total samples)") + f"{MiniScalene.cpu_samples_c[key] * 100 / MiniScalene.total_cpu_samples:.3f} % " + f"({MiniScalene.cpu_samples_c[key]:.1f} total samples)") else: print("(did not run long enough to profile)") @@ -105,20 +109,20 @@ class mini_scalene(object): @staticmethod def cpu_signal_handler(sig, frame): - elapsed_since_last_signal = mini_scalene.gettime() - \ - mini_scalene.last_signal_time + elapsed_since_last_signal = MiniScalene.gettime() - \ + MiniScalene.last_signal_time c_time_norm = (elapsed_since_last_signal - - mini_scalene.signal_interval) / \ - mini_scalene.signal_interval + MiniScalene.signal_interval) / \ + MiniScalene.signal_interval - keys = mini_scalene.compute_frames_to_record(frame) + keys = MiniScalene.compute_frames_to_record(frame) for key in keys: - mini_scalene.cpu_samples[mini_scalene.frame_to_string(key)] += 1 - mini_scalene.cpu_samples_c[mini_scalene.frame_to_string( + MiniScalene.cpu_samples[MiniScalene.frame_to_string(key)] += 1 + MiniScalene.cpu_samples_c[MiniScalene.frame_to_string( key)] += c_time_norm - mini_scalene.total_cpu_samples += elapsed_since_last_signal / \ - mini_scalene.signal_interval - mini_scalene.last_signal_time = mini_scalene.gettime() + MiniScalene.total_cpu_samples += elapsed_since_last_signal / \ + MiniScalene.signal_interval + MiniScalene.last_signal_time = MiniScalene.gettime() @staticmethod def compute_frames_to_record(this_frame): @@ -126,9 +130,9 @@ class mini_scalene(object): frames = [this_frame] frames += [sys._current_frames().get(t.ident, None) for t in threading.enumerate()] - frames += mini_scalene.get_async_frames() + frames += MiniScalene.get_async_frames() - frames = mini_scalene.filter_duplicated_frames(frames) + frames = MiniScalene.filter_duplicated_frames(frames) # Process all the frames to remove ones we aren't going to track. new_frames = [] for frame in frames: @@ -141,7 +145,7 @@ class mini_scalene(object): # to look back into the outer frame in order to check # the co_filename. fname = frame.f_back.f_code.co_filename - while not mini_scalene.should_trace(fname): + while not MiniScalene.should_trace(fname): # Walk the stack backwards until we hit a frame that # IS one we should trace (if there is one). i.e., if # it's in the code being profiled, and it is just @@ -169,7 +173,7 @@ class mini_scalene(object): @staticmethod def get_async_frames(): '''Obtains the stack frames of all currently executing tasks.''' - if mini_scalene.is_event_loop_running() and mini_scalene.profile_async: + if MiniScalene.is_event_loop_running() and MiniScalene.profile_async: return [task.get_coro().cr_frame for task in asyncio.all_tasks()] return [] @@ -181,7 +185,7 @@ class mini_scalene(object): return False if filename[0] == '<': return False - if 'mini-scalene.py' in filename: + if 'mini_scalene.py' in filename: return False return True @@ -199,6 +203,26 @@ class mini_scalene(object): reverse=True)} @staticmethod + def shim(func: Callable[[Any], Any]) -> Any: + """Provide a decorator that calls the wrapped function with the + Scalene variant. + + Wrapped function must be of type (s: Scalene) -> Any. + + This decorator allows for marking a function in a separate + file as a drop-in replacement for an existing library + function. The intention is for these functions to replace a + function that indefinitely blocks (which interferes with + Scalene) with a function that awakens periodically to allow + for signals to be delivered. + + """ + func(MiniScalene) + # Returns the function itself to the calling file for the sake + # of not displaying unusual errors if someone attempts to call + # it + + @staticmethod def filter_duplicated_frames(frames) -> bool: s = set() dup = [] @@ -224,7 +248,7 @@ def main(): try: with open(args.script, 'rb') as fp: code = compile(fp.read(), args.script, "exec") - mini_scalene().start(args.async_off) + MiniScalene().start(args.async_off) exec(code, the_globals) except Exception: traceback.print_exc() diff --git a/replacement_poll_selector.py b/replacement_poll_selector.py new file mode 100644 index 0000000..0813a66 --- /dev/null +++ b/replacement_poll_selector.py @@ -0,0 +1,41 @@ +import selectors +import sys +import threading +import time +from typing import List, Optional, Tuple + +from mini_scalene import MiniScalene + + +@MiniScalene.shim +def replacement_poll_selector(mini_scalene: MiniScalene) -> None: + """ + A replacement for selectors.PollSelector that + periodically wakes up to accept signals + """ + + class ReplacementPollSelector(selectors.PollSelector): + def select( + self, timeout: Optional[float] = -1 + ) -> List[Tuple[selectors.SelectorKey, int]]: + tident = threading.get_ident() + start_time = time.perf_counter() + if not timeout or timeout < 0: + interval = sys.getswitchinterval() + else: + interval = min(timeout, sys.getswitchinterval()) + while True: + scalene.set_thread_sleeping(tident) + selected = super().select(interval) + scalene.reset_thread_sleeping(tident) + if selected or timeout == 0: + return selected + end_time = time.perf_counter() + if timeout and timeout != -1: + if end_time - start_time >= timeout: + return [] # None + + ReplacementPollSelector.__qualname__ = ( + "replacement_poll_selector.ReplacementPollSelector" + ) + selectors.PollSelector = ReplacementPollSelector # type: ignore diff --git a/tests/simult.py b/tests/simult.py index 61a3792..49d3e4b 100644 --- a/tests/simult.py +++ b/tests/simult.py @@ -1,20 +1,17 @@ import asyncio -import time -async def count(x): - i = 0 - await asyncio.sleep(2) - while i < 100000: - z = x * x - z = z * z - z = z * z - i += 1 - return z +async def count(): + print("before") + await asyncio.sleep(3) + print("after") async def main(): - await asyncio.gather(count(1), count(2), count(3)) + await asyncio.gather(count(), count(), count()) + i = 0 + while i < 3000000: + i += 1 print("done") asyncio.run(main()) |
