Compare the timing to code overhead's min/max range instead of max-only #522

Merged 3 commits on Aug 29, 2020
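The gist of the change: the conftest fixture used to return a single float (one measured overhead scaled by 1.33), and tests compared elapsed times against it as an upper bound only. After this PR, the fixture returns a frozen dataclass with a min/avg/max band estimated from several runs, and tests check timings from both sides. A minimal self-contained sketch of the before/after comparison, with hypothetical sample numbers (not taken from the PR):

    import dataclasses

    @dataclasses.dataclass(frozen=True)
    class OverheadBand:
        # Illustrative stand-in for the PR's CodeOverhead dataclass.
        min: float
        avg: float
        max: float

    measured = 0.10   # hypothetical single-run overhead, in seconds
    elapsed = 0.12    # hypothetical elapsed time observed in a test

    # Before: one scaled number, checked as an upper bound only.
    assert elapsed < measured * 1.33

    # After: a band from several runs, widened by 10% per side, checked from both ends.
    band = OverheadBand(min=0.08 * 0.9, avg=0.10, max=0.12 * 1.1)
    assert band.min < elapsed < band.max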
100 changes: 67 additions & 33 deletions tests/reactor/conftest.py
@@ -1,5 +1,6 @@
 import asyncio
-import time
+import dataclasses
+from typing import List
 
 import pytest
 from asynctest import CoroutineMock
@@ -24,7 +25,7 @@ def processor():
 # Code overhead is not used, but is needed to order the fixtures: first,
 # the measurement, which requires the real worker; then, the worker mocking.
 @pytest.fixture()
-def worker_spy(mocker, watcher_code_overhead):
+def worker_spy(mocker, code_overhead):
     """ Spy on the watcher: actually call it, but provide the mock-fields. """
     spy = CoroutineMock(spec=original_worker, wraps=original_worker)
     return mocker.patch('kopf.reactor.queueing.worker', spy)
@@ -33,7 +34,7 @@ def worker_spy(mocker, watcher_code_overhead):
 # Code overhead is not used, but is needed to order the fixtures: first,
 # the measurement, which requires the real worker; then, the worker mocking.
 @pytest.fixture()
-def worker_mock(mocker, watcher_code_overhead):
+def worker_mock(mocker, code_overhead):
     """ Prevent the queue consumption, so that the queues could be checked. """
     return mocker.patch('kopf.reactor.queueing.worker')

@@ -75,8 +76,23 @@ async def do_nothing(*args, **kwargs):
     pass
 
 
+@dataclasses.dataclass(frozen=True)
+class CodeOverhead:
+    min: float
+    avg: float
+    max: float
+
+
+@pytest.fixture(scope='session')
+def _code_overhead_cache():
+    return []
+
+
 @pytest.fixture()
-async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float:
+async def code_overhead(
+        resource, stream, aresponses, watcher_limited, timer,
+        _code_overhead_cache,
+) -> CodeOverhead:
     """
     Estimate the overhead of synchronous code in the watching routines.
 
@@ -110,36 +126,54 @@ async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float:
     Empirically, the overhead usually remains within the range of 50-150 ms.
     It does not depend on the number of events or unique uids in the stream.
     It does depend on the hardware used, or containers in the CI systems.
-    """
-
-    # We feed the stream and consume the stream before we go into the tests,
-    # which can feed the stream with their own events.
-    stream.feed([
-        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
-    ])
-    stream.close()
-
-    # We use our own fixtures -- to not collide with the tests' fixtures.
-    processor = CoroutineMock()
-    settings = OperatorSettings()
-    settings.batching.batch_window = 0
-    settings.batching.idle_timeout = 1
-    settings.batching.exit_timeout = 1
-
-    with timer:
-        await watcher(
-            namespace=None,
-            resource=resource,
-            settings=settings,
-            processor=processor,
-        )
-
-    # Ensure that everything worked as expected, i.e. the worker is not mocked,
-    # and the whole code is actually executed down to the processor callback.
-    assert processor.awaited, "The processor is not called for code overhead measurement."
-    aresponses._responses[:] = []
+    Several dummy runs are used to average the values, to avoid fluctuation.
+    The estimation happens only once per session, and is reused for all tests.
+    """
+    if not _code_overhead_cache:
+
+        # Collect a few data samples to make the estimation realistic.
+        overheads: List[float] = []
+        for _ in range(10):
+
+            # We feed the stream and consume the stream before we go into the tests,
+            # which can feed the stream with their own events.
+            stream.feed([
+                {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
+            ])
+            stream.close()
+
+            # We use our own fixtures -- to not collide with the tests' fixtures.
+            processor = CoroutineMock()
+            settings = OperatorSettings()
+            settings.batching.batch_window = 0
+            settings.batching.idle_timeout = 1
+            settings.batching.exit_timeout = 1
+
+            with timer:
+                await watcher(
+                    namespace=None,
+                    resource=resource,
+                    settings=settings,
+                    processor=processor,
+                )
+
+            # Ensure that everything worked as expected, i.e. the worker is not mocked,
+            # and the whole code is actually executed down to the processor callback.
+            assert processor.awaited, "The processor is not called for code overhead measurement."
+            overheads.append(timer.seconds)
+
+        # Reserve extra 10-30% from both sides for occasional variations.
+        _code_overhead_cache.append(CodeOverhead(
+            min=min(overheads) * 0.9,
+            avg=sum(overheads) / len(overheads),
+            max=max(overheads) * 1.1,
+        ))
+
+    # Cleanup our own endpoints, if something is left.
+    aresponses._responses[:] = []
 
     # Uncomment for debugging of the actual timing: visible only with -s pytest option.
-    # print(f"The estimated code overhead is {timer.seconds:.3f} seconds (unadjusted).")
+    # print(f"The estimated code overhead is {overhead}.")
 
-    return timer.seconds * 1.33
+    return _code_overhead_cache[0]
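Aside from the range itself, the fixture introduces a session-level cache built from plain pytest pieces: a session-scoped fixture returns a mutable list, and the function-scoped fixture populates it on first use, so the ten dummy runs execute only once per test session. A minimal sketch of that caching pattern, assuming a hypothetical expensive_measurement() in place of the real watcher runs:

    import random
    import pytest

    def expensive_measurement() -> float:
        # Stand-in for one dummy watcher run; returns a fake timing in seconds.
        return random.uniform(0.08, 0.12)

    @pytest.fixture(scope='session')
    def _cache():
        return []  # a session-lived mutable container, empty until first use

    @pytest.fixture()
    def estimate(_cache):
        if not _cache:  # only the first test that uses this fixture pays the cost
            samples = [expensive_measurement() for _ in range(10)]
            _cache.append(sum(samples) / len(samples))
        return _cache[0]  # all later tests reuse the cached estimate

    def test_uses_cached_estimate(estimate):
        assert 0.0 < estimate < 1.0

The list works as a cache because session-scoped fixture values persist across tests while remaining mutable from function scope.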
17 changes: 8 additions & 9 deletions tests/reactor/test_queueing.py
@@ -43,7 +43,7 @@
 @pytest.mark.usefixtures('watcher_limited')
 async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
                                          settings, stream, events, uids, cnts,
-                                         watcher_code_overhead):
+                                         code_overhead):
     """ Verify that every unique uid goes into its own queue+worker, which are never shared. """
 
     # Inject the events of unique objects - to produce few streams/workers.
# Inject the events of unique objects - to produce few streams/workers.
@@ -60,7 +60,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
     )
 
     # The streams are not cleared by the mocked worker, but the worker exits fast.
-    assert timer.seconds < watcher_code_overhead
+    assert timer.seconds < code_overhead.max
 
     # The processor must not be called by the watcher, only by the worker.
     # But the worker (even if mocked) must be called & awaited by the watcher.
@@ -112,14 +112,13 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
 
 ])
 @pytest.mark.usefixtures('watcher_limited')
-async def test_watchevent_batching(settings, resource, processor, timer,
-                                   stream, events, uids, vals,
-                                   watcher_code_overhead):
+async def test_watchevent_batching(settings, resource, processor, timer, code_overhead,
+                                   stream, events, uids, vals):
     """ Verify that only the last event per uid is actually handled. """
 
     # Override the default timeouts to make the tests faster.
     settings.batching.idle_timeout = 0.5
-    settings.batching.batch_window = 0.1
+    settings.batching.batch_window = 0.2
     settings.batching.exit_timeout = 0.5
 
     # Inject the events of unique objects - to produce few streams/workers.
@@ -137,7 +136,7 @@ async def test_watchevent_batching(settings, resource, processor, timer, code_overhead,
 
     # Significantly less than the queue getting timeout, but sufficient to run.
     # 2x: 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough.
-    assert timer.seconds < settings.batching.batch_window + watcher_code_overhead
+    assert code_overhead.min < timer.seconds < settings.batching.batch_window + code_overhead.max
 
     # Was the processor called at all? Awaited as needed for async fns?
     assert processor.awaited
@@ -170,7 +169,7 @@
 ])
 @pytest.mark.usefixtures('watcher_in_background')
 async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
-                                             watcher_code_overhead):
+                                             code_overhead):
 
     # Override the default timeouts to make the tests faster.
     settings.batching.idle_timeout = 0.5
@@ -200,7 +199,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
     # Once the idle timeout, they will exit and gc their individual streams.
     await asyncio.sleep(settings.batching.batch_window)  # depleting the queues.
     await asyncio.sleep(settings.batching.idle_timeout)  # idling on empty queues.
-    await asyncio.sleep(watcher_code_overhead)
+    await asyncio.sleep(code_overhead.max)
 
     # The mutable(!) streams dict is now empty, i.e. garbage-collected.
     assert len(streams) == 0
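Beyond the assertions, the band's upper bound also serves as a wait budget in test_garbage_collection_of_streams: sleeping for code_overhead.max gives the workers worst-case time to exit before the streams dict is inspected. A rough standalone sketch of that idea, with a dummy task standing in for the workers (names and timings are illustrative, not the test suite's actual fixtures):

    import asyncio

    async def main() -> None:
        overhead_max = 0.11  # hypothetical code_overhead.max, in seconds
        streams = {'uid1': object()}  # stands in for the watcher's streams dict

        async def worker() -> None:
            await asyncio.sleep(0.05)  # simulated idle timeout before exiting
            streams.clear()            # the worker gc's its own stream on exit

        task = asyncio.create_task(worker())
        # Sleep past the idle timeout plus the worst-case code overhead,
        # so the worker has surely finished by the time we assert.
        await asyncio.sleep(0.05 + overhead_max)
        assert len(streams) == 0
        await task

    asyncio.run(main())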