diff options
| author | bd <bdunahu@operationnull.com> | 2025-07-19 22:21:10 -0600 |
|---|---|---|
| committer | bd <bdunahu@operationnull.com> | 2025-07-19 22:21:10 -0600 |
| commit | ac55d9ff0b588b91202ccad72ee71e508e33ad08 (patch) | |
| tree | 4933b49b67f1affa16f8e7c63e0d214bfa7d62e8 /t | |
| parent | d08f067f8f470b440b563293397116d6036c4d71 (diff) | |
Reformat repository to allow for new unit tests
Diffstat (limited to 't')
| -rw-r--r-- | t/__init__.py | 0 | ||||
| -rw-r--r-- | t/async-generator-and-comprehension.py | 15 | ||||
| -rw-r--r-- | t/eager_and_scheduled.py | 19 | ||||
| -rw-r--r-- | t/manual/flask.py | 26 | ||||
| -rw-r--r-- | t/random_wait.py | 22 | ||||
| -rw-r--r-- | t/task_groups_and_cancel.py | 35 | ||||
| -rw-r--r-- | t/test_functionality.py | 57 | ||||
| -rw-r--r-- | t/threads.py | 23 | ||||
| -rw-r--r-- | t/utils.py | 34 |
9 files changed, 231 insertions, 0 deletions
diff --git a/t/__init__.py b/t/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/t/__init__.py diff --git a/t/async-generator-and-comprehension.py b/t/async-generator-and-comprehension.py new file mode 100644 index 0000000..ce85f45 --- /dev/null +++ b/t/async-generator-and-comprehension.py @@ -0,0 +1,15 @@ +import asyncio + + +async def async_generator(): + for i in range(10): + await asyncio.sleep(0.5) + yield i + + +async def main(): + r = [a async for a in async_generator()] + print(sum(r)) + + +asyncio.run(main()) diff --git a/t/eager_and_scheduled.py b/t/eager_and_scheduled.py new file mode 100644 index 0000000..178a9f4 --- /dev/null +++ b/t/eager_and_scheduled.py @@ -0,0 +1,19 @@ +import asyncio + + +async def foo(): + await asyncio.sleep(1.0) + await baz() + await asyncio.sleep(0.5) + + +async def bar(): + await asyncio.create_subprocess_shell('sleep 1.0') + + +async def baz(): + await asyncio.sleep(1.0) + + +asyncio.run(foo()) +asyncio.run(bar()) diff --git a/t/manual/flask.py b/t/manual/flask.py new file mode 100644 index 0000000..57951b6 --- /dev/null +++ b/t/manual/flask.py @@ -0,0 +1,26 @@ +#!/usr/bin/env -S python3 -m flask --app +import asyncio +import os +import signal +import time +from flask import Flask + +app = Flask(__name__) + +async def query_db(): + await asyncio.sleep(2.0) + return 1 + +@app.route("/") +async def hello_world(): + await asyncio.sleep(10.0) + return "<p>Hello, World!</p>" + +@app.route("/die") +async def die(): + await asyncio.sleep(2.0) + os.kill(os.getpid(), signal.SIGINT) + return "You've killed me!" + +if __name__ == "__main__": + app.run(debug=True) diff --git a/t/random_wait.py b/t/random_wait.py new file mode 100644 index 0000000..2cfc290 --- /dev/null +++ b/t/random_wait.py @@ -0,0 +1,22 @@ +# SuperFastPython.com +# example of waiting for all tasks to complete +from random import random +import asyncio + +total = 0 + + +async def task_coro(arg): + value = random() + total += value + await asyncio.sleep(value) + print(f'>{arg} done in {value}') + + +async def main(): + tasks = [asyncio.create_task(task_coro(i)) for i in range(10)] + done, pending = await asyncio.wait(tasks) + print(f'All done. Total waiting time: {total}') + + +asyncio.run(main()) diff --git a/t/task_groups_and_cancel.py b/t/task_groups_and_cancel.py new file mode 100644 index 0000000..dcc6bbe --- /dev/null +++ b/t/task_groups_and_cancel.py @@ -0,0 +1,35 @@ +import asyncio + + +async def sleep(): + await asyncio.sleep(3) + print('I should never finish!') + return 0 + + +async def work(): + i = 0 + while i < 50: + i += 1 + await asyncio.sleep(0.2) + return 0 + + +async def explode(): + await asyncio.sleep(1.5) + a = 1 / 0 + return a + + +async def main(): + # exploding will bring all other tasks down with it! + try: + async with asyncio.TaskGroup() as tg: + tg.create_task(sleep()) + tg.create_task(work()) + tg.create_task(explode()) + except: + pass + + +asyncio.run(main()) diff --git a/t/test_functionality.py b/t/test_functionality.py new file mode 100644 index 0000000..5253534 --- /dev/null +++ b/t/test_functionality.py @@ -0,0 +1,57 @@ +import utils +import aiohttp +import asyncio + + +class BasicUsage(utils.AergiaUnitTestCase): + + def test_asyncless(self): + def a(): + x = 100 + while x > 0: + x -= 1 + + self.Aergia.start() + a() + self.Aergia.stop() + + samples = self.Aergia.get_samples() + self.assertFalse(samples) + + def test_sequential_tasks(self): + delay = 0.2 + num_times = 5 + + async def b(tot, num): + await asyncio.sleep(delay) + return tot + num + + async def a(): + tot = 0 + for i in range(num_times): + tot = await b(tot, i) + assert tot == 10 + + self.Aergia.start() + asyncio.run(a()) + self.Aergia.stop() + + samples = self.Aergia.get_samples() + + self.assertFuncContains('b', [self.expected_samples(delay * num_times)], + samples) + + def test_simultaneous_tasks(self): + delay = 0.2 + async def b(): await asyncio.sleep(delay) + async def a(): await asyncio.gather(b(), b(), b()) + + self.Aergia.start() + asyncio.run(a()) + self.Aergia.stop() + + samples = self.Aergia.get_samples() + + print(self.expected_samples(delay * 3)) + self.assertFuncContains('b', [self.expected_samples(delay * 3)], + samples) diff --git a/t/threads.py b/t/threads.py new file mode 100644 index 0000000..fa9c7ac --- /dev/null +++ b/t/threads.py @@ -0,0 +1,23 @@ +import asyncio +import threading + + + +async def count(): + print("it's a secret!") + await asyncio.sleep(3) + + +async def main(): + await asyncio.gather(count(), count(), count()) + print("done") + + +def thread_func(): + asyncio.run(main()) + + +if __name__ == "__main__": + x = threading.Thread(target=thread_func) + x.start() + x.join() diff --git a/t/utils.py b/t/utils.py new file mode 100644 index 0000000..fe9db26 --- /dev/null +++ b/t/utils.py @@ -0,0 +1,34 @@ +from aergia.aergia import Aergia +import unittest + + +class AergiaUnitTestCase(unittest.TestCase): + + interval = 0.01 + Aergia = Aergia(interval) + + def setUp(self): + self.Aergia.clear() + + def assertFuncContains(self, func_name, samples_expected, samples): + samples_actual = self.extract_values_by_func(samples, + func_name) + self.assertTrue(len(samples_expected) == len(samples_actual)) + s1 = sorted(samples_expected) + s2 = sorted(samples_actual) + for v1, v2 in zip(s1, s2): + self.assertRoughlyEqual(v1, v2) + + def assertRoughlyEqual(self, v1, v2): + print(f'{v1}, {v2}') + a = abs(v1 - v2) + self.assertTrue(a <= 1) + + def expected_samples(self, total_seconds): + return (total_seconds / self.interval) + + def extract_values_by_func(self, samples, func_name): + return [ + value for key, value in samples.items() + if key.func == func_name + ] |
