diff --git a/arroyo/backends/kafka/commit.py b/arroyo/backends/kafka/commit.py index 44bd9538..ce7d3df6 100644 --- a/arroyo/backends/kafka/commit.py +++ b/arroyo/backends/kafka/commit.py @@ -97,6 +97,6 @@ def decode_legacy(self, value: KafkaPayload) -> Commit: if max_times_to_log_legacy_message > 0: max_times_to_log_legacy_message -= 1 - logger.warn(f"Legacy commit message found: {commit}") + logger.warning("Legacy commit message found: %s", commit) return commit diff --git a/arroyo/utils/clock.py b/arroyo/utils/clock.py index 976af377..8def194d 100644 --- a/arroyo/utils/clock.py +++ b/arroyo/utils/clock.py @@ -28,7 +28,7 @@ def sleep(self, duration: float) -> None: time.sleep(duration) -class TestingClock(Clock): +class MockedClock(Clock): """ A clock implementation that uses a stable time for testing. To advance the time, use the ``sleep`` method. diff --git a/tests/backends/test_local.py b/tests/backends/test_local.py index 139c4535..de31bb0f 100644 --- a/tests/backends/test_local.py +++ b/tests/backends/test_local.py @@ -18,14 +18,14 @@ ) from arroyo.backends.local.storages.memory import MemoryMessageStorage from arroyo.types import Partition, Topic -from arroyo.utils.clock import TestingClock +from arroyo.utils.clock import MockedClock from tests.backends.mixins import StreamsTestMixin class LocalStreamsTestMixin(StreamsTestMixin[int]): def setUp(self) -> None: self.storage = self.get_message_storage() - self.broker = LocalBroker(self.storage, TestingClock()) + self.broker = LocalBroker(self.storage, MockedClock()) @abstractmethod def get_message_storage(self) -> MessageStorage[int]: diff --git a/tests/conftest.py b/tests/conftest.py index 3188be29..5905fa13 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,7 @@ from arroyo.backends.local.backend import LocalBroker from arroyo.backends.local.storages.memory import MemoryMessageStorage from arroyo.types import TStrategyPayload -from arroyo.utils.clock import TestingClock +from arroyo.utils.clock import MockedClock from arroyo.utils.metrics import configure_metrics from tests.metrics import TestingMetricsBackend @@ -24,7 +24,7 @@ def clear_metrics_state() -> Iterator[None]: @pytest.fixture def broker() -> Iterator[LocalBroker[TStrategyPayload]]: - yield LocalBroker(MemoryMessageStorage(), TestingClock()) + yield LocalBroker(MemoryMessageStorage(), MockedClock()) @pytest.fixture(autouse=True) diff --git a/tests/processing/strategies/test_produce.py b/tests/processing/strategies/test_produce.py index 10ff9db6..74eba7a1 100644 --- a/tests/processing/strategies/test_produce.py +++ b/tests/processing/strategies/test_produce.py @@ -9,13 +9,13 @@ from arroyo.processing.strategies.abstract import MessageRejected from arroyo.processing.strategies.produce import Produce from arroyo.types import Message, Partition, Topic, Value -from arroyo.utils.clock import TestingClock +from arroyo.utils.clock import MockedClock def test_produce() -> None: orig_topic = Topic("orig-topic") result_topic = Topic("result-topic") - clock = TestingClock() + clock = MockedClock() broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage() broker: LocalBroker[KafkaPayload] = LocalBroker(broker_storage, clock) broker.create_topic(result_topic, partitions=1) diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index b669c61c..df0d59d7 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -19,7 +19,7 @@ ProcessingStrategyFactory, ) from arroyo.types import BrokerValue, Commit, Message, Partition, Topic -from arroyo.utils.clock import TestingClock +from arroyo.utils.clock import MockedClock from tests.assertions import assert_changes, assert_does_not_change from tests.metrics import Increment, TestingMetricsBackend, Timing @@ -523,7 +523,7 @@ def test_commit_policy_bench( storage: MessageStorage[int] = MemoryMessageStorage() storage.create_topic(topic, num_partitions) - broker = LocalBroker(storage, TestingClock()) + broker = LocalBroker(storage, MockedClock()) consumer = broker.get_consumer("test-group", enable_end_of_partition=True) diff --git a/tests/utils/test_retries.py b/tests/utils/test_retries.py index cb751945..6c871eb5 100644 --- a/tests/utils/test_retries.py +++ b/tests/utils/test_retries.py @@ -3,7 +3,7 @@ import pytest -from arroyo.utils.clock import TestingClock +from arroyo.utils.clock import MockedClock from arroyo.utils.retries import BasicRetryPolicy, RetryException, constant_delay value = object() @@ -45,7 +45,7 @@ def setup_function() -> None: def test_basic_retry_policy_no_delay() -> None: - clock = TestingClock() + clock = MockedClock() policy = BasicRetryPolicy(3, clock=clock) @@ -68,19 +68,19 @@ def test_basic_retry_policy_no_delay() -> None: @pytest.mark.parametrize("delay", [1, constant_delay(1)]) def test_basic_retry_policy_with_delay(delay: int) -> None: - clock = TestingClock() + clock = MockedClock() policy = BasicRetryPolicy(3, delay, clock=clock) assert policy.call(good_function) is value assert good_function.call_count == 1 assert clock.time() == 0 - clock = TestingClock() + clock = MockedClock() policy = BasicRetryPolicy(3, delay, clock=clock) assert policy.call(flaky_function) is value assert flaky_function.call_count == 2 assert clock.time() == 1 # one retry - clock = TestingClock() + clock = MockedClock() policy = BasicRetryPolicy(3, delay, clock=clock) try: policy.call(bad_function) @@ -109,7 +109,7 @@ def _test_function() -> None: def suppression_test(exception: Exception) -> bool: return isinstance(exception, ExpectedError) - clock = TestingClock() + clock = MockedClock() policy = BasicRetryPolicy( 3, constant_delay(1), suppression_test=suppression_test, clock=clock )