Merge pull request #522 from nolar/code-overhead-minmax
Compare the timing to code overhead's min/max range instead of max-only
nolar authored Aug 29, 2020
2 parents 49b342b + 17213d7 commit 105fa2f
Showing 2 changed files with 75 additions and 42 deletions.
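In essence, the old fixture returned a single padded estimate (`timer.seconds * 1.33`) and the tests asserted only an upper bound; the new fixture measures ten dummy runs and returns a `CodeOverhead(min, avg, max)` range, so the tests can assert both bounds. A minimal sketch of the before/after assertion, with made-up numbers (the real ones are measured per test session):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CodeOverhead:
    min: float
    avg: float
    max: float

# Made-up values; the conftest fixture in this diff measures the real ones.
overhead = CodeOverhead(min=0.045, avg=0.100, max=0.165)
batch_window = 0.2
measured = 0.250  # hypothetical timer.seconds

# Before: one-sided bound -- an impossibly fast (i.e. broken) run still passed.
assert measured < batch_window + overhead.max

# After: two-sided range -- the run must also take at least the minimal overhead.
assert overhead.min < measured < batch_window + overhead.max
```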
100 changes: 67 additions & 33 deletions tests/reactor/conftest.py
@@ -1,5 +1,6 @@
 import asyncio
-import time
+import dataclasses
+from typing import List
 
 import pytest
 from asynctest import CoroutineMock
@@ -24,7 +25,7 @@ def processor():
 # Code overhead is not used, but is needed to order the fixtures: first,
 # the measurement, which requires the real worker; then, the worker mocking.
 @pytest.fixture()
-def worker_spy(mocker, watcher_code_overhead):
+def worker_spy(mocker, code_overhead):
     """ Spy on the watcher: actually call it, but provide the mock-fields. """
     spy = CoroutineMock(spec=original_worker, wraps=original_worker)
     return mocker.patch('kopf.reactor.queueing.worker', spy)
@@ -33,7 +34,7 @@ def worker_spy(mocker, watcher_code_overhead):
 # Code overhead is not used, but is needed to order the fixtures: first,
 # the measurement, which requires the real worker; then, the worker mocking.
 @pytest.fixture()
-def worker_mock(mocker, watcher_code_overhead):
+def worker_mock(mocker, code_overhead):
     """ Prevent the queue consumption, so that the queues could be checked. """
     return mocker.patch('kopf.reactor.queueing.worker')
 
@@ -75,8 +76,23 @@ async def do_nothing(*args, **kwargs):
     pass
 
 
+@dataclasses.dataclass(frozen=True)
+class CodeOverhead:
+    min: float
+    avg: float
+    max: float
+
+
+@pytest.fixture(scope='session')
+def _code_overhead_cache():
+    return []
+
+
 @pytest.fixture()
-async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float:
+async def code_overhead(
+        resource, stream, aresponses, watcher_limited, timer,
+        _code_overhead_cache,
+) -> CodeOverhead:
     """
     Estimate the overhead of synchronous code in the watching routines.
@@ -110,36 +126,54 @@ async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float:
     Empirically, the overhead usually remains within the range of 50-150 ms.
     It does not depend on the number of events or unique uids in the stream.
     It does depend on the hardware used, or containers in the CI systems.
-    """
-    # We feed the stream and consume the stream before we go into the tests,
-    # which can feed the stream with their own events.
-    stream.feed([
-        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
-    ])
-    stream.close()
-
-    # We use our own fixtures -- to not collide with the tests' fixtures.
-    processor = CoroutineMock()
-    settings = OperatorSettings()
-    settings.batching.batch_window = 0
-    settings.batching.idle_timeout = 1
-    settings.batching.exit_timeout = 1
-
-    with timer:
-        await watcher(
-            namespace=None,
-            resource=resource,
-            settings=settings,
-            processor=processor,
-        )
-
-    # Ensure that everything worked as expected, i.e. the worker is not mocked,
-    # and the whole code is actually executed down to the processor callback.
-    assert processor.awaited, "The processor is not called for code overhead measurement."
-    aresponses._responses[:] = []
+    Several dummy runs are used to average the values, to avoid fluctuation.
+    The estimation happens only once per session, and is reused for all tests.
+    """
+    if not _code_overhead_cache:
+
+        # Collect a few data samples to make the estimation realistic.
+        overheads: List[float] = []
+        for _ in range(10):
+
+            # We feed the stream and consume the stream before we go into the tests,
+            # which can feed the stream with their own events.
+            stream.feed([
+                {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
+            ])
+            stream.close()
+
+            # We use our own fixtures -- to not collide with the tests' fixtures.
+            processor = CoroutineMock()
+            settings = OperatorSettings()
+            settings.batching.batch_window = 0
+            settings.batching.idle_timeout = 1
+            settings.batching.exit_timeout = 1
+
+            with timer:
+                await watcher(
+                    namespace=None,
+                    resource=resource,
+                    settings=settings,
+                    processor=processor,
+                )
+
+            # Ensure that everything worked as expected, i.e. the worker is not mocked,
+            # and the whole code is actually executed down to the processor callback.
+            assert processor.awaited, "The processor is not called for code overhead measurement."
+            overheads.append(timer.seconds)
+
+        # Reserve extra 10-30% from both sides for occasional variations.
+        _code_overhead_cache.append(CodeOverhead(
+            min=min(overheads) * 0.9,
+            avg=sum(overheads) / len(overheads),
+            max=max(overheads) * 1.1,
+        ))
+
+    # Cleanup our own endpoints, if something is left.
+    aresponses._responses[:] = []
 
     # Uncomment for debugging of the actual timing: visible only with -s pytest option.
-    # print(f"The estimated code overhead is {timer.seconds:.3f} seconds (unadjusted).")
+    # print(f"The estimated code overhead is {overhead}.")
 
-    return timer.seconds * 1.33
+    return _code_overhead_cache[0]
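A note on the `_code_overhead_cache` trick above: pytest raises a ScopeMismatch error if a session-scoped fixture depends on function-scoped ones (`stream`, `timer`, `aresponses`), so the measurement stays function-scoped and memoizes its result in a mutable session-scoped list instead. A standalone sketch of that caching pattern, with a hypothetical `compute_once()` standing in for the ten-run measurement loop:

```python
import time

import pytest


@pytest.fixture(scope='session')
def _cache():
    return []  # a mutable container shared across the whole test session


def compute_once() -> float:
    # Hypothetical stand-in for the watcher overhead measurement above.
    started = time.perf_counter()
    sum(i * i for i in range(100_000))
    return time.perf_counter() - started


@pytest.fixture()
def expensive_value(_cache):
    # Function-scoped, so it may depend on other function-scoped fixtures,
    # yet the expensive part runs only once per session.
    if not _cache:
        _cache.append(compute_once())
    return _cache[0]
```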
17 changes: 8 additions & 9 deletions tests/reactor/test_queueing.py
@@ -43,7 +43,7 @@
 @pytest.mark.usefixtures('watcher_limited')
 async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
                                          settings, stream, events, uids, cnts,
-                                         watcher_code_overhead):
+                                         code_overhead):
     """ Verify that every unique uid goes into its own queue+worker, which are never shared. """
 
     # Inject the events of unique objects - to produce few streams/workers.
@@ -60,7 +60,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
     )
 
     # The streams are not cleared by the mocked worker, but the worker exits fast.
-    assert timer.seconds < watcher_code_overhead
+    assert timer.seconds < code_overhead.max
 
     # The processor must not be called by the watcher, only by the worker.
     # But the worker (even if mocked) must be called & awaited by the watcher.
@@ -112,14 +112,13 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
 ])
 @pytest.mark.usefixtures('watcher_limited')
-async def test_watchevent_batching(settings, resource, processor, timer,
-                                   stream, events, uids, vals,
-                                   watcher_code_overhead):
+async def test_watchevent_batching(settings, resource, processor, timer, code_overhead,
+                                   stream, events, uids, vals):
     """ Verify that only the last event per uid is actually handled. """
 
     # Override the default timeouts to make the tests faster.
     settings.batching.idle_timeout = 0.5
-    settings.batching.batch_window = 0.1
+    settings.batching.batch_window = 0.2
     settings.batching.exit_timeout = 0.5
 
     # Inject the events of unique objects - to produce few streams/workers.
@@ -137,7 +136,7 @@ async def test_watchevent_batching(settings, resource, processor, timer,

     # Significantly less than the queue getting timeout, but sufficient to run.
     # 2x: 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough.
-    assert timer.seconds < settings.batching.batch_window + watcher_code_overhead
+    assert code_overhead.min < timer.seconds < settings.batching.batch_window + code_overhead.max
 
     # Was the processor called at all? Awaited as needed for async fns?
     assert processor.awaited
@@ -170,7 +169,7 @@ async def test_watchevent_batching(settings, resource, processor, timer,
 ])
 @pytest.mark.usefixtures('watcher_in_background')
 async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
-                                             watcher_code_overhead):
+                                             code_overhead):
 
     # Override the default timeouts to make the tests faster.
     settings.batching.idle_timeout = 0.5
@@ -200,7 +199,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
     # Once the idle timeout, they will exit and gc their individual streams.
     await asyncio.sleep(settings.batching.batch_window)  # depleting the queues.
     await asyncio.sleep(settings.batching.idle_timeout)  # idling on empty queues.
-    await asyncio.sleep(watcher_code_overhead)
+    await asyncio.sleep(code_overhead.max)
 
     # The mutable(!) streams dict is now empty, i.e. garbage-collected.
     assert len(streams) == 0
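For a feel of the resulting numbers: with the docstring's typical 50-150 ms overhead and the 10% margins applied in conftest.py, ten made-up samples would be adjusted roughly like this:

```python
overheads = [0.05, 0.06, 0.08, 0.10, 0.11, 0.12, 0.12, 0.13, 0.14, 0.15]  # made-up samples

adjusted_min = min(overheads) * 0.9             # 0.045
adjusted_avg = sum(overheads) / len(overheads)  # 0.106
adjusted_max = max(overheads) * 1.1             # ~0.165

# test_watchevent_batching then requires, for its 0.2 s batch window:
#   0.045 < timer.seconds < 0.2 + 0.165
print(adjusted_min, adjusted_avg, adjusted_max)
```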