1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
import yappi
import utils
import asyncio
import threading
# A test file containing tests adapted from yappi's own test suite.
class YappiTests(utils.AergiaUnitTestCase):
def test_asyncio_recursion_yappi(self):
delay = 0.1
async def a(n):
if n <= 0:
return
await asyncio.sleep(delay)
utils.burn_cpu(0.1)
await a(n - 1)
await a(n - 2)
yappi.start()
self.Aergia.start()
asyncio.run(a(3))
self.Aergia.stop()
yappi.stop()
yappi_samples = yappi.get_func_stats()
aergia_samples = self.Aergia.get_samples()
self.assert_reasonable_delay('a', delay * 4, aergia_samples)
# TODO revisit, I think Aergia was agreeable when I checked this manually
# possible function bias
# self.assert_similar_delay('a', yappi_samples, aergia_samples)
def test_basic_multithread(self):
delay = 0.1
num_times = 5
async def a():
await asyncio.sleep(delay)
async def b():
await a()
async def recursive_a(n):
if not n:
return
await asyncio.sleep(delay)
await recursive_a(n - 1)
def tag_cbk():
cthread = threading.current_thread()
try:
return cthread._tag
except:
return -1
threading.current_thread()._tag = 0
yappi.set_tag_callback(tag_cbk)
def _thread_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
_TCOUNT = 3
_ctag = 1
ts = []
for i in range(_TCOUNT):
_loop = asyncio.new_event_loop()
t = threading.Thread(target=_thread_event_loop, args=(_loop, ))
t._tag = _ctag
t._loop = _loop
t.start()
ts.append(t)
_ctag += 1
async def stop_loop():
asyncio.get_event_loop().stop()
async def driver():
futs = []
fut = asyncio.run_coroutine_threadsafe(a(), ts[0]._loop)
futs.append(fut)
fut = asyncio.run_coroutine_threadsafe(recursive_a(num_times), ts[1]._loop)
futs.append(fut)
fut = asyncio.run_coroutine_threadsafe(b(), ts[2]._loop)
futs.append(fut)
for fut in futs:
fut.result()
for t in ts:
asyncio.run_coroutine_threadsafe(stop_loop(), t._loop)
yappi.start()
self.Aergia.start()
asyncio.run(driver())
self.Aergia.stop()
yappi.stop()
yappi_samples = yappi.get_func_stats()
aergia_samples = self.Aergia.get_samples()
self.assert_reasonable_delay('a', delay * 2, aergia_samples)
self.assert_reasonable_delay('b', 0, aergia_samples)
self.assert_reasonable_delay('recursive_a',
delay * num_times,
aergia_samples)
self.assert_similar_delay('a', yappi_samples, aergia_samples)
self.assert_similar_delay('recursive_a', yappi_samples, aergia_samples)
# Aergia only assigns time to the current line when the task suspends
# This should fail.
# self.assert_similar_delay('b', yappi_samples, aergia_samples)
|