summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorbd <bdunahu@operationnull.com>2025-07-19 22:21:10 -0600
committerbd <bdunahu@operationnull.com>2025-07-19 22:21:10 -0600
commitac55d9ff0b588b91202ccad72ee71e508e33ad08 (patch)
tree4933b49b67f1affa16f8e7c63e0d214bfa7d62e8 /t
parentd08f067f8f470b440b563293397116d6036c4d71 (diff)
Reformat repository to allow for new unit tests
Diffstat (limited to 't')
-rw-r--r--t/__init__.py0
-rw-r--r--t/async-generator-and-comprehension.py15
-rw-r--r--t/eager_and_scheduled.py19
-rw-r--r--t/manual/flask.py26
-rw-r--r--t/random_wait.py22
-rw-r--r--t/task_groups_and_cancel.py35
-rw-r--r--t/test_functionality.py57
-rw-r--r--t/threads.py23
-rw-r--r--t/utils.py34
9 files changed, 231 insertions, 0 deletions
diff --git a/t/__init__.py b/t/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/t/__init__.py
diff --git a/t/async-generator-and-comprehension.py b/t/async-generator-and-comprehension.py
new file mode 100644
index 0000000..ce85f45
--- /dev/null
+++ b/t/async-generator-and-comprehension.py
@@ -0,0 +1,15 @@
+import asyncio
+
+
+async def async_generator():
+ for i in range(10):
+ await asyncio.sleep(0.5)
+ yield i
+
+
+async def main():
+ r = [a async for a in async_generator()]
+ print(sum(r))
+
+
+asyncio.run(main())
diff --git a/t/eager_and_scheduled.py b/t/eager_and_scheduled.py
new file mode 100644
index 0000000..178a9f4
--- /dev/null
+++ b/t/eager_and_scheduled.py
@@ -0,0 +1,19 @@
+import asyncio
+
+
+async def foo():
+ await asyncio.sleep(1.0)
+ await baz()
+ await asyncio.sleep(0.5)
+
+
+async def bar():
+ await asyncio.create_subprocess_shell('sleep 1.0')
+
+
+async def baz():
+ await asyncio.sleep(1.0)
+
+
+asyncio.run(foo())
+asyncio.run(bar())
diff --git a/t/manual/flask.py b/t/manual/flask.py
new file mode 100644
index 0000000..57951b6
--- /dev/null
+++ b/t/manual/flask.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env -S python3 -m flask --app
+import asyncio
+import os
+import signal
+import time
+from flask import Flask
+
+app = Flask(__name__)
+
+async def query_db():
+ await asyncio.sleep(2.0)
+ return 1
+
+@app.route("/")
+async def hello_world():
+ await asyncio.sleep(10.0)
+ return "<p>Hello, World!</p>"
+
+@app.route("/die")
+async def die():
+ await asyncio.sleep(2.0)
+ os.kill(os.getpid(), signal.SIGINT)
+ return "You've killed me!"
+
+if __name__ == "__main__":
+ app.run(debug=True)
diff --git a/t/random_wait.py b/t/random_wait.py
new file mode 100644
index 0000000..2cfc290
--- /dev/null
+++ b/t/random_wait.py
@@ -0,0 +1,22 @@
+# SuperFastPython.com
+# example of waiting for all tasks to complete
+from random import random
+import asyncio
+
+total = 0
+
+
+async def task_coro(arg):
+ value = random()
+ total += value
+ await asyncio.sleep(value)
+ print(f'>{arg} done in {value}')
+
+
+async def main():
+ tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
+ done, pending = await asyncio.wait(tasks)
+ print(f'All done. Total waiting time: {total}')
+
+
+asyncio.run(main())
diff --git a/t/task_groups_and_cancel.py b/t/task_groups_and_cancel.py
new file mode 100644
index 0000000..dcc6bbe
--- /dev/null
+++ b/t/task_groups_and_cancel.py
@@ -0,0 +1,35 @@
+import asyncio
+
+
+async def sleep():
+ await asyncio.sleep(3)
+ print('I should never finish!')
+ return 0
+
+
+async def work():
+ i = 0
+ while i < 50:
+ i += 1
+ await asyncio.sleep(0.2)
+ return 0
+
+
+async def explode():
+ await asyncio.sleep(1.5)
+ a = 1 / 0
+ return a
+
+
+async def main():
+ # exploding will bring all other tasks down with it!
+ try:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(sleep())
+ tg.create_task(work())
+ tg.create_task(explode())
+ except:
+ pass
+
+
+asyncio.run(main())
diff --git a/t/test_functionality.py b/t/test_functionality.py
new file mode 100644
index 0000000..5253534
--- /dev/null
+++ b/t/test_functionality.py
@@ -0,0 +1,57 @@
+import utils
+import aiohttp
+import asyncio
+
+
+class BasicUsage(utils.AergiaUnitTestCase):
+
+ def test_asyncless(self):
+ def a():
+ x = 100
+ while x > 0:
+ x -= 1
+
+ self.Aergia.start()
+ a()
+ self.Aergia.stop()
+
+ samples = self.Aergia.get_samples()
+ self.assertFalse(samples)
+
+ def test_sequential_tasks(self):
+ delay = 0.2
+ num_times = 5
+
+ async def b(tot, num):
+ await asyncio.sleep(delay)
+ return tot + num
+
+ async def a():
+ tot = 0
+ for i in range(num_times):
+ tot = await b(tot, i)
+ assert tot == 10
+
+ self.Aergia.start()
+ asyncio.run(a())
+ self.Aergia.stop()
+
+ samples = self.Aergia.get_samples()
+
+ self.assertFuncContains('b', [self.expected_samples(delay * num_times)],
+ samples)
+
+ def test_simultaneous_tasks(self):
+ delay = 0.2
+ async def b(): await asyncio.sleep(delay)
+ async def a(): await asyncio.gather(b(), b(), b())
+
+ self.Aergia.start()
+ asyncio.run(a())
+ self.Aergia.stop()
+
+ samples = self.Aergia.get_samples()
+
+ print(self.expected_samples(delay * 3))
+ self.assertFuncContains('b', [self.expected_samples(delay * 3)],
+ samples)
diff --git a/t/threads.py b/t/threads.py
new file mode 100644
index 0000000..fa9c7ac
--- /dev/null
+++ b/t/threads.py
@@ -0,0 +1,23 @@
+import asyncio
+import threading
+
+
+
+async def count():
+ print("it's a secret!")
+ await asyncio.sleep(3)
+
+
+async def main():
+ await asyncio.gather(count(), count(), count())
+ print("done")
+
+
+def thread_func():
+ asyncio.run(main())
+
+
+if __name__ == "__main__":
+ x = threading.Thread(target=thread_func)
+ x.start()
+ x.join()
diff --git a/t/utils.py b/t/utils.py
new file mode 100644
index 0000000..fe9db26
--- /dev/null
+++ b/t/utils.py
@@ -0,0 +1,34 @@
+from aergia.aergia import Aergia
+import unittest
+
+
+class AergiaUnitTestCase(unittest.TestCase):
+
+ interval = 0.01
+ Aergia = Aergia(interval)
+
+ def setUp(self):
+ self.Aergia.clear()
+
+ def assertFuncContains(self, func_name, samples_expected, samples):
+ samples_actual = self.extract_values_by_func(samples,
+ func_name)
+ self.assertTrue(len(samples_expected) == len(samples_actual))
+ s1 = sorted(samples_expected)
+ s2 = sorted(samples_actual)
+ for v1, v2 in zip(s1, s2):
+ self.assertRoughlyEqual(v1, v2)
+
+ def assertRoughlyEqual(self, v1, v2):
+ print(f'{v1}, {v2}')
+ a = abs(v1 - v2)
+ self.assertTrue(a <= 1)
+
+ def expected_samples(self, total_seconds):
+ return (total_seconds / self.interval)
+
+ def extract_values_by_func(self, samples, func_name):
+ return [
+ value for key, value in samples.items()
+ if key.func == func_name
+ ]