Cache code overhead measurements throughout the session for all tests
nolar committed Aug 29, 2020
1 parent 5c52d3a commit 17213d7
Showing 1 changed file with 55 additions and 43 deletions.
tests/reactor/conftest.py — 98 changes: 55 additions & 43 deletions
@@ -83,8 +83,16 @@ class CodeOverhead:
max: float


@pytest.fixture(scope='session')
def _code_overhead_cache():
return []


@pytest.fixture()
async def code_overhead(resource, stream, aresponses, watcher_limited, timer) -> CodeOverhead:
async def code_overhead(
resource, stream, aresponses, watcher_limited, timer,
_code_overhead_cache,
) -> CodeOverhead:
"""
Estimate the overhead of synchronous code in the watching routines.
@@ -118,50 +126,54 @@ async def code_overhead(resource, stream, aresponses, watcher_limited, timer) ->
Empirically, the overhead usually remains within the range of 50-150 ms.
It does not depend on the number of events or unique uids in the stream.
It does depend on the hardware used, or containers in the CI systems.
"""
# Collect a few data samples to make the estimation realistic.
overheads: List[float] = []
for _ in range(10):

# We feed the stream and consume the stream before we go into the tests,
# which can feed the stream with their own events.
stream.feed([
{'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
])
stream.close()

# We use our own fixtures -- to not collide with the tests' fixtures.
processor = CoroutineMock()
settings = OperatorSettings()
settings.batching.batch_window = 0
settings.batching.idle_timeout = 1
settings.batching.exit_timeout = 1

with timer:
await watcher(
namespace=None,
resource=resource,
settings=settings,
processor=processor,
)

# Ensure that everything worked as expected, i.e. the worker is not mocked,
# and the whole code is actually executed down to the processor callback.
assert processor.awaited, "The processor is not called for code overhead measurement."
overheads.append(timer.seconds)

# Reserve extra 10-30% from both sides for occasional variations.
overhead = CodeOverhead(
min=min(overheads) * 0.9,
avg=sum(overheads) / len(overheads),
max=max(overheads) * 1.1,
)

# Cleanup our own endpoints, if something is left.
aresponses._responses[:] = []
Several dummy runs are used to average the values, to avoid fluctuation.
The estimation happens only once per session, and is reused for all tests.
"""
if not _code_overhead_cache:

# Collect a few data samples to make the estimation realistic.
overheads: List[float] = []
for _ in range(10):

# We feed the stream and consume the stream before we go into the tests,
# which can feed the stream with their own events.
stream.feed([
{'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
])
stream.close()

# We use our own fixtures -- to not collide with the tests' fixtures.
processor = CoroutineMock()
settings = OperatorSettings()
settings.batching.batch_window = 0
settings.batching.idle_timeout = 1
settings.batching.exit_timeout = 1

with timer:
await watcher(
namespace=None,
resource=resource,
settings=settings,
processor=processor,
)

# Ensure that everything worked as expected, i.e. the worker is not mocked,
# and the whole code is actually executed down to the processor callback.
assert processor.awaited, "The processor is not called for code overhead measurement."
overheads.append(timer.seconds)

# Reserve extra 10-30% from both sides for occasional variations.
_code_overhead_cache.append(CodeOverhead(
min=min(overheads) * 0.9,
avg=sum(overheads) / len(overheads),
max=max(overheads) * 1.1,
))

# Cleanup our own endpoints, if something is left.
aresponses._responses[:] = []

# Uncomment for debugging of the actual timing: visible only with -s pytest option.
# print(f"The estimated code overhead is {overhead}.")

return overhead
return _code_overhead_cache[0]
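
For readers unfamiliar with the pattern above: the commit caches the measurement in a session-scoped list that the function-scoped fixture fills lazily on its first use and then only reads. A minimal self-contained sketch of that pattern, with hypothetical names and a stand-in workload instead of the real watcher run, could look like this:

    import time
    from typing import List

    import pytest


    @pytest.fixture(scope='session')
    def _measurement_cache() -> List[float]:
        # A list serves as a mutable container: a session-scoped fixture value
        # cannot be reassigned from a function-scoped fixture, only mutated.
        return []


    @pytest.fixture()
    def measured_overhead(_measurement_cache) -> float:
        # Measure only on the first request in the session; reuse afterwards.
        if not _measurement_cache:
            samples: List[float] = []
            for _ in range(10):
                start = time.perf_counter()
                sum(range(100_000))  # stand-in for the timed watcher run
                samples.append(time.perf_counter() - start)
            _measurement_cache.append(sum(samples) / len(samples))
        return _measurement_cache[0]

A single session-scoped `code_overhead` fixture would likely not work here, because its dependencies (resource, stream, aresponses, watcher_limited, timer) are function-scoped and pytest rejects such scope mismatches; keeping the cache in a separate session-scoped list sidesteps that.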

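Downstream, tests would presumably use the estimated overhead to pad their timing assertions. A hypothetical usage example (not part of this commit), assuming the suite's async test support and the `timer` fixture shown above:

    import asyncio


    async def test_timing_with_overhead_allowance(timer, code_overhead):
        # Hypothetical: time a short sleep and allow for the measured code
        # overhead on top, so the assertion does not flake on slow CI machines.
        with timer:
            await asyncio.sleep(0.2)
        assert 0.2 <= timer.seconds < 0.2 + code_overhead.max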