import utils import asyncio import threading class TerminateTaskGroup(Exception): '''Exception raised to terminate a task group.''' class BasicUsage(utils.AergiaUnitTestCase): def test_asyncless(self): def a(): x = 100 while x > 0: x -= 1 self.Aergia.start() a() self.Aergia.stop() samples = self.Aergia.get_samples() self.assertFalse(samples) def test_sequential_tasks(self): delay = 0.2 num_times = 5 async def b(tot, num): await asyncio.sleep(delay) return tot + num async def a(): tot = 0 for i in range(num_times): tot = await b(tot, i) assert tot == 10 self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * num_times, samples) def test_simultaneous_tasks(self): delay = 0.2 async def b(): await asyncio.sleep(delay) async def a(): await asyncio.gather(b(), b(), b()) self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('b', delay * 3, samples) # the gather function is technically waiting for all tasks to finish. # This might be seen as unintuitive though I don't want to bias the # results by adding logic to add artificial consistency. # profiling does not mean obscuring implementation details. self.assert_reasonable_delay('a', delay, samples) def test_subthread_task(self): delay = 0.2 async def b(): await asyncio.sleep(delay) async def a(): await asyncio.gather(b(), b(), b()) def c(): asyncio.run(a()) self.Aergia.start() x = threading.Thread(target=c) x.start() x.join() self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('c', 0, samples) self.assert_reasonable_delay('b', delay * 3, samples) # see comment on `test_simultaneous_tasks'. self.assert_reasonable_delay('a', delay, samples) def test_eager_task(self): delay = 0.2 async def a(): proc = await asyncio.create_subprocess_shell(f'sleep {delay}') await proc.communicate() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('a', delay, samples) def test_async_eager_scheduled_woven(self): d1 = 0.2 d2 = 0.3 d3 = 0.2 d4 = 0.1 async def a(): await asyncio.sleep(d1) proc = await asyncio.create_subprocess_shell(f'sleep {d2}') await proc.communicate() await asyncio.sleep(d3) proc = await asyncio.create_subprocess_shell(f'sleep {d4}') await proc.communicate() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('a', d1 + d2 + d3 + d4, samples) def test_async_generator(self): delay = 0.2 num_times = 10 async def b(): for i in range(num_times): await asyncio.sleep(delay) yield i async def a(): lst = [] async for item in b(): lst.append(item) self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() # TODO I do not think async generators report correctly. # also does not work at all with yappi. # doing so would be unique to this profiler. # self.assert_reasonable_delay('b', delay * num_times, samples) # self.assert_reasonable_delay('a', [], samples) self.assert_reasonable_delay('a', delay * num_times, samples) self.assert_reasonable_delay('b', 0, samples) def test_async_gen_and_comp(self): delay = 0.2 num_times = 10 async def b(): for i in range(num_times): await asyncio.sleep(delay) yield i async def a(): return [r async for r in b()] self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() # TODO I do not think async generators report correctly. # also does not work at all with yappi. # doing so would be unique to this profiler. self.assert_reasonable_delay('b', 0, samples) self.assert_reasonable_delay('a', 0, samples) self.assert_reasonable_delay('', delay * num_times, samples) def test_deep_await(self): delay = 0.2 async def c(): await asyncio.sleep(delay) async def b(): await c() async def a(): await b() self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay( 'c', delay, samples) self.assert_reasonable_delay('b', 0, samples) self.assert_reasonable_delay('a', 0, samples) def test_task_groups(self): d1 = 0.2 d2 = 0.3 d3 = 0.1 async def d(): await asyncio.sleep(d1) async def c(): await asyncio.sleep(d2) async def b(): await asyncio.sleep(d3) async def a(): async with asyncio.TaskGroup() as tg: tg.create_task(d()) tg.create_task(c()) tg.create_task(b()) self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('d', d1, samples) self.assert_reasonable_delay('c', d2, samples) self.assert_reasonable_delay('b', d3, samples) # the task group is technically waiting for all tasks to finish. # same as task.gather # This might be seen as unintuitive, (especially considering how # the next test works), though I don't want to bias the results # by adding logic to add artificial consistency. # Both are reporting correctly. self.assert_reasonable_delay('a', d2, samples) def test_task_groups_cancel(self): d1 = 0.1 d2 = 0.3 d3 = 0.2 async def d(): raise TerminateTaskGroup async def c(): await asyncio.sleep(d1) async def b(): await asyncio.sleep(d2) async def a(): try: async with asyncio.TaskGroup() as tg: tg.create_task(c()) tg.create_task(b()) await asyncio.sleep(d3) tg.create_task(d()) except* TerminateTaskGroup: pass self.Aergia.start() asyncio.run(a()) self.Aergia.stop() samples = self.Aergia.get_samples() self.assert_reasonable_delay('d', 0, samples) self.assert_reasonable_delay('c', d1, samples) self.assert_reasonable_delay('b', d3, samples) # this time is attached to the sleep call itself. Aergia's print # function would confirm this! self.assert_reasonable_delay('a', d3, samples) def test_asyncio_recursion(self): delay = 0.1 async def a(n): if n <= 0: return await asyncio.sleep(delay) await a(n - 1) await a(n - 2) self.Aergia.start() asyncio.run(a(3)) self.Aergia.stop() samples = self.Aergia.get_samples()