From c21e4366fb64130d168150f0ed94293e699eb525 Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 14 Jun 2025 14:55:27 -0400 Subject: Begin work of monkey-patching EPollSelector --- __init__.py | 0 __pycache__/mini_scalene.cpython-311.pyc | Bin 0 -> 13865 bytes .../replacement_poll_selector.cpython-311.pyc | Bin 0 -> 2546 bytes __pycache__/sitecustomize.cpython-311.pyc | Bin 0 -> 417 bytes mini-scalene.py | 234 ------------------- mini_scalene.py | 258 +++++++++++++++++++++ replacement_poll_selector.py | 41 ++++ tests/simult.py | 19 +- 8 files changed, 307 insertions(+), 245 deletions(-) create mode 100644 __init__.py create mode 100644 __pycache__/mini_scalene.cpython-311.pyc create mode 100644 __pycache__/replacement_poll_selector.cpython-311.pyc create mode 100644 __pycache__/sitecustomize.cpython-311.pyc delete mode 100644 mini-scalene.py create mode 100644 mini_scalene.py create mode 100644 replacement_poll_selector.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/__pycache__/mini_scalene.cpython-311.pyc b/__pycache__/mini_scalene.cpython-311.pyc new file mode 100644 index 0000000..24b7268 Binary files /dev/null and b/__pycache__/mini_scalene.cpython-311.pyc differ diff --git a/__pycache__/replacement_poll_selector.cpython-311.pyc b/__pycache__/replacement_poll_selector.cpython-311.pyc new file mode 100644 index 0000000..b346241 Binary files /dev/null and b/__pycache__/replacement_poll_selector.cpython-311.pyc differ diff --git a/__pycache__/sitecustomize.cpython-311.pyc b/__pycache__/sitecustomize.cpython-311.pyc new file mode 100644 index 0000000..4f5ff5a Binary files /dev/null and b/__pycache__/sitecustomize.cpython-311.pyc differ diff --git a/mini-scalene.py b/mini-scalene.py deleted file mode 100644 index e957bb0..0000000 --- a/mini-scalene.py +++ /dev/null @@ -1,234 +0,0 @@ -import sys -import argparse -import os -import threading -import traceback -import atexit -import signal -import asyncio -import time -from typing import cast -from types import FrameType -from collections import defaultdict - -the_globals = { - '__name__': '__main__', - '__doc__': None, - '__package__': None, - '__loader__': globals()['__loader__'], - '__spec__': None, - '__annotations__': {}, - '__builtins__': globals()['__builtins__'], - '__file__': None, - '__cached__': None, -} - -def parse_arguments(): - '''Parse CLI args''' - parser = argparse.ArgumentParser() - - parser.add_argument('-a', '--async_off', - action='store_false', - help='Turn off experimental async profiling.', - default=True) - - parser.add_argument('script', help='A python script to run') - parser.add_argument('s_args', nargs=argparse.REMAINDER, - help='python script args') - - return parser.parse_args() - - -class mini_scalene(object): - '''A stripped-down version of SCALENE which tallies active lines during - execution.''' - - # a key-value pair where keys represent frame metadata (see - # mini_scalene.frame_to_string) and values represent number of times - # sampled. - cpu_samples = defaultdict(lambda: 0) - cpu_samples_c = defaultdict(lambda: 0) - # number of times samples have been collected - total_cpu_samples = 0 - - # the time, in seconds, between samples - signal_interval = 0.01 - # the timestamp recorded last signal - last_signal_time = 0.0 - - # if we should try to profile asynchronous code. Used to observe - # effectiveness of the implementation. - profile_async = True - - def __init__(self): - signal.signal(signal.SIGPROF, - self.cpu_signal_handler) - signal.setitimer(signal.ITIMER_PROF, - self.signal_interval, - self.signal_interval) - mini_scalene.last_signal_time = mini_scalene.gettime() - - @staticmethod - def gettime(): - '''get the wallclock time''' - return time.perf_counter() - - @staticmethod - def start(profile_async): - mini_scalene.profile_async = profile_async - atexit.register(mini_scalene.exit_handler) - - @staticmethod - def exit_handler(): - '''Turn off profiling signals & pretty-print profiling information.''' - mini_scalene.disable_signals() - # If we've collected any samples, dump them. - print("CPU usage (Python):") - if mini_scalene.total_cpu_samples > 0: - for key in mini_scalene.sort_samples(mini_scalene.cpu_samples): - print(f"{key} : " - f"{mini_scalene.cpu_samples[key] * 100 / mini_scalene.total_cpu_samples:.3f} % " - f"({mini_scalene.cpu_samples[key]:.1f} total samples)") - print("CPU usage (Native):") - for key in mini_scalene.sort_samples(mini_scalene.cpu_samples_c): - print(f"{key} : " - f"{mini_scalene.cpu_samples_c[key] * 100 / mini_scalene.total_cpu_samples:.3f} % " - f"({mini_scalene.cpu_samples_c[key]:.1f} total samples)") - else: - print("(did not run long enough to profile)") - - @staticmethod - def disable_signals(): - signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) - signal.signal(signal.SIGVTALRM, signal.SIG_IGN) - signal.setitimer(signal.ITIMER_PROF, 0) - - @staticmethod - def cpu_signal_handler(sig, frame): - elapsed_since_last_signal = mini_scalene.gettime() - \ - mini_scalene.last_signal_time - c_time_norm = (elapsed_since_last_signal - - mini_scalene.signal_interval) / \ - mini_scalene.signal_interval - - keys = mini_scalene.compute_frames_to_record(frame) - for key in keys: - mini_scalene.cpu_samples[mini_scalene.frame_to_string(key)] += 1 - mini_scalene.cpu_samples_c[mini_scalene.frame_to_string( - key)] += c_time_norm - mini_scalene.total_cpu_samples += elapsed_since_last_signal / \ - mini_scalene.signal_interval - mini_scalene.last_signal_time = mini_scalene.gettime() - - @staticmethod - def compute_frames_to_record(this_frame): - '''Collects all stack frames that Scalene actually processes.''' - frames = [this_frame] - frames += [sys._current_frames().get(t.ident, None) - for t in threading.enumerate()] - frames += mini_scalene.get_async_frames() - - frames = mini_scalene.filter_duplicated_frames(frames) - # Process all the frames to remove ones we aren't going to track. - new_frames = [] - for frame in frames: - if frame is None: - continue - fname = frame.f_code.co_filename - # Record samples only for files we care about. - if (len(fname)) == 0: - # 'eval/compile' gives no f_code.co_filename. We have - # to look back into the outer frame in order to check - # the co_filename. - fname = frame.f_back.f_code.co_filename - while not mini_scalene.should_trace(fname): - # Walk the stack backwards until we hit a frame that - # IS one we should trace (if there is one). i.e., if - # it's in the code being profiled, and it is just - # calling stuff deep in libraries. - if frame: - frame = cast(FrameType, frame.f_back) - else: - break - if frame: - fname = frame.f_code.co_filename - if frame: - new_frames.append(frame) - return new_frames - - @staticmethod - def frame_to_string(frame): - '''Pretty-prints a frame as a function/file name and a line number. - Additionally used a key for tallying lines.''' - co = frame.f_code - func_name = co.co_name - line_no = frame.f_lineno - filename = co.co_filename - return filename + '\t' + func_name + '\t' + str(line_no) - - @staticmethod - def get_async_frames(): - '''Obtains the stack frames of all currently executing tasks.''' - if mini_scalene.is_event_loop_running() and mini_scalene.profile_async: - return [task.get_coro().cr_frame for task in asyncio.all_tasks()] - return [] - - @staticmethod - def should_trace(filename): - '''Returns FALSE if filename is uninteresting to the user.''' - # FIXME Assume GuixSD. Makes filtering easy - if '/gnu/store' in filename: - return False - if filename[0] == '<': - return False - if 'mini-scalene.py' in filename: - return False - return True - - @staticmethod - def is_event_loop_running() -> bool: - '''Returns TRUE if there is an exent loop running. This is what - `asyncio.get_event_loop()' did, before it was deprecated in 3.12''' - return asyncio.get_event_loop_policy()._local._loop is not None - - @staticmethod - def sort_samples(sample_dict): - '''Returns SAMPLE_DICT in descending order by number of samples.''' - return {k: v for k, v in sorted(sample_dict.items(), - key=lambda item: item[1], - reverse=True)} - - @staticmethod - def filter_duplicated_frames(frames) -> bool: - s = set() - dup = [] - for f in frames: - if f in s: - dup.append(f) - else: - s.add(f) - # TODO we probably have one because given get_async_frames returns the - # currently executing task. Would be an easy fix in that method. - # if there's more than one, I cannot explain it. - assert len( - dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}" - if len(dup) != 0: - print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr) - return list(s) - - -def main(): - args = parse_arguments() - - sys.argv = [args.script] + args.s_args - try: - with open(args.script, 'rb') as fp: - code = compile(fp.read(), args.script, "exec") - mini_scalene().start(args.async_off) - exec(code, the_globals) - except Exception: - traceback.print_exc() - - -if __name__ == "__main__": - main() diff --git a/mini_scalene.py b/mini_scalene.py new file mode 100644 index 0000000..569d7c0 --- /dev/null +++ b/mini_scalene.py @@ -0,0 +1,258 @@ +import sys +import argparse +import threading +import traceback +import atexit +import signal +import asyncio +import time +from typing import ( + Any, + Callable, + cast, +) +from types import FrameType +from collections import defaultdict + +the_globals = { + '__name__': '__main__', + '__doc__': None, + '__package__': None, + '__loader__': globals()['__loader__'], + '__spec__': None, + '__annotations__': {}, + '__builtins__': globals()['__builtins__'], + '__file__': None, + '__cached__': None, +} + +def parse_arguments(): + '''Parse CLI args''' + parser = argparse.ArgumentParser() + + parser.add_argument('-a', '--async_off', + action='store_false', + help='Turn off experimental async profiling.', + default=True) + + parser.add_argument('script', help='A python script to run') + parser.add_argument('s_args', nargs=argparse.REMAINDER, + help='python script args') + + return parser.parse_args() + + +class MiniScalene(object): + '''A stripped-down version of SCALENE which tallies active lines during + execution.''' + + # a key-value pair where keys represent frame metadata (see + # MiniScalene.frame_to_string) and values represent number of times + # sampled. + cpu_samples = defaultdict(lambda: 0) + cpu_samples_c = defaultdict(lambda: 0) + # number of times samples have been collected + total_cpu_samples = 0 + + # the time, in seconds, between samples + signal_interval = 0.01 + # the timestamp recorded last signal + last_signal_time = 0.0 + + # if we should try to profile asynchronous code. Used to observe + # effectiveness of the implementation. + profile_async = True + + def __init__(self): + import replacement_poll_selector + signal.signal(signal.SIGPROF, + self.cpu_signal_handler) + signal.setitimer(signal.ITIMER_PROF, + self.signal_interval, + self.signal_interval) + MiniScalene.last_signal_time = MiniScalene.gettime() + + @staticmethod + def gettime(): + '''get the wallclock time''' + return time.perf_counter() + + @staticmethod + def start(profile_async): + MiniScalene.profile_async = profile_async + atexit.register(MiniScalene.exit_handler) + + @staticmethod + def exit_handler(): + '''Turn off profiling signals & pretty-print profiling information.''' + MiniScalene.disable_signals() + # If we've collected any samples, dump them. + print("CPU usage (Python):") + if MiniScalene.total_cpu_samples > 0: + for key in MiniScalene.sort_samples(MiniScalene.cpu_samples): + print(f"{key} : " + f"{MiniScalene.cpu_samples[key] * 100 / MiniScalene.total_cpu_samples:.3f} % " + f"({MiniScalene.cpu_samples[key]:.1f} total samples)") + print("CPU usage (Native):") + for key in MiniScalene.sort_samples(MiniScalene.cpu_samples_c): + print(f"{key} : " + f"{MiniScalene.cpu_samples_c[key] * 100 / MiniScalene.total_cpu_samples:.3f} % " + f"({MiniScalene.cpu_samples_c[key]:.1f} total samples)") + else: + print("(did not run long enough to profile)") + + @staticmethod + def disable_signals(): + signal.signal(signal.ITIMER_PROF, signal.SIG_IGN) + signal.signal(signal.SIGVTALRM, signal.SIG_IGN) + signal.setitimer(signal.ITIMER_PROF, 0) + + @staticmethod + def cpu_signal_handler(sig, frame): + elapsed_since_last_signal = MiniScalene.gettime() - \ + MiniScalene.last_signal_time + c_time_norm = (elapsed_since_last_signal - + MiniScalene.signal_interval) / \ + MiniScalene.signal_interval + + keys = MiniScalene.compute_frames_to_record(frame) + for key in keys: + MiniScalene.cpu_samples[MiniScalene.frame_to_string(key)] += 1 + MiniScalene.cpu_samples_c[MiniScalene.frame_to_string( + key)] += c_time_norm + MiniScalene.total_cpu_samples += elapsed_since_last_signal / \ + MiniScalene.signal_interval + MiniScalene.last_signal_time = MiniScalene.gettime() + + @staticmethod + def compute_frames_to_record(this_frame): + '''Collects all stack frames that Scalene actually processes.''' + frames = [this_frame] + frames += [sys._current_frames().get(t.ident, None) + for t in threading.enumerate()] + frames += MiniScalene.get_async_frames() + + frames = MiniScalene.filter_duplicated_frames(frames) + # Process all the frames to remove ones we aren't going to track. + new_frames = [] + for frame in frames: + if frame is None: + continue + fname = frame.f_code.co_filename + # Record samples only for files we care about. + if (len(fname)) == 0: + # 'eval/compile' gives no f_code.co_filename. We have + # to look back into the outer frame in order to check + # the co_filename. + fname = frame.f_back.f_code.co_filename + while not MiniScalene.should_trace(fname): + # Walk the stack backwards until we hit a frame that + # IS one we should trace (if there is one). i.e., if + # it's in the code being profiled, and it is just + # calling stuff deep in libraries. + if frame: + frame = cast(FrameType, frame.f_back) + else: + break + if frame: + fname = frame.f_code.co_filename + if frame: + new_frames.append(frame) + return new_frames + + @staticmethod + def frame_to_string(frame): + '''Pretty-prints a frame as a function/file name and a line number. + Additionally used a key for tallying lines.''' + co = frame.f_code + func_name = co.co_name + line_no = frame.f_lineno + filename = co.co_filename + return filename + '\t' + func_name + '\t' + str(line_no) + + @staticmethod + def get_async_frames(): + '''Obtains the stack frames of all currently executing tasks.''' + if MiniScalene.is_event_loop_running() and MiniScalene.profile_async: + return [task.get_coro().cr_frame for task in asyncio.all_tasks()] + return [] + + @staticmethod + def should_trace(filename): + '''Returns FALSE if filename is uninteresting to the user.''' + # FIXME Assume GuixSD. Makes filtering easy + if '/gnu/store' in filename: + return False + if filename[0] == '<': + return False + if 'mini_scalene.py' in filename: + return False + return True + + @staticmethod + def is_event_loop_running() -> bool: + '''Returns TRUE if there is an exent loop running. This is what + `asyncio.get_event_loop()' did, before it was deprecated in 3.12''' + return asyncio.get_event_loop_policy()._local._loop is not None + + @staticmethod + def sort_samples(sample_dict): + '''Returns SAMPLE_DICT in descending order by number of samples.''' + return {k: v for k, v in sorted(sample_dict.items(), + key=lambda item: item[1], + reverse=True)} + + @staticmethod + def shim(func: Callable[[Any], Any]) -> Any: + """Provide a decorator that calls the wrapped function with the + Scalene variant. + + Wrapped function must be of type (s: Scalene) -> Any. + + This decorator allows for marking a function in a separate + file as a drop-in replacement for an existing library + function. The intention is for these functions to replace a + function that indefinitely blocks (which interferes with + Scalene) with a function that awakens periodically to allow + for signals to be delivered. + + """ + func(MiniScalene) + # Returns the function itself to the calling file for the sake + # of not displaying unusual errors if someone attempts to call + # it + + @staticmethod + def filter_duplicated_frames(frames) -> bool: + s = set() + dup = [] + for f in frames: + if f in s: + dup.append(f) + else: + s.add(f) + # TODO we probably have one because given get_async_frames returns the + # currently executing task. Would be an easy fix in that method. + # if there's more than one, I cannot explain it. + assert len( + dup) < 2, f"ERROR: More than 1 duplicate frame (shouldn't happen): {dup}" + if len(dup) != 0: + print(f"WARN: Duplicate frame found: {dup}", file=sys.stderr) + return list(s) + + +def main(): + args = parse_arguments() + + sys.argv = [args.script] + args.s_args + try: + with open(args.script, 'rb') as fp: + code = compile(fp.read(), args.script, "exec") + MiniScalene().start(args.async_off) + exec(code, the_globals) + except Exception: + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/replacement_poll_selector.py b/replacement_poll_selector.py new file mode 100644 index 0000000..0813a66 --- /dev/null +++ b/replacement_poll_selector.py @@ -0,0 +1,41 @@ +import selectors +import sys +import threading +import time +from typing import List, Optional, Tuple + +from mini_scalene import MiniScalene + + +@MiniScalene.shim +def replacement_poll_selector(mini_scalene: MiniScalene) -> None: + """ + A replacement for selectors.PollSelector that + periodically wakes up to accept signals + """ + + class ReplacementPollSelector(selectors.PollSelector): + def select( + self, timeout: Optional[float] = -1 + ) -> List[Tuple[selectors.SelectorKey, int]]: + tident = threading.get_ident() + start_time = time.perf_counter() + if not timeout or timeout < 0: + interval = sys.getswitchinterval() + else: + interval = min(timeout, sys.getswitchinterval()) + while True: + scalene.set_thread_sleeping(tident) + selected = super().select(interval) + scalene.reset_thread_sleeping(tident) + if selected or timeout == 0: + return selected + end_time = time.perf_counter() + if timeout and timeout != -1: + if end_time - start_time >= timeout: + return [] # None + + ReplacementPollSelector.__qualname__ = ( + "replacement_poll_selector.ReplacementPollSelector" + ) + selectors.PollSelector = ReplacementPollSelector # type: ignore diff --git a/tests/simult.py b/tests/simult.py index 61a3792..49d3e4b 100644 --- a/tests/simult.py +++ b/tests/simult.py @@ -1,20 +1,17 @@ import asyncio -import time -async def count(x): - i = 0 - await asyncio.sleep(2) - while i < 100000: - z = x * x - z = z * z - z = z * z - i += 1 - return z +async def count(): + print("before") + await asyncio.sleep(3) + print("after") async def main(): - await asyncio.gather(count(1), count(2), count(3)) + await asyncio.gather(count(), count(), count()) + i = 0 + while i < 3000000: + i += 1 print("done") asyncio.run(main()) -- cgit v1.2.3