summaryrefslogtreecommitdiff
path: root/aergia/aergia.py
diff options
context:
space:
mode:
Diffstat (limited to 'aergia/aergia.py')
-rwxr-xr-xaergia/aergia.py185
1 files changed, 94 insertions, 91 deletions
diff --git a/aergia/aergia.py b/aergia/aergia.py
index 48efc68..bb8dcbe 100755
--- a/aergia/aergia.py
+++ b/aergia/aergia.py
@@ -47,13 +47,14 @@ from collections import defaultdict, namedtuple
from typing import Optional
import argparse
import asyncio
-import os
import signal
import sys
import threading
import time
import traceback
+import gc
+from types import AsyncGeneratorType
orig_thread_join = threading.Thread.join
@@ -93,8 +94,11 @@ class Aergia(object):
# number of times samples have been collected
total_samples = 0
# the (ideal) interval between samples
-
signal_interval = 0.0
+
+ # the current task for the loop being processed
+ current_task = None
+
# if we should profile currently running tasks
do_profile_current = False
@@ -267,117 +271,117 @@ class Aergia(object):
def _get_idle_task_frames(loop):
'''Given an asyncio event loop, returns the list of idle task frames.
We only care about idle task frames, as running tasks are already
- included elsewhere.
-
- A task is considered 'idle' if it is pending and not the current
- task.'''
+ included elsewhere.'''
idle = []
- coros = Aergia._get_traceable_coros(loop)
- for coro in coros:
- frame = Aergia._get_deepest_traceable_frame(coro)
- # if it's a future,
- if frame:
- idle.append(frame)
- # handle async generators
- # ideally, we would access these from _get_deepest_traceable_frame
- # doing it this way causes us to assign generator time to the
- # coroutine that called this generator in _get_deepest_traceable_frame
- for ag in loop._asyncgens:
- f = getattr(ag, 'ag_frame', None)
- if f and Aergia._should_trace(f.f_code.co_filename):
- idle.append(f)
- return idle
-
- @staticmethod
- def _get_traceable_coros(loop):
- '''Given event loop LOOP, returns a list of coroutines corresponding
- to each traceable task.
+ # set this when we start processing a loop.
+ # it is required later, but I only want to set it once.
+ Aergia.current_task = asyncio.current_task(loop)
- A task is traceable if it is not the current task, if it has actually
- started executing, and if a child task did not originate from it.'''
- current = asyncio.current_task(loop)
-
- coros = dict()
- p_tasks = set()
for task in asyncio.all_tasks(loop):
- # the task is not idle
- if task == current:
+ if not Aergia._should_trace_task(task):
continue
coro = task.get_coro()
- # the task hasn't even run yet
- # assumes that all started tasks are sitting at an await
- # statement.
- # if this isn't the case, the associated coroutine will
- # be 'waiting' on the coroutine declaration. No! Bad!
- if getattr(coro, 'cr_frame', None) is None or \
- getattr(coro, 'cr_await', None) is None:
- continue
-
- frame = getattr(coro, 'cr_frame', None)
-
- # for methods like gather, tasks create new tasks. keep
- # track of these tasks so we don't count them twice.
- parent = Aergia._get_parent_task(task)
- if parent is not None:
- p_tasks.add(parent)
-
- coros[coro] = (frame.f_code.co_filename,
- frame.f_lineno,
- frame.f_code.co_name)
-
- return [
- coro for coro, frame_summary in coros.items()
- if frame_summary not in p_tasks
- ]
-
- @staticmethod
- def _get_parent_task(task):
- '''Given TASK, returns the first frame in the source traceback
- which is tracable. This only works if the event loop has debug
- mode turned on.
-
- If this is not the case, returns None.'''
- tb = task._source_traceback
-
- # true when debug mode isn't turned on
- if tb is None:
- return None
-
- for frame in tb[::-1]:
- if Aergia._should_trace(frame.filename):
- return (frame.filename, frame.lineno, frame.name)
+ frame = Aergia._get_deepest_traceable_frame(coro)
+ if frame:
+ idle.append(frame)
- return None
+ return idle
@staticmethod
def _get_deepest_traceable_frame(coro):
'''Get the deepest frame of coro we care to trace.
- Luckily for us, coroutines keep track of the coroutine object which
- they are waiting on (cr_await). We simply need to trace that down as
- far as possible to get the actual line we are waiting on.
+ This is possible because each corooutine keeps a reference to the
+ coroutine it is waiting on.
+
Note that it cannot be the case that a task is suspended in a frame
that does not belong to a coroutine, asyncio is very particular about
- that!
-
- If cr_await is None, then the task was created but never started.
- (since once a task is started, it either starts waiting on another
- coroutine, or finishes immediately. Note when you call task_create,
- asyncio schedules it for the next event loop iteration, so the code
- does not actually run immediately). These cases were already filtered
- out in `_get_idle_task_frames'.'''
+ that! This is also why we only track idle tasks this way.'''
curr = coro
deepest_frame = None
while curr:
frame = getattr(curr, 'cr_frame', None)
- if frame and Aergia._should_trace(frame.f_code.co_filename):
+
+ if not frame:
+ curr = Aergia._search_future(curr)
+ if isinstance(curr, AsyncGeneratorType):
+ frame = getattr(curr, 'ag_frame', None)
+ else:
+ break
+
+ if Aergia._should_trace(frame.f_code.co_filename):
deepest_frame = frame
- curr = getattr(curr, 'cr_await', None)
+
+ if isinstance(curr, AsyncGeneratorType):
+ curr = getattr(curr, 'ag_await', None)
+ else:
+ curr = getattr(curr, 'cr_await', None)
+
+ # if this task is found to point to another task we're profiling,
+ # then we will get the deepest frame later and should return nothing.
+ if isinstance(curr, list) and any(
+ Aergia._should_trace_task(task)
+ for task in curr
+ ):
+ return None
+
return deepest_frame
@staticmethod
+ def _search_future(future):
+ '''Given an awaitable which is not a coroutine, assume it is a future
+ and attempt to find references to tasks or async generators.'''
+ awaitable = None
+ if not isinstance(future, asyncio.Future):
+ # TODO some wrappers like _asyncio.FutureIter,
+ # async_generator_asend get caught here, I am not sure if a more
+ # robust approach is necessary
+
+ # can gc be avoided here?
+ refs = gc.get_referents(future)
+ if refs:
+ awaitable = refs[0]
+
+ # this is specific to gathering futures, i.e., gather statement.
+ # Other cases may need to be added.
+ if isinstance(awaitable, asyncio.Future):
+ return getattr(awaitable, '_children', [])
+
+ # if this is not AsyncGeneratorType, it is ignored
+ return awaitable
+
+ @staticmethod
+ def _should_trace_task(task):
+ '''Returns FALSE if TASK is uninteresting to the user.
+
+ A task is interesting if it is not CURRENT_TASK, if it has actually
+ started executing, and if a child task did not originate from it.
+ '''
+ if not isinstance(task, asyncio.Task):
+ return False
+
+ # the task is not idle
+ if task == Aergia.current_task:
+ return False
+
+ coro = task.get_coro()
+
+ # the task hasn't even run yet
+ # assumes that all started tasks are sitting at an await
+ # statement.
+ # if this isn't the case, the associated coroutine will
+ # be 'waiting' on the coroutine declaration. No! Bad!
+ if getattr(coro, 'cr_frame', None) is None or \
+ getattr(coro, 'cr_await', None) is None:
+ return False
+
+ frame = getattr(coro, 'cr_frame', None)
+
+ return Aergia._should_trace(frame.f_code.co_filename)
+
+ @staticmethod
def _should_trace(filename):
'''Returns FALSE if filename is uninteresting to the user.
Don't depend on this. It's good enough for testing.'''
@@ -438,7 +442,6 @@ if __name__ == "__main__":
args = parser.parse_args()
sys.argv = [args.script] + args.s_args
- os.environ["PYTHONASYNCIODEBUG"] = "1"
try:
with open(args.script, 'r', encoding='utf-8') as fp:
code = compile(fp.read(), args.script, "exec")