from aergia.aergia import Aergia
import yappi
import time
import unittest


class AergiaUnitTestCase(unittest.TestCase):
    '''Base test case with helpers for checking the timings reported by
    Aergia against an expected value or against yappi's measurements.'''

    # First argument handed to the Aergia constructor below.
    # NOTE(review): presumably the sampling interval in seconds -- confirm
    # against the Aergia constructor.
    interval = 0.01

    # One profiler instance shared by every test; setUp() clears it.
    # NOTE(review): the meaning of the second positional argument (True) is
    # not visible from this file -- confirm against the Aergia constructor.
    Aergia = Aergia(interval, True)

    # yappi is a singleton
    def setUp(self):
        '''Resets the shared Aergia instance and yappi before each test.

        yappi is stopped, its stats are cleared, its clock type is set to
        'wall' and its context-id, context-name and tag callbacks are reset
        to None.'''
        self.Aergia.clear()
        yappi.stop()
        yappi.clear_stats()
        yappi.set_clock_type('wall')
        yappi.set_context_id_callback(None)
        yappi.set_context_name_callback(None)
        yappi.set_tag_callback(None)

    def assert_reasonable_delay(self, func_name, time_expected, samples):
        '''Compares the results reported by Aergia for FUNC_NAME with
        TIME_EXPECTED.

        SAMPLES is an Aergia samples mapping; the test fails if the total
        extracted for FUNC_NAME is not roughly equal to TIME_EXPECTED (see
        assert_roughly_equal).'''
        time_actual = self.aergia_extract_values_by_func(samples, func_name)
        self.assert_roughly_equal(time_actual, time_expected)

    def assert_similar_delay(self, func_name, yappi_stats, aergia_samples):
        '''Compares the results reported by Aergia for FUNC_NAME with
        those reported by yappi.

        Fails the test if YAPPI_STATS has no entry named FUNC_NAME, or if
        the two measurements are not roughly equal (see
        assert_roughly_equal).'''
        time_yappi = self.yappi_extract_values_by_func(yappi_stats, func_name)
        if time_yappi is None:
            # Without this guard the comparison below would die with an
            # opaque TypeError (float - None) instead of a test failure.
            self.fail(f'yappi reported no stats for function {func_name!r}')
        time_aergia = self.aergia_extract_values_by_func(
            aergia_samples, func_name)
        self.assert_roughly_equal(time_aergia, time_yappi)

    def assert_roughly_equal(self, v1, v2, tolerance=.035):
        '''Fails the test if values V1 and V2 are not within TOLERANCE of
        each other.

        The default tolerance is 0.035 seconds. This number is influenced by
        instrumentation overhead, sampling inconsistency, etc.'''
        difference = abs(v1 - v2)
        self.assertLessEqual(
            difference, tolerance,
            f'{v1} (actual) not roughly {v2} (expected)')

    def aergia_extract_values_by_func(self, samples, func_name):
        '''Returns the sum of the values in the Aergia SAMPLES mapping whose
        key has a `func` attribute equal to FUNC_NAME.

        Returns 0 when no key matches. Callers in this class compare the
        result against durations in seconds.'''
        return sum(
            value
            for key, value in samples.items()
            if key.func == func_name
        )

    def yappi_extract_values_by_func(self, stats, func_name):
        '''Returns the `ttot` value of the first entry in the yappi STATS
        object whose name is FUNC_NAME.

        Returns None when no entry matches.'''
        for s in stats:
            if s.name == func_name:
                return s.ttot
        return None

    def yappi_print_traceable_results(self, stats):
        '''Filters the list of yappi stats, printing information for only
        the functions that Aergia would trace.

        This is for manual testing.'''
        print(f"{'FILE':<30} {'FUNC':<30} {'SEC':<10}")
        for s in stats:
            fname = s.module
            # `Aergia` here is the imported class, not the shared instance:
            # class-level names are not visible from inside method bodies.
            if not Aergia._should_trace(fname):
                continue
            # Keep only the tail of long module paths so that the
            # "file:line" column fits in its 30 characters.
            if len(fname) > 25:
                fname = fname[-25:]
            location = f"{fname}:{s.lineno}"
            # Columns are separated by one space, matching the header row.
            print(f"{location:<30} {s.name:<30} {s.ttot:<10.4f}")


def burn_cpu(sec):
    '''Busy-loops until at least SEC seconds have elapsed, as measured by
    time.perf_counter().

    Returns immediately when SEC is zero or negative.'''
    t0 = time.perf_counter()
    elapsed = 0
    while elapsed < sec:
        # Spin through a short empty loop between clock reads so that most
        # of the time is spent executing bytecode rather than reading the
        # clock.
        for _ in range(1000):
            pass
        elapsed = time.perf_counter() - t0


def burn_io(sec):
    '''Blocks the calling thread for SEC seconds using time.sleep().'''
    time.sleep(sec)