1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
import utils
import aiohttp
import asyncio
class BasicUsage(utils.AergiaUnitTestCase):
def test_asyncless(self):
def a():
x = 100
while x > 0:
x -= 1
self.Aergia.start()
a()
self.Aergia.stop()
samples = self.Aergia.get_samples()
self.assertFalse(samples)
def test_sequential_tasks(self):
delay = 0.2
num_times = 5
async def b(tot, num):
await asyncio.sleep(delay)
return tot + num
async def a():
tot = 0
for i in range(num_times):
tot = await b(tot, i)
assert tot == 10
self.Aergia.start()
asyncio.run(a())
self.Aergia.stop()
samples = self.Aergia.get_samples()
self.assertFuncContains('b', [self.expected_samples(delay * num_times)],
samples)
def test_simultaneous_tasks(self):
delay = 0.2
async def b(): await asyncio.sleep(delay)
async def a(): await asyncio.gather(b(), b(), b())
self.Aergia.start()
asyncio.run(a())
self.Aergia.stop()
samples = self.Aergia.get_samples()
self.assertFuncContains('b', [self.expected_samples(delay * 3)],
samples)
# TODO samples from gather all execution time, should we trace this??
self.assertFuncContains('a', [self.expected_samples(delay)], samples)
def test_eager_task(self):
delay = 0.2
async def a():
proc = await asyncio.create_subprocess_shell(f'sleep {delay}')
await proc.communicate()
self.Aergia.start()
asyncio.run(a())
self.Aergia.stop()
samples = self.Aergia.get_samples()
self.assertFuncContains('a', [self.expected_samples(delay)], samples)
|