summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-06-14 14:55:27 -0400
committerbd <bdunahu@operationnull.com>2025-06-14 14:55:27 -0400
commitc21e4366fb64130d168150f0ed94293e699eb525 (patch)
tree90b3ae14e89c950580dfb5802d930bd045fb43c4
parentf4b54b5a2e99be859558331186f725fbfa224594 (diff)
Begin work of monkey-patching EPollSelector
-rw-r--r--__init__.py0
-rw-r--r--__pycache__/mini_scalene.cpython-311.pycbin0 -> 13865 bytes
-rw-r--r--__pycache__/replacement_poll_selector.cpython-311.pycbin0 -> 2546 bytes
-rw-r--r--__pycache__/sitecustomize.cpython-311.pycbin0 -> 417 bytes
-rw-r--r--mini_scalene.py (renamed from mini-scalene.py)86
-rw-r--r--replacement_poll_selector.py41
-rw-r--r--tests/simult.py19
7 files changed, 104 insertions, 42 deletions
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/__pycache__/mini_scalene.cpython-311.pyc b/__pycache__/mini_scalene.cpython-311.pyc
new file mode 100644
index 0000000..24b7268
--- /dev/null
+++ b/__pycache__/mini_scalene.cpython-311.pyc
Binary files differ
diff --git a/__pycache__/replacement_poll_selector.cpython-311.pyc b/__pycache__/replacement_poll_selector.cpython-311.pyc
new file mode 100644
index 0000000..b346241
--- /dev/null
+++ b/__pycache__/replacement_poll_selector.cpython-311.pyc
Binary files differ
diff --git a/__pycache__/sitecustomize.cpython-311.pyc b/__pycache__/sitecustomize.cpython-311.pyc
new file mode 100644
index 0000000..4f5ff5a
--- /dev/null
+++ b/__pycache__/sitecustomize.cpython-311.pyc
Binary files differ
diff --git a/mini-scalene.py b/mini_scalene.py
index e957bb0..569d7c0 100644
--- a/mini-scalene.py
+++ b/mini_scalene.py
@@ -1,13 +1,16 @@
import sys
import argparse
-import os
import threading
import traceback
import atexit
import signal
import asyncio
import time
-from typing import cast
+from typing import (
+ Any,
+ Callable,
+ cast,
+)
from types import FrameType
from collections import defaultdict
@@ -39,12 +42,12 @@ def parse_arguments():
return parser.parse_args()
-class mini_scalene(object):
+class MiniScalene(object):
'''A stripped-down version of SCALENE which tallies active lines during
execution.'''
# a key-value pair where keys represent frame metadata (see
- # mini_scalene.frame_to_string) and values represent number of times
+ # MiniScalene.frame_to_string) and values represent number of times
# sampled.
cpu_samples = defaultdict(lambda: 0)
cpu_samples_c = defaultdict(lambda: 0)
@@ -61,12 +64,13 @@ class mini_scalene(object):
profile_async = True
def __init__(self):
+ import replacement_poll_selector
signal.signal(signal.SIGPROF,
self.cpu_signal_handler)
signal.setitimer(signal.ITIMER_PROF,
self.signal_interval,
self.signal_interval)
- mini_scalene.last_signal_time = mini_scalene.gettime()
+ MiniScalene.last_signal_time = MiniScalene.gettime()
@staticmethod
def gettime():
@@ -75,25 +79,25 @@ class mini_scalene(object):
@staticmethod
def start(profile_async):
- mini_scalene.profile_async = profile_async
- atexit.register(mini_scalene.exit_handler)
+ MiniScalene.profile_async = profile_async
+ atexit.register(MiniScalene.exit_handler)
@staticmethod
def exit_handler():
'''Turn off profiling signals & pretty-print profiling information.'''
- mini_scalene.disable_signals()
+ MiniScalene.disable_signals()
# If we've collected any samples, dump them.
print("CPU usage (Python):")
- if mini_scalene.total_cpu_samples > 0:
- for key in mini_scalene.sort_samples(mini_scalene.cpu_samples):
+ if MiniScalene.total_cpu_samples > 0:
+ for key in MiniScalene.sort_samples(MiniScalene.cpu_samples):
print(f"{key} : "
- f"{mini_scalene.cpu_samples[key] * 100 / mini_scalene.total_cpu_samples:.3f} % "
- f"({mini_scalene.cpu_samples[key]:.1f} total samples)")
+ f"{MiniScalene.cpu_samples[key] * 100 / MiniScalene.total_cpu_samples:.3f} % "
+ f"({MiniScalene.cpu_samples[key]:.1f} total samples)")
print("CPU usage (Native):")
- for key in mini_scalene.sort_samples(mini_scalene.cpu_samples_c):
+ for key in MiniScalene.sort_samples(MiniScalene.cpu_samples_c):
print(f"{key} : "
- f"{mini_scalene.cpu_samples_c[key] * 100 / mini_scalene.total_cpu_samples:.3f} % "
- f"({mini_scalene.cpu_samples_c[key]:.1f} total samples)")
+ f"{MiniScalene.cpu_samples_c[key] * 100 / MiniScalene.total_cpu_samples:.3f} % "
+ f"({MiniScalene.cpu_samples_c[key]:.1f} total samples)")
else:
print("(did not run long enough to profile)")
@@ -105,20 +109,20 @@ class mini_scalene(object):
@staticmethod
def cpu_signal_handler(sig, frame):
- elapsed_since_last_signal = mini_scalene.gettime() - \
- mini_scalene.last_signal_time
+ elapsed_since_last_signal = MiniScalene.gettime() - \
+ MiniScalene.last_signal_time
c_time_norm = (elapsed_since_last_signal -
- mini_scalene.signal_interval) / \
- mini_scalene.signal_interval
+ MiniScalene.signal_interval) / \
+ MiniScalene.signal_interval
- keys = mini_scalene.compute_frames_to_record(frame)
+ keys = MiniScalene.compute_frames_to_record(frame)
for key in keys:
- mini_scalene.cpu_samples[mini_scalene.frame_to_string(key)] += 1
- mini_scalene.cpu_samples_c[mini_scalene.frame_to_string(
+ MiniScalene.cpu_samples[MiniScalene.frame_to_string(key)] += 1
+ MiniScalene.cpu_samples_c[MiniScalene.frame_to_string(
key)] += c_time_norm
- mini_scalene.total_cpu_samples += elapsed_since_last_signal / \
- mini_scalene.signal_interval
- mini_scalene.last_signal_time = mini_scalene.gettime()
+ MiniScalene.total_cpu_samples += elapsed_since_last_signal / \
+ MiniScalene.signal_interval
+ MiniScalene.last_signal_time = MiniScalene.gettime()
@staticmethod
def compute_frames_to_record(this_frame):
@@ -126,9 +130,9 @@ class mini_scalene(object):
frames = [this_frame]
frames += [sys._current_frames().get(t.ident, None)
for t in threading.enumerate()]
- frames += mini_scalene.get_async_frames()
+ frames += MiniScalene.get_async_frames()
- frames = mini_scalene.filter_duplicated_frames(frames)
+ frames = MiniScalene.filter_duplicated_frames(frames)
# Process all the frames to remove ones we aren't going to track.
new_frames = []
for frame in frames:
@@ -141,7 +145,7 @@ class mini_scalene(object):
# to look back into the outer frame in order to check
# the co_filename.
fname = frame.f_back.f_code.co_filename
- while not mini_scalene.should_trace(fname):
+ while not MiniScalene.should_trace(fname):
# Walk the stack backwards until we hit a frame that
# IS one we should trace (if there is one). i.e., if
# it's in the code being profiled, and it is just
@@ -169,7 +173,7 @@ class mini_scalene(object):
@staticmethod
def get_async_frames():
'''Obtains the stack frames of all currently executing tasks.'''
- if mini_scalene.is_event_loop_running() and mini_scalene.profile_async:
+ if MiniScalene.is_event_loop_running() and MiniScalene.profile_async:
return [task.get_coro().cr_frame for task in asyncio.all_tasks()]
return []
@@ -181,7 +185,7 @@ class mini_scalene(object):
return False
if filename[0] == '<':
return False
- if 'mini-scalene.py' in filename:
+ if 'mini_scalene.py' in filename:
return False
return True
@@ -199,6 +203,26 @@ class mini_scalene(object):
reverse=True)}
@staticmethod
+ def shim(func: Callable[[Any], Any]) -> Any:
+ """Provide a decorator that calls the wrapped function with the
+ Scalene variant.
+
+ Wrapped function must be of type (s: Scalene) -> Any.
+
+ This decorator allows for marking a function in a separate
+ file as a drop-in replacement for an existing library
+ function. The intention is for these functions to replace a
+ function that indefinitely blocks (which interferes with
+ Scalene) with a function that awakens periodically to allow
+ for signals to be delivered.
+
+ """
+ func(MiniScalene)
+ # Returns the function itself to the calling file for the sake
+ # of not displaying unusual errors if someone attempts to call
+ # it
+
+ @staticmethod
def filter_duplicated_frames(frames) -> bool:
s = set()
dup = []
@@ -224,7 +248,7 @@ def main():
try:
with open(args.script, 'rb') as fp:
code = compile(fp.read(), args.script, "exec")
- mini_scalene().start(args.async_off)
+ MiniScalene().start(args.async_off)
exec(code, the_globals)
except Exception:
traceback.print_exc()
diff --git a/replacement_poll_selector.py b/replacement_poll_selector.py
new file mode 100644
index 0000000..0813a66
--- /dev/null
+++ b/replacement_poll_selector.py
@@ -0,0 +1,41 @@
+import selectors
+import sys
+import threading
+import time
+from typing import List, Optional, Tuple
+
+from mini_scalene import MiniScalene
+
+
+@MiniScalene.shim
+def replacement_poll_selector(mini_scalene: MiniScalene) -> None:
+ """
+ A replacement for selectors.PollSelector that
+ periodically wakes up to accept signals
+ """
+
+ class ReplacementPollSelector(selectors.PollSelector):
+ def select(
+ self, timeout: Optional[float] = -1
+ ) -> List[Tuple[selectors.SelectorKey, int]]:
+ tident = threading.get_ident()
+ start_time = time.perf_counter()
+ if not timeout or timeout < 0:
+ interval = sys.getswitchinterval()
+ else:
+ interval = min(timeout, sys.getswitchinterval())
+ while True:
+ scalene.set_thread_sleeping(tident)
+ selected = super().select(interval)
+ scalene.reset_thread_sleeping(tident)
+ if selected or timeout == 0:
+ return selected
+ end_time = time.perf_counter()
+ if timeout and timeout != -1:
+ if end_time - start_time >= timeout:
+ return [] # None
+
+ ReplacementPollSelector.__qualname__ = (
+ "replacement_poll_selector.ReplacementPollSelector"
+ )
+ selectors.PollSelector = ReplacementPollSelector # type: ignore
diff --git a/tests/simult.py b/tests/simult.py
index 61a3792..49d3e4b 100644
--- a/tests/simult.py
+++ b/tests/simult.py
@@ -1,20 +1,17 @@
import asyncio
-import time
-async def count(x):
- i = 0
- await asyncio.sleep(2)
- while i < 100000:
- z = x * x
- z = z * z
- z = z * z
- i += 1
- return z
+async def count():
+ print("before")
+ await asyncio.sleep(3)
+ print("after")
async def main():
- await asyncio.gather(count(1), count(2), count(3))
+ await asyncio.gather(count(), count(), count())
+ i = 0
+ while i < 3000000:
+ i += 1
print("done")
asyncio.run(main())