import unittest import yappi import utils import logging import asyncio import tempfile try: import aiofiles HAS_AIOFILES = True except ImportError: HAS_AIOFILES = False try: from aiohttp import web import aiohttp HAS_AIOHTTP = True except ImportError: HAS_AIOHTTP = False try: import httpx HAS_HTTPX = True except ImportError: HAS_HTTPX = False try: from quart import Quart, jsonify HAS_QUART = True except ImportError: HAS_QUART = False # A test file containing advanced asyncio tests which have to be verified # manually. class Manual(utils.AergiaUnitTestCase): @unittest.skipIf(not HAS_AIOHTTP, "Skipping 'test_fetch' because aiohttp " "could not be imported.") def test_fetch(self): async def handle(request): return web.Response(text="Hello, world") def get_runner(): app = web.Application() app.router.add_get('/', handle) return web.AppRunner(app) async def start_server(): await runner.setup() site = web.TCPSite(runner, 'localhost', 8080) await site.start() async def stop_server(): await runner.cleanup() async def fetch(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def query(urls): tasks = [fetch(url) for url in urls] return await asyncio.gather(*tasks) async def main(): await start_server() # wait for server to start await asyncio.sleep(1) urls = ['http://localhost:8080'] * 25 await query(urls) await stop_server() print('') runner = get_runner() yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) runner = get_runner() self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('') @unittest.skipIf(not HAS_AIOHTTP, "Skipping 'test_fetch_with_burn' because aiohttp " "could not be imported.") def test_fetch_with_burn(self): async def handle(request): utils.burn_cpu(0.05) return web.Response(text="Hello, world") def get_runner(): app = web.Application() app.router.add_get('/', handle) return web.AppRunner(app) async def start_server(): await runner.setup() site = web.TCPSite(runner, 'localhost', 8080) await site.start() async def stop_server(): await runner.cleanup() async def fetch(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def query(urls): tasks = [fetch(url) for url in urls] return await asyncio.gather(*tasks) async def main(): await start_server() # wait for server to start await asyncio.sleep(1) urls = ['http://localhost:8080'] * 150 await query(urls) await stop_server() print('') runner = get_runner() yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) runner = get_runner() self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('') @unittest.skipIf(not HAS_HTTPX or not HAS_QUART, "Skipping 'test_quart' because quart or httpx " "could not be imported.") def test_quart(self): # https://quart-latest-doc-gen.readthedocs.io/en/latest/how_to_guides/logging.html # official solutions don't actually disable the logger. :( logging.disable(logging.WARN) app = Quart(__name__) @app.route('/', methods=['GET']) async def fetch(): return jsonify({"message": "Hello, World!"}) async def query(url): async with httpx.AsyncClient() as c: t = [c.get(f'{url}/data') for _ in range(200)] return await asyncio.gather(*t) async def main(): asyncio.create_task(app.run_task(port=5000)) url = "http:/localhost:5000" await query(url) await app.shutdown() print('') yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('') @unittest.skipIf(not HAS_HTTPX or not HAS_QUART, "Skipping 'test_quart_with_burn' because quart or httpx " "could not be imported.") def test_quart_with_burn(self): # https://quart-latest-doc-gen.readthedocs.io/en/latest/how_to_guides/logging.html # official solutions don't actually disable the logger. :( logging.disable(logging.WARN) app = Quart(__name__) @app.route('/', methods=['GET']) async def fetch(): return jsonify({"message": "Hello, World!"}) async def query(url): async with httpx.AsyncClient() as c: utils.burn_cpu(0.05) t = [c.get(f'{url}/data') for _ in range(200)] return await asyncio.gather(*t) async def main(): asyncio.create_task(app.run_task(port=5000)) url = "http:/localhost:5000" await query(url) await app.shutdown() print('') yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('') @unittest.skipIf(not HAS_AIOFILES, "Skipping 'test_file' because aiofiles " "could not be imported.") def test_file(self): async def write_file(filename, content): async with aiofiles.open(filename, mode='w') as f: await f.write(content) async def read_file(filename): async with aiofiles.open(filename, mode='r') as f: content = await f.read() return content async def read_lots(filename, iterations): tasks = [read_file(filename) for _ in range(iterations)] await asyncio.gather(*tasks) async def main(): # Create a temp file (/tmp) with tempfile.NamedTemporaryFile(delete=True, suffix='.txt') as temp_file: filename = temp_file.name content = 'Hello, World!' # write the file await write_file(filename, content) # read the file 200 times await read_lots(filename, 200) print('') yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('') @unittest.skipIf(not HAS_AIOFILES, "Skipping 'test_file_with_burn' because aiofiles " "could not be imported.") def test_file_with_burn(self): async def write_file(filename, content): async with aiofiles.open(filename, mode='w') as f: await f.write(content) async def read_file(filename): utils.burn_cpu(0.05) async with aiofiles.open(filename, mode='r') as f: content = await f.read() return content async def read_lots(filename, iterations): tasks = [read_file(filename) for _ in range(iterations)] await asyncio.gather(*tasks) async def main(): # Create a temp file (/tmp) with tempfile.NamedTemporaryFile(delete=False, suffix='.txt') as temp_file: filename = temp_file.name content = 'Hello, World!' # write the file await write_file(filename, content) # read the file 200 times await read_lots(filename, 200) print('') yappi.start() asyncio.run(main()) yappi.stop() self.yappi_print_traceable_results(yappi.get_func_stats()) self.Aergia.start() asyncio.run(main()) self.Aergia.print_samples() self.Aergia.stop() print('')