diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 82172e101..6f2e7d731 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -5,6 +5,7 @@ import abc import copy import datetime +import enum import json import typing as t import warnings @@ -67,6 +68,11 @@ REPLICATION_LOG_BASED = "LOG_BASED" +class StreamSyncOutcome(enum.Flag): + SYNCED = 0 + SKIPPED = enum.auto() + + class Stream(metaclass=abc.ABCMeta): # noqa: PLR0904 """Abstract base class for tap streams. @@ -1116,6 +1122,7 @@ def _sync_records( # noqa: C901 # Initialize metrics record_counter = metrics.record_counter(self.name) timer = metrics.sync_timer(self.name) + outcome = StreamSyncOutcome.SKIPPED record_index = 0 context_element: types.Context | None @@ -1165,6 +1172,7 @@ def _sync_records( # noqa: C901 raise if selected: + outcome = StreamSyncOutcome.SYNCED if write_messages: self._write_record_message(record) @@ -1192,6 +1200,8 @@ def _sync_records( # noqa: C901 # Write final state message if we haven't already self._write_state_message() + return StreamSyncOutcome + def _sync_batches( self, batch_config: BatchConfig, diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 398ef49f0..f0ae14cab 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -4,10 +4,10 @@ import abc import contextlib +import enum import pathlib import typing as t import warnings -from enum import Enum import click @@ -42,7 +42,7 @@ STREAM_MAPS_CONFIG = "stream_maps" -class CliTestOptionValue(Enum): +class CliTestOptionValue(enum.Enum): """Values for CLI option --test.""" All = "all" @@ -463,6 +463,7 @@ def sync_all(self) -> None: self.write_message(StateMessage(value=self.state)) stream: Stream + synced_count = 0 for stream in self.streams.values(): if not stream.selected and not stream.has_selected_descendents: self.logger.info("Skipping deselected stream '%s'.", stream.name) @@ -486,6 +487,9 @@ def sync_all(self) -> None: for stream in self.streams.values(): stream.log_sync_costs() + if not synced_count: + self.logger.warning("No streams selected for sync") + # Command Line Execution @classmethod diff --git a/tests/samples/test_tap_countries.py b/tests/samples/test_tap_countries.py index 5730eed96..a6de50028 100644 --- a/tests/samples/test_tap_countries.py +++ b/tests/samples/test_tap_countries.py @@ -5,6 +5,7 @@ import copy import io import json +import logging import typing as t from contextlib import redirect_stdout @@ -52,13 +53,19 @@ def test_countries_primary_key(): ) -def test_with_catalog_mismatch(): +def test_with_catalog_mismatch(caplog: pytest.LogCaptureFixture): """Test catalog apply with no matching stream catalog entries.""" tap = SampleTapCountries(config=None, catalog={"streams": []}) for stream in tap.streams.values(): # All streams should be deselected: assert not stream.selected + with caplog.at_level(logging.WARNING): + tap.sync_all() + + assert len(caplog.records) == 1 + assert caplog.records[0].message == "N" + def test_with_catalog_entry(): """Test catalog apply with a matching stream catalog entry for one stream.""" diff --git a/tests/samples/test_tap_sqlite.py b/tests/samples/test_tap_sqlite.py index a59c4a08e..ace5df46f 100644 --- a/tests/samples/test_tap_sqlite.py +++ b/tests/samples/test_tap_sqlite.py @@ -2,6 +2,7 @@ import datetime import json +import logging import typing as t import pytest @@ -131,6 +132,17 @@ def test_sync_sqlite_to_csv(sqlite_sample_tap: SQLTap, tmp_path: Path): ) +def test_sync_without_catalog( + sqlite_sample_db_config: dict, caplog: pytest.LogCaptureFixture +): + tap = SQLiteTap(config=sqlite_sample_db_config) + with caplog.at_level(logging.WARNING): + tap.sync_all() + + assert len(caplog.records) == 1 + assert caplog.records[0].message == "No streams selected for sync" + + @pytest.fixture @time_machine.travel( datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc),