From 25b62280ae5db0544e20151c5561dbafce9cb3dc Mon Sep 17 00:00:00 2001 From: bd Date: Tue, 5 Aug 2025 22:09:15 -0400 Subject: Fix asyncgenerators, gather --- aergia/aergia.py | 185 ++++++++++++++++++++++++------------------------ t/test_functionality.py | 30 ++++---- 2 files changed, 106 insertions(+), 109 deletions(-) diff --git a/aergia/aergia.py b/aergia/aergia.py index 48efc68..bb8dcbe 100755 --- a/aergia/aergia.py +++ b/aergia/aergia.py @@ -47,13 +47,14 @@ from collections import defaultdict, namedtuple from typing import Optional import argparse import asyncio -import os import signal import sys import threading import time import traceback +import gc +from types import AsyncGeneratorType orig_thread_join = threading.Thread.join @@ -93,8 +94,11 @@ class Aergia(object): # number of times samples have been collected total_samples = 0 # the (ideal) interval between samples - signal_interval = 0.0 + + # the current task for the loop being processed + current_task = None + # if we should profile currently running tasks do_profile_current = False @@ -267,116 +271,116 @@ class Aergia(object): def _get_idle_task_frames(loop): '''Given an asyncio event loop, returns the list of idle task frames. We only care about idle task frames, as running tasks are already - included elsewhere. 
- - A task is considered 'idle' if it is pending and not the current - task.''' + included elsewhere.''' idle = [] - coros = Aergia._get_traceable_coros(loop) - for coro in coros: - frame = Aergia._get_deepest_traceable_frame(coro) - # if it's a future, - if frame: - idle.append(frame) - # handle async generators - # ideally, we would access these from _get_deepest_traceable_frame - # doing it this way causes us to assign generator time to the - # coroutine that called this generator in _get_deepest_traceable_frame - for ag in loop._asyncgens: - f = getattr(ag, 'ag_frame', None) - if f and Aergia._should_trace(f.f_code.co_filename): - idle.append(f) - return idle - - @staticmethod - def _get_traceable_coros(loop): - '''Given event loop LOOP, returns a list of coroutines corresponding - to each traceable task. + # set this when we start processing a loop. + # it is required later, but I only want to set it once. + Aergia.current_task = asyncio.current_task(loop) - A task is traceable if it is not the current task, if it has actually - started executing, and if a child task did not originate from it.''' - current = asyncio.current_task(loop) - - coros = dict() - p_tasks = set() for task in asyncio.all_tasks(loop): - # the task is not idle - if task == current: + if not Aergia._should_trace_task(task): continue coro = task.get_coro() - # the task hasn't even run yet - # assumes that all started tasks are sitting at an await - # statement. - # if this isn't the case, the associated coroutine will - # be 'waiting' on the coroutine declaration. No! Bad! - if getattr(coro, 'cr_frame', None) is None or \ - getattr(coro, 'cr_await', None) is None: - continue - - frame = getattr(coro, 'cr_frame', None) - - # for methods like gather, tasks create new tasks. keep - # track of these tasks so we don't count them twice. 
- parent = Aergia._get_parent_task(task) - if parent is not None: - p_tasks.add(parent) - - coros[coro] = (frame.f_code.co_filename, - frame.f_lineno, - frame.f_code.co_name) - - return [ - coro for coro, frame_summary in coros.items() - if frame_summary not in p_tasks - ] - - @staticmethod - def _get_parent_task(task): - '''Given TASK, returns the first frame in the source traceback - which is tracable. This only works if the event loop has debug - mode turned on. - - If this is not the case, returns None.''' - tb = task._source_traceback - - # true when debug mode isn't turned on - if tb is None: - return None - - for frame in tb[::-1]: - if Aergia._should_trace(frame.filename): - return (frame.filename, frame.lineno, frame.name) + frame = Aergia._get_deepest_traceable_frame(coro) + if frame: + idle.append(frame) - return None + return idle @staticmethod def _get_deepest_traceable_frame(coro): '''Get the deepest frame of coro we care to trace. - Luckily for us, coroutines keep track of the coroutine object which - they are waiting on (cr_await). We simply need to trace that down as - far as possible to get the actual line we are waiting on. + This is possible because each coroutine keeps a reference to the + coroutine it is waiting on. + Note that it cannot be the case that a task is suspended in a frame that does not belong to a coroutine, asyncio is very particular about - that! 
This is also why we only track idle tasks this way.''' curr = coro deepest_frame = None while curr: frame = getattr(curr, 'cr_frame', None) - if frame and Aergia._should_trace(frame.f_code.co_filename): + + if not frame: + curr = Aergia._search_future(curr) + if isinstance(curr, AsyncGeneratorType): + frame = getattr(curr, 'ag_frame', None) + else: + break + + if Aergia._should_trace(frame.f_code.co_filename): deepest_frame = frame - curr = getattr(curr, 'cr_await', None) + + if isinstance(curr, AsyncGeneratorType): + curr = getattr(curr, 'ag_await', None) + else: + curr = getattr(curr, 'cr_await', None) + + # if this task is found to point to another task we're profiling, + # then we will get the deepest frame later and should return nothing. + if isinstance(curr, list) and any( + Aergia._should_trace_task(task) + for task in curr + ): + return None + return deepest_frame + @staticmethod + def _search_future(future): + '''Given an awaitable which is not a coroutine, assume it is a future + and attempt to find references to tasks or async generators.''' + awaitable = None + if not isinstance(future, asyncio.Future): + # TODO some wrappers like _asyncio.FutureIter, + # async_generator_asend get caught here, I am not sure if a more + # robust approach is necessary + + # can gc be avoided here? + refs = gc.get_referents(future) + if refs: + awaitable = refs[0] + + # this is specific to gathering futures, i.e., gather statement. + # Other cases may need to be added. + if isinstance(awaitable, asyncio.Future): + return getattr(awaitable, '_children', []) + + # if this is not AsyncGeneratorType, it is ignored + return awaitable + + @staticmethod + def _should_trace_task(task): + '''Returns FALSE if TASK is uninteresting to the user. + + A task is interesting if it is not CURRENT_TASK, if it has actually + started executing, and if a child task did not originate from it. 
+ ''' + if not isinstance(task, asyncio.Task): + return False + + # the task is not idle + if task == Aergia.current_task: + return False + + coro = task.get_coro() + + # the task hasn't even run yet + # assumes that all started tasks are sitting at an await + # statement. + # if this isn't the case, the associated coroutine will + # be 'waiting' on the coroutine declaration. No! Bad! + if getattr(coro, 'cr_frame', None) is None or \ + getattr(coro, 'cr_await', None) is None: + return False + + frame = getattr(coro, 'cr_frame', None) + + return Aergia._should_trace(frame.f_code.co_filename) + @staticmethod def _should_trace(filename): '''Returns FALSE if filename is uninteresting to the user. @@ -438,7 +442,6 @@ if __name__ == "__main__": args = parser.parse_args() sys.argv = [args.script] + args.s_args - os.environ["PYTHONASYNCIODEBUG"] = "1" try: with open(args.script, 'r', encoding='utf-8') as fp: code = compile(fp.read(), args.script, "exec") diff --git a/t/test_functionality.py b/t/test_functionality.py index d6bf60c..f259a26 100644 --- a/t/test_functionality.py +++ b/t/test_functionality.py @@ -68,14 +68,12 @@ class BasicUsage(utils.AergiaUnitTestCase): aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * 3, aergia_samples) - # the gather function is technically waiting for all tasks to finish. - # This might be seen as unintuitive though I don't want to bias the - # results by adding logic to add artificial consistency. - # profiling does not mean obscuring implementation details. - self.assert_reasonable_delay('a', delay, aergia_samples) + self.assert_reasonable_delay('a', 0, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) - self.assert_similar_delay('a', yappi_samples, aergia_samples) + # Aergia only assigns time to the current line when the task suspends + # This should fail. 
+ # self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_subthread_task(self): delay = 0.2 @@ -97,12 +95,13 @@ class BasicUsage(utils.AergiaUnitTestCase): self.assert_reasonable_delay('c', delay, aergia_samples) self.assert_reasonable_delay('b', delay * 3, aergia_samples) - # see comment on `test_simultaneous_tasks'. - self.assert_reasonable_delay('a', delay, aergia_samples) + self.assert_reasonable_delay('a', 0, aergia_samples) self.assert_similar_delay('c', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) - self.assert_similar_delay('a', yappi_samples, aergia_samples) + # Aergia only assigns time to the current line when the task suspends + # This should fail. + # self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_eager_task(self): delay = 0.2 @@ -172,15 +171,10 @@ class BasicUsage(utils.AergiaUnitTestCase): yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() - # TODO There are some minor shortcomings with the way Aergia handles - # async_generators. Lines which call the async generator will always - # be assigned the async generator's execution time as well. yappi - # also seems inconsistent. 
- self.assert_reasonable_delay('b', delay * num_times, aergia_samples) - self.assert_reasonable_delay('a', delay * num_times, aergia_samples) + self.assert_reasonable_delay('a', 0, aergia_samples) - self.assert_similar_delay('a', yappi_samples, aergia_samples) + # self.assert_similar_delay('a', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) def test_async_gen_and_comp(self): @@ -206,13 +200,13 @@ class BasicUsage(utils.AergiaUnitTestCase): self.assert_reasonable_delay('b', delay * num_times, aergia_samples) self.assert_reasonable_delay('a', 0, aergia_samples) - self.assert_reasonable_delay('', delay * num_times, aergia_samples) + self.assert_reasonable_delay('', 0, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) # Aergia only assigns time to the current line when the task suspends # This should fail. # self.assert_similar_delay('a', yappi_samples, aergia_samples) - self.assert_similar_delay('', yappi_samples, aergia_samples) + # self.assert_similar_delay('', yappi_samples, aergia_samples) def test_deep_await(self): delay = 0.2 -- cgit v1.2.3