From 1a1950c77ded40a6ead9ee7c82ba11a165fd7a95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 17 Jun 2024 16:35:52 +0200 Subject: [PATCH] feat: Allow SQL tap developers to auto-skip certain schemas from discovery --- singer_sdk/connectors/sql.py | 33 ++++++++++++++++++++++++-------- singer_sdk/tap_base.py | 7 ++++++- tests/core/test_connector_sql.py | 26 +++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 540de023e6..f829fe897c 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -881,6 +881,8 @@ def discover_catalog_entry( schema_name: str, table_name: str, is_view: bool, # noqa: FBT001 + *, + reflect_indices: bool = True, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. @@ -890,6 +892,7 @@ def discover_catalog_entry( schema_name: Schema name to inspect table_name: Name of the table or a view is_view: Flag whether this object is a view, returned by `get_object_names` + reflect_indices: Whether to reflect indices Returns: `CatalogEntry` object for the given table or a view @@ -905,11 +908,12 @@ def discover_catalog_entry( # An element of the columns list is ``None`` if it's an expression and is # returned in the ``expressions`` list of the reflected index. 
- possible_primary_keys.extend( - index_def["column_names"] # type: ignore[misc] - for index_def in inspected.get_indexes(table_name, schema=schema_name) - if index_def.get("unique", False) - ) + if reflect_indices: + possible_primary_keys.extend( + index_def["column_names"] # type: ignore[misc] + for index_def in inspected.get_indexes(table_name, schema=schema_name) + if index_def.get("unique", False) + ) key_properties = next(iter(possible_primary_keys), None) @@ -960,9 +964,19 @@ def discover_catalog_entry( replication_key=None, # Must be defined by user ) - def discover_catalog_entries(self) -> list[dict]: + def discover_catalog_entries( + self, + *, + exclude_schemas: t.Sequence[str] = (), + reflect_indices: bool = True, + ) -> list[dict]: """Return a list of catalog entries from discovery. + Args: + exclude_schemas: A list of schema names to exclude from discovery. + reflect_indices: Whether to reflect indices to detect potential primary + keys. + Returns: The discovered catalog entries as a list. """ @@ -970,6 +984,9 @@ def discover_catalog_entries(self) -> list[dict]: engine = self._engine inspected = sa.inspect(engine) for schema_name in self.get_schema_names(engine, inspected): + if schema_name in exclude_schemas: + continue + # Iterate through each table and view for table_name, is_view in self.get_object_names( engine, @@ -982,6 +999,7 @@ def discover_catalog_entries(self) -> list[dict]: schema_name, table_name, is_view, + reflect_indices=reflect_indices, ) result.append(catalog_entry.to_dict()) @@ -1217,8 +1235,7 @@ def prepare_schema(self, schema_name: str) -> None: Args: schema_name: The target schema name. 
""" - schema_exists = self.schema_exists(schema_name) - if not schema_exists: + if not self.schema_exists(schema_name): self.create_schema(schema_name) def prepare_table( diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index cccbe4fd5e..68c9179788 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -657,6 +657,9 @@ class SQLTap(Tap): querying a database's system tables). """ + skip_schemas: t.Sequence[str] = [] + """Hard-coded list of stream names to skip when discovering the catalog.""" + _tap_connector: SQLConnector | None = None def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: @@ -700,7 +703,9 @@ def catalog_dict(self) -> dict: connector = self.tap_connector result: dict[str, list[dict]] = {"streams": []} - result["streams"].extend(connector.discover_catalog_entries()) + result["streams"].extend( + connector.discover_catalog_entries(skip_schemas=self.skip_schemas), + ) self._catalog_dict = result return self._catalog_dict diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index b4452d961f..a8b39521e5 100644 --- a/tests/core/test_connector_sql.py +++ b/tests/core/test_connector_sql.py @@ -351,6 +351,32 @@ def test_adapt_column_type(self, connector: DuckDBConnector): assert result.keys() == ["id", "name"] assert result.cursor.description[1][1] == "STRING" + @pytest.mark.parametrize( + "exclude_schemas,expected_streams", + [ + ([], 1), + (["memory.my_schema"], 0), + ], + ) + def test_discover_catalog_entries_exclude_schemas( + self, + connector: DuckDBConnector, + exclude_schemas: list[str], + expected_streams: int, + ): + with connector._engine.connect() as conn, conn.begin(): + conn.execute(sa.text("CREATE SCHEMA my_schema")) + conn.execute( + sa.text( + "CREATE TABLE my_schema.test_table (id INTEGER PRIMARY KEY, name STRING)", # noqa: E501 + ) + ) + entries = connector.discover_catalog_entries( + exclude_schemas=exclude_schemas, + reflect_indices=False, + ) + assert len(entries) == 
expected_streams + def test_adapter_without_json_serde(): registry.register(