import unittest
import yappi
import utils
import asyncio
import tempfile

try:
    import aiofiles
    HAS_AIOFILES = True
except ImportError:
    HAS_AIOFILES = False

try:
    from aiohttp import web
    import aiohttp
    HAS_AIOHTTP = True
except ImportError:
    HAS_AIOHTTP = False


# A test file containing advanced asyncio tests which have to be verified
# manually.


class Manual(utils.AergiaUnitTestCase):
    """Manually verified profiling scenarios built on aiohttp and aiofiles.

    Every test runs the same asyncio workload twice: once under yappi and
    once under ``self.Aergia``, printing the results of each so they can be
    compared by eye. The tests make no assertions of their own.

    ``self.Aergia`` and ``self.yappi_print_traceable_results`` are not
    defined in this file; presumably they come from
    ``utils.AergiaUnitTestCase`` -- confirm there.
    """

    @unittest.skipIf(not HAS_AIOHTTP,
                     "Skipping 'test_fetch' because aiohttp "
                     "could not be imported.")
    def test_fetch(self):
        """Profile 150 concurrent HTTP GETs against a local aiohttp server."""

        async def handle(request):
            return web.Response(text="Hello, world")

        def get_runner():
            app = web.Application()
            app.router.add_get('/', handle)
            return web.AppRunner(app)

        async def start_server():
            # `runner` is the enclosing test's local; it is rebound to a
            # fresh AppRunner before each profiling run below.
            await runner.setup()
            site = web.TCPSite(runner, 'localhost', 8080)
            await site.start()

        async def stop_server():
            await runner.cleanup()

        async def fetch(url):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()

        async def query(urls):
            tasks = [fetch(url) for url in urls]
            return await asyncio.gather(*tasks)

        async def main():
            await start_server()
            try:
                # wait for server to start
                await asyncio.sleep(1)
                urls = ['http://localhost:8080'] * 150
                await query(urls)
            finally:
                # Clean the runner up even when a request fails, so the
                # server is not left behind on the fixed port.
                await stop_server()

        print('')
        runner = get_runner()
        yappi.start()
        try:
            asyncio.run(main())
        finally:
            # yappi is process-global: never leave it running on failure.
            yappi.stop()
        self.yappi_print_traceable_results(yappi.get_func_stats())

        runner = get_runner()
        self.Aergia.start()
        try:
            asyncio.run(main())
            # NOTE(review): samples are printed before stop(), as in the
            # original ordering -- confirm Aergia requires this.
            self.Aergia.print_samples()
        finally:
            self.Aergia.stop()
        print('')

    @unittest.skipIf(not HAS_AIOHTTP,
                     "Skipping 'test_fetch_with_burn' because aiohttp "
                     "could not be imported.")
    def test_fetch_with_burn(self):
        """Same as ``test_fetch``, but the handler calls ``utils.burn_cpu``.

        ``utils.burn_cpu(0.05)`` runs inside the request handler before the
        response is returned; presumably it busy-loops for about 0.05
        seconds -- confirm in ``utils``.
        """

        async def handle(request):
            utils.burn_cpu(0.05)
            return web.Response(text="Hello, world")

        def get_runner():
            app = web.Application()
            app.router.add_get('/', handle)
            return web.AppRunner(app)

        async def start_server():
            # `runner` is the enclosing test's local; it is rebound to a
            # fresh AppRunner before each profiling run below.
            await runner.setup()
            site = web.TCPSite(runner, 'localhost', 8080)
            await site.start()

        async def stop_server():
            await runner.cleanup()

        async def fetch(url):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()

        async def query(urls):
            tasks = [fetch(url) for url in urls]
            return await asyncio.gather(*tasks)

        async def main():
            await start_server()
            try:
                # wait for server to start
                await asyncio.sleep(1)
                urls = ['http://localhost:8080'] * 150
                await query(urls)
            finally:
                # Clean the runner up even when a request fails, so the
                # server is not left behind on the fixed port.
                await stop_server()

        print('')
        runner = get_runner()
        yappi.start()
        try:
            asyncio.run(main())
        finally:
            # yappi is process-global: never leave it running on failure.
            yappi.stop()
        self.yappi_print_traceable_results(yappi.get_func_stats())

        runner = get_runner()
        self.Aergia.start()
        try:
            asyncio.run(main())
            # NOTE(review): samples are printed before stop(), as in the
            # original ordering -- confirm Aergia requires this.
            self.Aergia.print_samples()
        finally:
            self.Aergia.stop()
        print('')

    @unittest.skipIf(not HAS_AIOFILES,
                     "Skipping 'test_file' because aiofiles "
                     "could not be imported.")
    def test_file(self):
        """Profile one aiofiles write followed by 200 concurrent reads."""

        async def write_file(filename, content):
            async with aiofiles.open(filename, mode='w') as f:
                await f.write(content)

        async def read_file(filename):
            async with aiofiles.open(filename, mode='r') as f:
                content = await f.read()
                return content

        async def read_lots(filename, iterations):
            tasks = [read_file(filename) for _ in range(iterations)]
            await asyncio.gather(*tasks)

        async def main():
            # Create a temp file (/tmp)
            with tempfile.NamedTemporaryFile(delete=True,
                                             suffix='.txt') as temp_file:
                filename = temp_file.name
                content = 'Hello, World!'
                # write the file
                await write_file(filename, content)
                # read the file 200 times
                await read_lots(filename, 200)

        print('')
        yappi.start()
        try:
            asyncio.run(main())
        finally:
            # yappi is process-global: never leave it running on failure.
            yappi.stop()
        self.yappi_print_traceable_results(yappi.get_func_stats())

        self.Aergia.start()
        try:
            asyncio.run(main())
            # NOTE(review): samples are printed before stop(), as in the
            # original ordering -- confirm Aergia requires this.
            self.Aergia.print_samples()
        finally:
            self.Aergia.stop()
        print('')

    @unittest.skipIf(not HAS_AIOFILES,
                     "Skipping 'test_file_with_burn' because aiofiles "
                     "could not be imported.")
    def test_file_with_burn(self):
        """Same as ``test_file``, but each read calls ``utils.burn_cpu``.

        ``utils.burn_cpu(0.05)`` runs at the start of every ``read_file``
        call, before the file is opened; presumably it busy-loops for about
        0.05 seconds -- confirm in ``utils``.
        """

        async def write_file(filename, content):
            async with aiofiles.open(filename, mode='w') as f:
                await f.write(content)

        async def read_file(filename):
            utils.burn_cpu(0.05)
            async with aiofiles.open(filename, mode='r') as f:
                content = await f.read()
                return content

        async def read_lots(filename, iterations):
            tasks = [read_file(filename) for _ in range(iterations)]
            await asyncio.gather(*tasks)

        async def main():
            # Create a temp file (/tmp)
            # delete=True (as in test_file) so the file is removed when the
            # `with` block exits; delete=False left one stray file behind
            # for every profiling run.
            with tempfile.NamedTemporaryFile(delete=True,
                                             suffix='.txt') as temp_file:
                filename = temp_file.name
                content = 'Hello, World!'
                # write the file
                await write_file(filename, content)
                # read the file 200 times
                await read_lots(filename, 200)

        print('')
        yappi.start()
        try:
            asyncio.run(main())
        finally:
            # yappi is process-global: never leave it running on failure.
            yappi.stop()
        self.yappi_print_traceable_results(yappi.get_func_stats())

        self.Aergia.start()
        try:
            asyncio.run(main())
            # NOTE(review): samples are printed before stop(), as in the
            # original ordering -- confirm Aergia requires this.
            self.Aergia.print_samples()
        finally:
            self.Aergia.stop()
        print('')