From 6eaca462aad4f6d887ec3274f7183208b55ed106 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?=
Date: Mon, 2 Dec 2024 17:27:09 -0600
Subject: [PATCH] perf(taps): Improved discovery performance for SQL taps

Reflect primary keys, (optionally) unique constraints and columns for a
whole schema through the Inspector ``get_multi_*`` methods and build the
catalog entries from that metadata, instead of calling
``discover_catalog_entry`` for every table and view.
---
 singer_sdk/connectors/sql.py | 127 ++++++++++++++++++++++++++++++++----
 1 file changed, 113 insertions(+), 14 deletions(-)

diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py
index f829fe897..1b88d5c25 100644
--- a/singer_sdk/connectors/sql.py
+++ b/singer_sdk/connectors/sql.py
@@ -13,6 +13,7 @@
 from functools import lru_cache
 
 import sqlalchemy as sa
+from sqlalchemy.engine import reflection
 
 from singer_sdk import typing as th
 from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
@@ -964,6 +965,87 @@ def discover_catalog_entry(
             replication_key=None,  # Must be defined by user
         )
 
+    def _discover_catalog_entry_from_inspected(
+        self,
+        *,
+        table_name: str,
+        schema_name: str | None,
+        columns: list[reflection.ReflectedColumn],
+        primary_key: reflection.ReflectedPrimaryKeyConstraint | None,
+        unique_constraints: list[reflection.ReflectedUniqueConstraint],
+        is_view: bool = False,
+    ) -> CatalogEntry:
+        """Build a catalog entry from table metadata that was already reflected.
+
+        Key properties are taken from the primary key constraint when one with
+        ``constrained_columns`` is supplied, otherwise from the first UNIQUE
+        constraint, otherwise they are empty.
+
+        Args:
+            table_name: Name of the table or view.
+            schema_name: Name of the schema; if falsy, the stream ID is the table name.
+            columns: Reflected columns of the table or view.
+            primary_key: Reflected primary key constraint, if any.
+            unique_constraints: Reflected UNIQUE constraints.
+            is_view: Whether the object is a view.
+
+        Returns:
+            The catalog entry describing the table or view.
+        """
+        unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name
+
+        # Detect key properties
+        possible_primary_keys: list[list[str]] = []
+        if primary_key and "constrained_columns" in primary_key:
+            possible_primary_keys.append(primary_key["constrained_columns"])
+
+        # Check UNIQUE constraints
+        possible_primary_keys.extend(
+            unique_constraint["column_names"]
+            for unique_constraint in unique_constraints
+        )
+
+        key_properties = next(iter(possible_primary_keys), [])
+
+        # Initialize columns list
+        properties = [
+            th.Property(
+                name=column["name"],
+                wrapped=th.CustomType(self.to_jsonschema_type(column["type"])),
+                nullable=column.get("nullable", False),
+                required=column["name"] in key_properties,
+                description=column.get("comment"),
+            )
+            for column in columns
+        ]
+        schema = th.PropertiesList(*properties).to_dict()
+
+        # Initialize available replication methods
+        addl_replication_methods: list[str] = []  # By default an empty list.
+        replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))
+
+        # Create the catalog entry object
+        return CatalogEntry(
+            tap_stream_id=unique_stream_id,
+            stream=unique_stream_id,
+            table=table_name,
+            key_properties=key_properties,
+            schema=Schema.from_dict(schema),
+            is_view=is_view,
+            replication_method=replication_method,
+            metadata=MetadataMapping.get_standard_metadata(
+                schema_name=schema_name,
+                schema=schema,
+                replication_method=replication_method,
+                key_properties=key_properties,
+                valid_replication_keys=None,  # Must be defined by user
+            ),
+            database=None,  # Expects single-database context
+            row_count=None,
+            stream_alias=None,
+            replication_key=None,  # Must be defined by user
+        )
+
     def discover_catalog_entries(
         self,
         *,
@@ -983,25 +1065,42 @@ def discover_catalog_entries(
         result: list[dict] = []
         engine = self._engine
         inspected = sa.inspect(engine)
+        object_kinds = (
+            (reflection.ObjectKind.TABLE, False),
+            (reflection.ObjectKind.ANY_VIEW, True),
+        )
         for schema_name in self.get_schema_names(engine, inspected):
             if schema_name in exclude_schemas:
                 continue
 
-            # Iterate through each table and view
-            for table_name, is_view in self.get_object_names(
-                engine,
-                inspected,
-                schema_name,
-            ):
-                catalog_entry = self.discover_catalog_entry(
-                    engine,
-                    inspected,
-                    schema_name,
-                    table_name,
-                    is_view,
-                    reflect_indices=reflect_indices,
+            primary_keys = inspected.get_multi_pk_constraint(schema=schema_name)
+
+            if reflect_indices:
+                unique_constraints = inspected.get_multi_unique_constraints(
+                    schema=schema_name,
+                )
+            else:
+                unique_constraints = {}
+
+            # Columns are reflected once per object kind so that every entry
+            # can be flagged as a table or a view.
+            for object_kind, is_view in object_kinds:
+                columns = inspected.get_multi_columns(
+                    schema=schema_name,
+                    kind=object_kind,
+                )
+
+                result.extend(
+                    self._discover_catalog_entry_from_inspected(
+                        table_name=table,
+                        schema_name=schema,
+                        columns=columns[schema, table],
+                        primary_key=primary_keys.get((schema, table)),
+                        unique_constraints=unique_constraints.get((schema, table), []),
+                        is_view=is_view,
+                    ).to_dict()
+                    for schema, table in columns
                 )
-                result.append(catalog_entry.to_dict())
 
         return result
 