summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xaergia114
1 files changed, 37 insertions, 77 deletions
diff --git a/aergia b/aergia
index 9fd19ef..a698152 100755
--- a/aergia
+++ b/aergia
@@ -42,8 +42,7 @@ Code:
'''
from collections import defaultdict
-from types import FrameType
-from typing import cast, List, Optional, Tuple
+from typing import List, Optional, Tuple
import argparse
import asyncio
import selectors
@@ -80,32 +79,6 @@ def thread_join_replacement(
threading.Thread.join = thread_join_replacement
-class ReplacementEpollSelector(selectors.EpollSelector):
- '''
- Provides a replacement for selectors.PollSelector that
- periodically wakes up to accept signals.
- '''
-
- def select(
- self, timeout=None
- ) -> List[Tuple[selectors.SelectorKey, int]]:
- start_time = time.perf_counter()
- if not timeout or timeout < 0:
- interval = sys.getswitchinterval()
- else:
- interval = min(timeout, sys.getswitchinterval())
- while True:
- selected = super().select(interval)
- if selected or timeout == 0 or not timeout:
- return selected
- end_time = time.perf_counter()
- if end_time - start_time >= timeout:
- return []
-
-
-selectors.DefaultSelector = ReplacementEpollSelector
-
-
class Aergia(object):
# a key-value pair where keys represent frame metadata (see
@@ -118,16 +91,16 @@ class Aergia(object):
# the timestamp recorded last signal
last_signal_time = 0.0
- signal_interval = 0
+ signal_interval = 0.0
def __init__(self, signal_interval):
Aergia.signal_interval = signal_interval
@staticmethod
def start():
- signal.signal(signal.SIGPROF,
+ signal.signal(signal.SIGALRM,
Aergia.cpu_signal_handler)
- signal.setitimer(signal.ITIMER_PROF,
+ signal.setitimer(signal.ITIMER_REAL,
Aergia.signal_interval,
Aergia.signal_interval)
Aergia.last_signal_time = Aergia.gettime()
@@ -159,8 +132,7 @@ class Aergia(object):
@staticmethod
def disable_signals():
- signal.signal(signal.ITIMER_PROF, signal.SIG_IGN)
- signal.setitimer(signal.ITIMER_PROF, 0)
+ signal.setitimer(signal.ITIMER_REAL, 0)
@staticmethod
def cpu_signal_handler(sig, frame):
@@ -173,73 +145,56 @@ class Aergia(object):
keys = Aergia.compute_frames_to_record()
for key in keys:
Aergia.samples[Aergia.frame_to_string(key)][0] += 1
- Aergia.samples[Aergia.frame_to_string(key)][1] += c_time_norm
+ # Aergia.samples[Aergia.frame_to_string(key)][1] += c_time_norm
Aergia.total_samples += elapsed_since_last_signal / \
sig_intv
Aergia.last_signal_time = Aergia.gettime()
@staticmethod
def compute_frames_to_record():
- '''Collects all stack frames that Aergia actually processes.'''
- frames = Aergia.get_frames_from_loops(Aergia.get_event_loops())
+ '''Collects all stack frames which are currently being awaited on
+ during a given timestamp, and
- # Process all the frames to remove ones we aren't going to track.
- new_frames = []
- for frame in frames:
- if frame is None:
- continue
- fname = frame.f_code.co_filename
- # Record samples only for files we care about.
- if (len(fname)) == 0:
- # 'eval/compile' gives no f_code.co_filename. We have
- # to look back into the outer frame in order to check
- # the co_filename.
- fname = frame.f_back.f_code.co_filename
- while not Aergia.should_trace(fname):
- # Walk the stack backwards until we hit a frame that
- # IS one we should trace (if there is one). i.e., if
- # it's in the code being profiled, and it is just
- # calling stuff deep in libraries.
- if frame:
- frame = cast(FrameType, frame.f_back)
- else:
- break
- if frame:
- fname = frame.f_code.co_filename
- if frame:
- new_frames.append(frame)
- return new_frames
+ Note that we do NOT need to walk back up the call-stack to find
+ which of the user's lines caused the await call. There is NEVER
+ a previous frame, because idle frames aren't on the call stack!
+
+ Luckily, the event loop and asyncio.all_tasks keeps track of
+ what is running for us.'''
+ frames = Aergia.get_frames_from_loops(Aergia.get_event_loops())
+ return [
+ f for f in frames
+ if f is not None and Aergia.should_trace(f.f_code.co_filename)
+ ]
@staticmethod
def get_event_loops():
+ '''Obtains each thread's event loop by relying on the fact that
+ if an event loop is active, it's own `run_once' and `run_forever'
+ will appear in the callstack.'''
loops = []
for t in threading.enumerate():
frame = sys._current_frames().get(t.ident)
if not frame:
continue
- # print(f'searching frame {frame}')
- loop = Aergia.walk_back_until_loop(frame)
- if loop:
- # print(f'found loop {loop}')
- loops.append(loop)
+ loops.extend(Aergia.walk_back_until_loops(frame))
return loops
@staticmethod
- def walk_back_until_loop(frame):
+ def walk_back_until_loops(frame):
+ '''Walks back the callstack until all event loops are found.'''
+ loops = []
while frame:
loop = Aergia.find_loop_in_locals(frame.f_locals)
- if loop:
- return loop
+ if loop and loop not in loops: # Avoid duplicates
+ loops.append(loop)
frame = frame.f_back
- return None
+ return loops
@staticmethod
def find_loop_in_locals(locals_dict):
'''Given a dictionary of local variables for a stack frame,
- retrieves the asyncio loop object, if there is one.
-
- This function should work on Windows, but this is dubious because
- I don't have a system to test with.'''
+ retrieves the asyncio loop object, if there is one.'''
for val in locals_dict.values():
if isinstance(val, asyncio.AbstractEventLoop):
return val
@@ -299,12 +254,16 @@ class Aergia(object):
@staticmethod
def sum_sample(sample):
+ '''Sums the total samples taken from a line.
+ The indices represent both observed and calculated
+ samples. This is equivalent to native/python time
+ in SCALENE.'''
return sample[0] + sample[1]
@staticmethod
def gettime():
'''returns the wallclock time'''
- return time.time()
+ return time.process_time()
the_globals = {
@@ -330,6 +289,7 @@ def parse_arguments():
help='The minimum amount of time inbetween \
samples in seconds.',
metavar='',
+ type=float,
default=0.01)
parser.add_argument('script', help='A python script to run.')
parser.add_argument('s_args', nargs=argparse.REMAINDER,
@@ -343,7 +303,7 @@ def main():
sys.argv = [args.script] + args.s_args
try:
- with open(args.script, 'rb') as fp:
+ with open(args.script, 'r', encoding='utf-8') as fp:
code = compile(fp.read(), args.script, "exec")
Aergia(args.interval).start()
exec(code, the_globals)