Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: A warning is now emitted when no streams are selected #2791

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import abc
import copy
import datetime
import enum
import json
import typing as t
import warnings
Expand Down Expand Up @@ -67,6 +68,11 @@
REPLICATION_LOG_BASED = "LOG_BASED"


class StreamSyncOutcome(enum.Flag):
SYNCED = 0
SKIPPED = enum.auto()


class Stream(metaclass=abc.ABCMeta): # noqa: PLR0904
"""Abstract base class for tap streams.

Expand Down Expand Up @@ -1116,6 +1122,7 @@ def _sync_records( # noqa: C901
# Initialize metrics
record_counter = metrics.record_counter(self.name)
timer = metrics.sync_timer(self.name)
outcome = StreamSyncOutcome.SKIPPED

record_index = 0
context_element: types.Context | None
Expand Down Expand Up @@ -1165,6 +1172,7 @@ def _sync_records( # noqa: C901
raise

if selected:
outcome = StreamSyncOutcome.SYNCED
if write_messages:
self._write_record_message(record)

Expand Down Expand Up @@ -1192,6 +1200,8 @@ def _sync_records( # noqa: C901
# Write final state message if we haven't already
self._write_state_message()

return StreamSyncOutcome

def _sync_batches(
self,
batch_config: BatchConfig,
Expand Down
8 changes: 6 additions & 2 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

import abc
import contextlib
import enum
import pathlib
import typing as t
import warnings
from enum import Enum

import click

Expand Down Expand Up @@ -42,7 +42,7 @@
STREAM_MAPS_CONFIG = "stream_maps"


class CliTestOptionValue(Enum):
class CliTestOptionValue(enum.Enum):
"""Values for CLI option --test."""

All = "all"
Expand Down Expand Up @@ -463,6 +463,7 @@ def sync_all(self) -> None:
self.write_message(StateMessage(value=self.state))

stream: Stream
synced_count = 0
for stream in self.streams.values():
if not stream.selected and not stream.has_selected_descendents:
self.logger.info("Skipping deselected stream '%s'.", stream.name)
Expand All @@ -486,6 +487,9 @@ def sync_all(self) -> None:
for stream in self.streams.values():
stream.log_sync_costs()

if not synced_count:
self.logger.warning("No streams selected for sync")

# Command Line Execution

@classmethod
Expand Down
9 changes: 8 additions & 1 deletion tests/samples/test_tap_countries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import copy
import io
import json
import logging
import typing as t
from contextlib import redirect_stdout

Expand Down Expand Up @@ -52,13 +53,19 @@ def test_countries_primary_key():
)


def test_with_catalog_mismatch():
def test_with_catalog_mismatch(caplog: pytest.LogCaptureFixture):
"""Test catalog apply with no matching stream catalog entries."""
tap = SampleTapCountries(config=None, catalog={"streams": []})
for stream in tap.streams.values():
# All streams should be deselected:
assert not stream.selected

with caplog.at_level(logging.WARNING):
tap.sync_all()

assert len(caplog.records) == 1
assert caplog.records[0].message == "N"


def test_with_catalog_entry():
"""Test catalog apply with a matching stream catalog entry for one stream."""
Expand Down
12 changes: 12 additions & 0 deletions tests/samples/test_tap_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import datetime
import json
import logging
import typing as t

import pytest
Expand Down Expand Up @@ -131,6 +132,17 @@ def test_sync_sqlite_to_csv(sqlite_sample_tap: SQLTap, tmp_path: Path):
)


def test_sync_without_catalog(
sqlite_sample_db_config: dict, caplog: pytest.LogCaptureFixture
):
tap = SQLiteTap(config=sqlite_sample_db_config)
with caplog.at_level(logging.WARNING):
tap.sync_all()

assert len(caplog.records) == 1
assert caplog.records[0].message == "No streams selected for sync"


@pytest.fixture
@time_machine.travel(
datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc),
Expand Down
Loading