import asyncio
import threading

import yappi

import utils


# A test file containing tests adapted from yappi's own test suite.
class YappiTests(utils.AergiaUnitTestCase):
    """Run asyncio workloads under both yappi and Aergia and check the results.

    Every test starts yappi and ``self.Aergia``, drives some coroutines, stops
    both profilers and then hands the collected data to the assertion helpers
    inherited from ``utils.AergiaUnitTestCase``.

    NOTE: the nested coroutines keep the names ``a``, ``b`` and
    ``recursive_a`` on purpose; the assertions look samples up by those names.
    """

    def test_asyncio_recursion_yappi(self):
        """Profile a doubly-recursive coroutine that sleeps on each positive call."""
        delay = 0.1

        async def a(n):
            if n <= 0:
                return
            await asyncio.sleep(delay)
            await a(n - 1)
            await a(n - 2)

        yappi.start()
        self.Aergia.start()
        try:
            asyncio.run(a(3))
        finally:
            # Stop both profilers even when the workload raises, so that a
            # failure here does not leave them running for later tests.
            self.Aergia.stop()
            yappi.stop()

        yappi_samples = yappi.get_func_stats()
        aergia_samples = self.Aergia.get_samples()

        # a(3) reaches the sleep four times: a(3), a(2) and a(1) twice.
        self.assert_reasonable_delay('a', delay * 4, aergia_samples)
        self.assert_similar_delay('a', yappi_samples, aergia_samples)

    def test_basic_multithread(self):
        """Profile coroutines running on three event loops in separate threads."""
        delay = 0.1
        num_times = 5

        async def a():
            await asyncio.sleep(delay)

        async def b():
            await a()

        async def recursive_a(n):
            if not n:
                return
            await asyncio.sleep(delay)
            await recursive_a(n - 1)

        def tag_cbk():
            # Threads this test did not create carry no ``_tag``; report -1
            # for those instead of hiding unrelated errors behind a bare
            # ``except``.
            return getattr(threading.current_thread(), '_tag', -1)

        threading.current_thread()._tag = 0
        yappi.set_tag_callback(tag_cbk)

        def _thread_event_loop(loop):
            asyncio.set_event_loop(loop)
            loop.run_forever()

        # driver() below addresses the workers as ts[0], ts[1] and ts[2].
        _TCOUNT = 3
        ts = []
        for _ctag in range(1, _TCOUNT + 1):
            _loop = asyncio.new_event_loop()
            t = threading.Thread(target=_thread_event_loop, args=(_loop,))
            t._tag = _ctag
            t._loop = _loop
            t.start()
            ts.append(t)

        async def stop_loop():
            asyncio.get_running_loop().stop()

        async def driver():
            futs = [
                asyncio.run_coroutine_threadsafe(a(), ts[0]._loop),
                asyncio.run_coroutine_threadsafe(
                    recursive_a(num_times), ts[1]._loop),
                asyncio.run_coroutine_threadsafe(b(), ts[2]._loop),
            ]
            # The work runs on the worker loops, so blocking the driver's own
            # loop on the concurrent futures does not stall it.
            for fut in futs:
                fut.result()
            for t in ts:
                asyncio.run_coroutine_threadsafe(stop_loop(), t._loop)

        yappi.start()
        self.Aergia.start()
        try:
            asyncio.run(driver())
        finally:
            self.Aergia.stop()
            yappi.stop()
            for t in ts:
                # Requesting a stop again is harmless if driver() already did
                # so, and it guarantees the join below returns when driver()
                # failed before reaching its own stop requests.
                t._loop.call_soon_threadsafe(t._loop.stop)
                t.join()
                t._loop.close()

        yappi_samples = yappi.get_func_stats()
        aergia_samples = self.Aergia.get_samples()

        # a() runs twice (directly and through b()); b() itself never sleeps.
        self.assert_reasonable_delay('a', delay * 2, aergia_samples)
        self.assert_reasonable_delay('b', 0, aergia_samples)
        self.assert_reasonable_delay(
            'recursive_a', delay * num_times, aergia_samples)
        self.assert_similar_delay('a', yappi_samples, aergia_samples)
        self.assert_similar_delay('recursive_a', yappi_samples, aergia_samples)
        # Aergia only assigns time to the current line when the task suspends,
        # so the following comparison is expected to fail and is left disabled:
        # self.assert_similar_delay('b', yappi_samples, aergia_samples)