import yappi import utils import asyncio import threading # A test file containing basic async functionality tests. Compares the results # of Aergia to both a calculated expected time and yappi reported time, done # simply to draw comparisons between the two approaches. class TerminateTaskGroup(Exception): '''Exception raised to terminate a task group.''' class BasicUsage(utils.AergiaUnitTestCase): def test_asyncless(self): def a(): x = 100 while x > 0: x -= 1 self.Aergia.start() a() self.Aergia.stop() samples = self.Aergia.get_samples() self.assertFalse(samples) def test_sequential_tasks(self): delay = 0.2 num_times = 5 async def b(tot, num): await asyncio.sleep(delay) return tot + num async def a(): tot = 0 for i in range(num_times): tot = await b(tot, i) assert tot == 10 yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * num_times, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) def test_simultaneous_tasks(self): delay = 0.2 async def b(): await asyncio.sleep(delay) async def a(): await asyncio.gather(b(), b(), b()) yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * 3, aergia_samples) # the gather function is technically waiting for all tasks to finish. # This might be seen as unintuitive though I don't want to bias the # results by adding logic to add artificial consistency. # profiling does not mean obscuring implementation details. self.assert_reasonable_delay('a', delay, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_subthread_task(self): delay = 0.2 async def b(): await asyncio.sleep(delay) async def a(): await asyncio.gather(b(), b(), b()) def c(): asyncio.run(a()) yappi.start() self.Aergia.start() x = threading.Thread(target=c) x.start() x.join() self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats(ctx_id=0) aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('c', 0, aergia_samples) self.assert_reasonable_delay('b', delay * 3, aergia_samples) # see comment on `test_simultaneous_tasks'. self.assert_reasonable_delay('a', delay, aergia_samples) # Aergia does not assign time to the current task. # Statistically, this means the select function, which traces # up to the event loop (function c). # Therefore this test would fail. # self.assert_similar_delay('c', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_eager_task(self): delay = 0.2 async def a(): proc = await asyncio.create_subprocess_shell(f'sleep {delay}') await proc.communicate() yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats(ctx_id=0) aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('a', delay, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_async_eager_scheduled_woven(self): d1 = 0.2 d2 = 0.3 d3 = 0.2 d4 = 0.1 async def a(): await asyncio.sleep(d1) proc = await asyncio.create_subprocess_shell(f'sleep {d2}') await proc.communicate() await asyncio.sleep(d3) proc = await asyncio.create_subprocess_shell(f'sleep {d4}') await proc.communicate() yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats(ctx_id=0) aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('a', d1 + d2 + d3 + d4, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_async_generator(self): delay = 0.2 num_times = 10 async def b(): for i in range(num_times): await asyncio.sleep(delay) yield i async def a(): lst = [] async for item in b(): lst.append(item) yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() # TODO There are some minor shortcomings with the way Aergia handles # async_generators. Lines which call the async generator will always # be assigned the async generator's execution time as well. yappi # also seems inconsistent. self.assert_reasonable_delay('b', delay * num_times, aergia_samples) self.assert_reasonable_delay('a', delay * num_times, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) def test_async_gen_and_comp(self): delay = 0.2 num_times = 10 async def b(): for i in range(num_times): await asyncio.sleep(delay) yield i async def a(): return [r async for r in b()] yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * num_times, aergia_samples) self.assert_reasonable_delay('a', 0, aergia_samples) self.assert_reasonable_delay('', delay * num_times, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) # Aergia only assigns time to the current line when the task suspends # This should fail. # self.assert_similar_delay('a', yappi_samples, aergia_samples) self.assert_similar_delay('', yappi_samples, aergia_samples) def test_deep_await(self): delay = 0.2 async def c(): await asyncio.sleep(delay) async def b(): await c() async def a(): await b() yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('c', delay, aergia_samples) self.assert_reasonable_delay('b', 0, aergia_samples) self.assert_reasonable_delay('a', 0, aergia_samples) self.assert_similar_delay('c', yappi_samples, aergia_samples) # Aergia does not assign time to the current task. # These should fail. # self.assert_similar_delay('b', yappi_samples, aergia_samples) # self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_task_groups(self): d1 = 0.2 d2 = 0.3 d3 = 0.1 async def d(): await asyncio.sleep(d1) async def c(): await asyncio.sleep(d2) async def b(): await asyncio.sleep(d3) async def a(): async with asyncio.TaskGroup() as tg: tg.create_task(d()) tg.create_task(c()) tg.create_task(b()) yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('d', d1, aergia_samples) self.assert_reasonable_delay('c', d2, aergia_samples) self.assert_reasonable_delay('b', d3, aergia_samples) # the task group is technically waiting for all tasks to finish. # same as task.gather # This might be seen as unintuitive, (especially considering how # the next test works), though I don't want to bias the results # by adding logic to add artificial consistency. # Both are reporting correctly. self.assert_reasonable_delay('a', d2, aergia_samples) self.assert_similar_delay('d', yappi_samples, aergia_samples) self.assert_similar_delay('c', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples) def test_task_groups_cancel(self): d1 = 0.1 d2 = 0.3 d3 = 0.2 async def d(): raise TerminateTaskGroup async def c(): await asyncio.sleep(d1) async def b(): await asyncio.sleep(d2) async def a(): try: async with asyncio.TaskGroup() as tg: tg.create_task(c()) tg.create_task(b()) await asyncio.sleep(d3) tg.create_task(d()) except* TerminateTaskGroup: pass yappi.start() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() yappi.stop() yappi_samples = yappi.get_func_stats() aergia_samples = self.Aergia.get_samples() self.assert_reasonable_delay('d', 0, aergia_samples) self.assert_reasonable_delay('c', d1, aergia_samples) self.assert_reasonable_delay('b', d3, aergia_samples) # this time is attached to the sleep call itself. Aergia's print # function would confirm this! self.assert_reasonable_delay('a', d3, aergia_samples) self.assert_similar_delay('d', yappi_samples, aergia_samples) self.assert_similar_delay('c', yappi_samples, aergia_samples) self.assert_similar_delay('b', yappi_samples, aergia_samples) self.assert_similar_delay('a', yappi_samples, aergia_samples)