From 0c0af7dd5a9d1c30820b52f8209151f0f2d5610d Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 19 Jul 2025 23:22:13 -0600 Subject: Fix a bug where inactive loops would be indexed (read desc) Cleaned up Aergia.walk_back_until_loop to only get running event loops. Previously, this manifested as a bug. The asyncio.Process class would fork off a thread, and the event loop would be found in that thread's local variables, even though that thread was not executing that event loop. This would result in get_event_loops containing duplicate values, and affected event loops would be assigned duplicate time --- aergia/aergia.py | 39 +++++++++++++++++---------------------- t/eager_and_scheduled.py | 3 ++- t/test_functionality.py | 18 +++++++++++++++++- t/utils.py | 7 ++++--- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/aergia/aergia.py b/aergia/aergia.py index b7b4f35..ad283e0 100755 --- a/aergia/aergia.py +++ b/aergia/aergia.py @@ -175,35 +175,30 @@ class Aergia(object): @staticmethod def get_event_loops(): - '''Obtains each thread's event loop by relying on the fact that - if an event loop is active, it's own `run_once' and `run_forever' - will appear in the callstack.''' + '''Obtains each thread's event loop.''' loops = [] for t in threading.enumerate(): frame = sys._current_frames().get(t.ident) - if not frame: - continue - loops.extend(Aergia.walk_back_until_loops(frame)) + if frame: + loop = Aergia.walk_back_until_loop(frame) + if loop and loop not in loops: + loops.append(loop) return loops @staticmethod - def walk_back_until_loops(frame): - '''Walks back the callstack until all event loops are found.''' - loops = [] + def walk_back_until_loop(frame): + '''Walks back the callstack until we are in a method named '_run_once'. + If this is ever true, we assume we are in an Asyncio event loop method, + and check to see if the 'self' variable is indeed and instance of + AbstractEventLoop. Return this variable if true.''' while frame: - loop = Aergia.find_loop_in_locals(frame.f_locals) - if loop and loop not in loops: # Avoid duplicates - loops.append(loop) - frame = frame.f_back - return loops - - @staticmethod - def find_loop_in_locals(locals_dict): - '''Given a dictionary of local variables for a stack frame, - retrieves the asyncio loop object, if there is one.''' - for val in locals_dict.values(): - if isinstance(val, asyncio.AbstractEventLoop): - return val + if frame.f_code.co_name == '_run_once' and 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + # print(frame.f_code.co_name) + frame = frame.f_back return None @staticmethod diff --git a/t/eager_and_scheduled.py b/t/eager_and_scheduled.py index 178a9f4..030b539 100644 --- a/t/eager_and_scheduled.py +++ b/t/eager_and_scheduled.py @@ -8,7 +8,8 @@ async def foo(): async def bar(): - await asyncio.create_subprocess_shell('sleep 1.0') + proc = await asyncio.create_subprocess_shell('sleep 1.0') + await proc.communicate() async def baz(): diff --git a/t/test_functionality.py b/t/test_functionality.py index 5253534..12bf775 100644 --- a/t/test_functionality.py +++ b/t/test_functionality.py @@ -52,6 +52,22 @@ class BasicUsage(utils.AergiaUnitTestCase): samples = self.Aergia.get_samples() - print(self.expected_samples(delay * 3)) self.assertFuncContains('b', [self.expected_samples(delay * 3)], samples) + + # TODO samples from gather all execution time, should we trace this?? + self.assertFuncContains('a', [self.expected_samples(delay)], samples) + + def test_eager_task(self): + delay = 0.2 + + async def a(): + proc = await asyncio.create_subprocess_shell(f'sleep {delay}') + await proc.communicate() + + self.Aergia.start() + asyncio.run(a()) + self.Aergia.stop() + + samples = self.Aergia.get_samples() + self.assertFuncContains('a', [self.expected_samples(delay)], samples) diff --git a/t/utils.py b/t/utils.py index fe9db26..5772fac 100644 --- a/t/utils.py +++ b/t/utils.py @@ -13,16 +13,17 @@ class AergiaUnitTestCase(unittest.TestCase): def assertFuncContains(self, func_name, samples_expected, samples): samples_actual = self.extract_values_by_func(samples, func_name) - self.assertTrue(len(samples_expected) == len(samples_actual)) + self.assertTrue(len(samples_expected) == len(samples_actual), + f'{samples_expected} (expected) not length of ' + f'{samples_actual} (actual)') s1 = sorted(samples_expected) s2 = sorted(samples_actual) for v1, v2 in zip(s1, s2): self.assertRoughlyEqual(v1, v2) def assertRoughlyEqual(self, v1, v2): - print(f'{v1}, {v2}') a = abs(v1 - v2) - self.assertTrue(a <= 1) + self.assertTrue(a <= 1, f'{v1} (expected) not roughly {v2} (actual)') def expected_samples(self, total_seconds): return (total_seconds / self.interval) -- cgit v1.2.3