Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix a few warnings #408

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arroyo/backends/kafka/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ def decode_legacy(self, value: KafkaPayload) -> Commit:

if max_times_to_log_legacy_message > 0:
max_times_to_log_legacy_message -= 1
logger.warn(f"Legacy commit message found: {commit}")
logger.warning("Legacy commit message found: %s", commit)

return commit
2 changes: 1 addition & 1 deletion arroyo/utils/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def sleep(self, duration: float) -> None:
time.sleep(duration)


class TestingClock(Clock):
class MockedClock(Clock):
"""
A clock implementation that uses a stable time for testing. To advance
the time, use the ``sleep`` method.
Expand Down
4 changes: 2 additions & 2 deletions tests/backends/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
)
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.types import Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from tests.backends.mixins import StreamsTestMixin


class LocalStreamsTestMixin(StreamsTestMixin[int]):
def setUp(self) -> None:
self.storage = self.get_message_storage()
self.broker = LocalBroker(self.storage, TestingClock())
self.broker = LocalBroker(self.storage, MockedClock())

@abstractmethod
def get_message_storage(self) -> MessageStorage[int]:
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from arroyo.backends.local.backend import LocalBroker
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.types import TStrategyPayload
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from arroyo.utils.metrics import configure_metrics
from tests.metrics import TestingMetricsBackend

Expand All @@ -24,7 +24,7 @@ def clear_metrics_state() -> Iterator[None]:

@pytest.fixture
def broker() -> Iterator[LocalBroker[TStrategyPayload]]:
yield LocalBroker(MemoryMessageStorage(), TestingClock())
yield LocalBroker(MemoryMessageStorage(), MockedClock())


@pytest.fixture(autouse=True)
Expand Down
4 changes: 2 additions & 2 deletions tests/processing/strategies/test_produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from arroyo.processing.strategies.abstract import MessageRejected
from arroyo.processing.strategies.produce import Produce
from arroyo.types import Message, Partition, Topic, Value
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock


def test_produce() -> None:
orig_topic = Topic("orig-topic")
result_topic = Topic("result-topic")
clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: LocalBroker[KafkaPayload] = LocalBroker(broker_storage, clock)
broker.create_topic(result_topic, partitions=1)
Expand Down
4 changes: 2 additions & 2 deletions tests/processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
ProcessingStrategyFactory,
)
from arroyo.types import BrokerValue, Commit, Message, Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from tests.assertions import assert_changes, assert_does_not_change
from tests.metrics import Increment, TestingMetricsBackend, Timing

Expand Down Expand Up @@ -523,7 +523,7 @@ def test_commit_policy_bench(
storage: MessageStorage[int] = MemoryMessageStorage()
storage.create_topic(topic, num_partitions)

broker = LocalBroker(storage, TestingClock())
broker = LocalBroker(storage, MockedClock())

consumer = broker.get_consumer("test-group", enable_end_of_partition=True)

Expand Down
12 changes: 6 additions & 6 deletions tests/utils/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from arroyo.utils.retries import BasicRetryPolicy, RetryException, constant_delay

value = object()
Expand Down Expand Up @@ -45,7 +45,7 @@ def setup_function() -> None:

def test_basic_retry_policy_no_delay() -> None:

clock = TestingClock()
clock = MockedClock()

policy = BasicRetryPolicy(3, clock=clock)

Expand All @@ -68,19 +68,19 @@ def test_basic_retry_policy_no_delay() -> None:

@pytest.mark.parametrize("delay", [1, constant_delay(1)])
def test_basic_retry_policy_with_delay(delay: int) -> None:
clock = TestingClock()
clock = MockedClock()
policy = BasicRetryPolicy(3, delay, clock=clock)
assert policy.call(good_function) is value
assert good_function.call_count == 1
assert clock.time() == 0

clock = TestingClock()
clock = MockedClock()
policy = BasicRetryPolicy(3, delay, clock=clock)
assert policy.call(flaky_function) is value
assert flaky_function.call_count == 2
assert clock.time() == 1 # one retry

clock = TestingClock()
clock = MockedClock()
policy = BasicRetryPolicy(3, delay, clock=clock)
try:
policy.call(bad_function)
Expand Down Expand Up @@ -109,7 +109,7 @@ def _test_function() -> None:
def suppression_test(exception: Exception) -> bool:
return isinstance(exception, ExpectedError)

clock = TestingClock()
clock = MockedClock()
policy = BasicRetryPolicy(
3, constant_delay(1), suppression_test=suppression_test, clock=clock
)
Expand Down
Loading