From 6eaca462aad4f6d887ec3274f7183208b55ed106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 2 Dec 2024 17:27:09 -0600 Subject: [PATCH 1/8] perf(taps): Improved discovery performance for SQL taps --- singer_sdk/connectors/sql.py | 109 ++++++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 14 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index f829fe897..1b88d5c25 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -13,6 +13,7 @@ from functools import lru_cache import sqlalchemy as sa +from sqlalchemy.engine import reflection from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema @@ -964,6 +965,70 @@ def discover_catalog_entry( replication_key=None, # Must be defined by user ) + def _discover_catalog_entry_from_inspected( + self, + *, + table_name: str, + schema_name: str | None, + columns: list[reflection.ReflectedColumn], + primary_key: reflection.ReflectedPrimaryKeyConstraint | None, + unique_constraints: list[reflection.ReflectedUniqueConstraint], + is_view: bool = False, + ) -> CatalogEntry: + unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name + + # Detect key properties + possible_primary_keys: list[list[str]] = [] + if primary_key and "constrained_columns" in primary_key: + possible_primary_keys.append(primary_key["constrained_columns"]) + + # Check UNIQUE constraints + possible_primary_keys.extend( + unique_constraint["column_names"] + for unique_constraint in unique_constraints + ) + + key_properties = next(iter(possible_primary_keys), []) + + # Initialize columns list + properties = [ + th.Property( + name=column["name"], + wrapped=th.CustomType(self.to_jsonschema_type(column["type"])), + nullable=column.get("nullable", False), + required=column["name"] in key_properties, + description=column.get("comment"), + ) + for column in columns + ] + schema = th.PropertiesList(*properties).to_dict() + + # Initialize available replication methods + addl_replication_methods: list[str] = [] # By default an empty list. 
+ replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) + + # Create the catalog entry object + return CatalogEntry( + tap_stream_id=unique_stream_id, + stream=unique_stream_id, + table=table_name, + key_properties=key_properties, + schema=Schema.from_dict(schema), + is_view=is_view, + replication_method=replication_method, + metadata=MetadataMapping.get_standard_metadata( + schema_name=schema_name, + schema=schema, + replication_method=replication_method, + key_properties=key_properties, + valid_replication_keys=None, # Must be defined by user + ), + database=None, # Expects single-database context + row_count=None, + stream_alias=None, + replication_key=None, # Must be defined by user + ) + def discover_catalog_entries( self, *, @@ -983,25 +1048,41 @@ def discover_catalog_entries( result: list[dict] = [] engine = self._engine inspected = sa.inspect(engine) + object_kinds = ( + (reflection.ObjectKind.TABLE, False), + (reflection.ObjectKind.ANY_VIEW, True), + ) for schema_name in self.get_schema_names(engine, inspected): if schema_name in exclude_schemas: continue - # Iterate through each table and view - for table_name, is_view in self.get_object_names( - engine, - inspected, - schema_name, - ): - catalog_entry = self.discover_catalog_entry( - engine, - inspected, - schema_name, - table_name, - is_view, - reflect_indices=reflect_indices, + columns = inspected.get_multi_columns(schema=schema_name) + primary_keys = inspected.get_multi_pk_constraint(schema=schema_name) + + if reflect_indices: + unique_constraints = inspected.get_multi_unique_constraints( + schema=schema_name, + ) + else: + unique_constraints = {} + + for object_kind, is_view in object_kinds: + columns = inspected.get_multi_columns( + schema=schema_name, + kind=object_kind, + ) + + result.extend( + self._discover_catalog_entry_from_inspected( + table_name=table, + schema_name=schema, + columns=columns[schema, table], + primary_key=primary_keys.get((schema, table)), + unique_constraints=unique_constraints.get((schema, table), []), + is_view=is_view, + ).to_dict() + for schema, table in columns ) - result.append(catalog_entry.to_dict()) return result From 1bf62d92b648e8e5ee3c171c8a71d243cd87eaa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 10:40:19 -0600 Subject: [PATCH 2/8] Discover constraints scoped by object type --- singer_sdk/connectors/sql.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 1b88d5c25..c5f79ea93 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1056,21 +1056,22 @@ def discover_catalog_entries( if schema_name in exclude_schemas: continue - columns = inspected.get_multi_columns(schema=schema_name) - primary_keys = inspected.get_multi_pk_constraint(schema=schema_name) - - if reflect_indices: - unique_constraints = inspected.get_multi_unique_constraints( - schema=schema_name, - ) - else: - unique_constraints = {} - for object_kind, is_view in object_kinds: columns = inspected.get_multi_columns( schema=schema_name, kind=object_kind, ) + primary_keys = inspected.get_multi_pk_constraint( + schema=schema_name, kind=object_kind + ) + + if reflect_indices: + unique_constraints = inspected.get_multi_unique_constraints( + schema=schema_name, + kind=object_kind, + ) + else: + unique_constraints = {} result.extend( self._discover_catalog_entry_from_inspected( From 
d29108485a35578610efab452b3eb936acd3a168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 10:51:32 -0600 Subject: [PATCH 3/8] Inspect only once per schema --- singer_sdk/connectors/sql.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index c5f79ea93..eee54c195 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1056,22 +1056,18 @@ def discover_catalog_entries( if schema_name in exclude_schemas: continue + primary_keys = inspected.get_multi_pk_constraint(schema=schema_name) + + if reflect_indices: + constraints = inspected.get_multi_unique_constraints(schema=schema_name) + else: + constraints = {} + for object_kind, is_view in object_kinds: columns = inspected.get_multi_columns( schema=schema_name, kind=object_kind, ) - primary_keys = inspected.get_multi_pk_constraint( - schema=schema_name, kind=object_kind - ) - - if reflect_indices: - unique_constraints = inspected.get_multi_unique_constraints( - schema=schema_name, - kind=object_kind, - ) - else: - unique_constraints = {} result.extend( self._discover_catalog_entry_from_inspected( @@ -1079,7 +1075,7 @@ def discover_catalog_entries( schema_name=schema, columns=columns[schema, table], primary_key=primary_keys.get((schema, table)), - unique_constraints=unique_constraints.get((schema, table), []), + unique_constraints=constraints.get((schema, table), []), is_view=is_view, ).to_dict() for schema, table in columns From a5672540ff4fc713873dca619ccc322142ca5c66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 11:02:21 -0600 Subject: [PATCH 4/8] Try indices instead of UNIQUE constraints --- singer_sdk/connectors/sql.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index eee54c195..374390b64 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -972,7 +972,7 @@ def _discover_catalog_entry_from_inspected( schema_name: str | None, columns: list[reflection.ReflectedColumn], primary_key: reflection.ReflectedPrimaryKeyConstraint | None, - unique_constraints: list[reflection.ReflectedUniqueConstraint], + indices: list[reflection.ReflectedIndex], is_view: bool = False, ) -> CatalogEntry: unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name @@ -982,10 +982,11 @@ def _discover_catalog_entry_from_inspected( if primary_key and "constrained_columns" in primary_key: possible_primary_keys.append(primary_key["constrained_columns"]) - # Check UNIQUE constraints + # Get key property candidates from indices possible_primary_keys.extend( - unique_constraint["column_names"] - for unique_constraint in unique_constraints + index_def["column_names"] # type: ignore[misc] + for index_def in indices + if index_def.get("unique", False) ) key_properties = next(iter(possible_primary_keys), []) @@ -1059,9 +1060,9 @@ def discover_catalog_entries( primary_keys = inspected.get_multi_pk_constraint(schema=schema_name) if reflect_indices: - constraints = inspected.get_multi_unique_constraints(schema=schema_name) + indices = inspected.get_multi_indexes(schema=schema_name) else: - constraints = {} + indices = {} for object_kind, is_view in object_kinds: columns = inspected.get_multi_columns( @@ -1075,7 +1076,7 @@ def discover_catalog_entries( schema_name=schema, columns=columns[schema, table], 
primary_key=primary_keys.get((schema, table)), - unique_constraints=constraints.get((schema, table), []), + indices=indices.get((schema, table), []), is_view=is_view, ).to_dict() for schema, table in columns From bea370d0f23ac9dd39a44946029bea73c554ff95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 11:17:51 -0600 Subject: [PATCH 5/8] Extend existing API --- singer_sdk/connectors/sql.py | 122 +++++++++++------------------------ 1 file changed, 37 insertions(+), 85 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 374390b64..e8fd7be7b 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -879,11 +879,14 @@ def discover_catalog_entry( self, engine: Engine, # noqa: ARG002 inspected: Inspector, - schema_name: str, + schema_name: str | None, table_name: str, is_view: bool, # noqa: FBT001 *, reflect_indices: bool = True, + reflected_columns: list[reflection.ReflectedColumn] | None = None, + reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None, + reflected_indices: list[reflection.ReflectedIndex] | None = None, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. @@ -894,104 +897,45 @@ def discover_catalog_entry( table_name: Name of the table or a view is_view: Flag whether this object is a view, returned by `get_object_names` reflect_indices: Whether to reflect indices + reflected_columns: List of reflected columns + reflected_pk: Reflected primary key + reflected_indices: List of reflected indices Returns: `CatalogEntry` object for the given table or a view """ # Initialize unique stream name - unique_stream_id = f"{schema_name}-{table_name}" + unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name # Detect key properties possible_primary_keys: list[list[str]] = [] - pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) + pk_def = reflected_pk or inspected.get_pk_constraint( + table_name, + schema=schema_name, + ) if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr] possible_primary_keys.append(pk_def["constrained_columns"]) # An element of the columns list is ``None`` if it's an expression and is # returned in the ``expressions`` list of the reflected index. if reflect_indices: + indexes = reflected_indices or inspected.get_indexes( + table_name, + schema=schema_name, + ) possible_primary_keys.extend( index_def["column_names"] # type: ignore[misc] - for index_def in inspected.get_indexes(table_name, schema=schema_name) + for index_def in indexes if index_def.get("unique", False) ) - key_properties = next(iter(possible_primary_keys), None) - - # Initialize columns list - table_schema = th.PropertiesList() - for column_def in inspected.get_columns(table_name, schema=schema_name): - column_name = column_def["name"] - is_nullable = column_def.get("nullable", False) - jsonschema_type: dict = self.to_jsonschema_type(column_def["type"]) - table_schema.append( - th.Property( - name=column_name, - wrapped=th.CustomType(jsonschema_type), - nullable=is_nullable, - required=column_name in key_properties if key_properties else False, - ), - ) - schema = table_schema.to_dict() - - # Initialize available replication methods - addl_replication_methods: list[str] = [""] # By default an empty list. - # Notes regarding replication methods: - # - 'INCREMENTAL' replication must be enabled by the user by specifying - # a replication_key value. 
- # - 'LOG_BASED' replication must be enabled by the developer, according - # to source-specific implementation capabilities. - replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) - - # Create the catalog entry object - return CatalogEntry( - tap_stream_id=unique_stream_id, - stream=unique_stream_id, - table=table_name, - key_properties=key_properties, - schema=Schema.from_dict(schema), - is_view=is_view, - replication_method=replication_method, - metadata=MetadataMapping.get_standard_metadata( - schema_name=schema_name, - schema=schema, - replication_method=replication_method, - key_properties=key_properties, - valid_replication_keys=None, # Must be defined by user - ), - database=None, # Expects single-database context - row_count=None, - stream_alias=None, - replication_key=None, # Must be defined by user - ) - - def _discover_catalog_entry_from_inspected( - self, - *, - table_name: str, - schema_name: str | None, - columns: list[reflection.ReflectedColumn], - primary_key: reflection.ReflectedPrimaryKeyConstraint | None, - indices: list[reflection.ReflectedIndex], - is_view: bool = False, - ) -> CatalogEntry: - unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name - - # Detect key properties - possible_primary_keys: list[list[str]] = [] - if primary_key and "constrained_columns" in primary_key: - possible_primary_keys.append(primary_key["constrained_columns"]) - - # Get key property candidates from indices - possible_primary_keys.extend( - index_def["column_names"] # type: ignore[misc] - for index_def in indices - if index_def.get("unique", False) - ) - key_properties = next(iter(possible_primary_keys), []) # Initialize columns list + columns = reflected_columns or inspected.get_columns( + table_name, + schema=schema_name, + ) properties = [ th.Property( name=column["name"], @@ -1005,7 +949,12 @@ def _discover_catalog_entry_from_inspected( schema = th.PropertiesList(*properties).to_dict() # Initialize available replication methods - addl_replication_methods: list[str] = [] # By default an empty list. + addl_replication_methods: list[str] = [""] # By default an empty list. + # Notes regarding replication methods: + # - 'INCREMENTAL' replication must be enabled by the user by specifying + # a replication_key value. + # - 'LOG_BASED' replication must be enabled by the developer, according + # to source-specific implementation capabilities. 
replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) # Create the catalog entry object @@ -1071,13 +1020,16 @@ def discover_catalog_entries( ) result.extend( - self._discover_catalog_entry_from_inspected( - table_name=table, - schema_name=schema, - columns=columns[schema, table], - primary_key=primary_keys.get((schema, table)), - indices=indices.get((schema, table), []), - is_view=is_view, + self.discover_catalog_entry( + engine, + inspected, + schema_name, + table, + is_view, + reflect_indices=reflect_indices, + reflected_columns=columns[schema, table], + reflected_pk=primary_keys.get((schema, table)), + reflected_indices=indices.get((schema, table)), ).to_dict() for schema, table in columns ) From 307d4d9b4b6cf426c7e108eb149a5fbfb93d1b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 11:44:30 -0600 Subject: [PATCH 6/8] Avoid unnecessarily reflecting stuff --- singer_sdk/connectors/sql.py | 41 ++++++++++++------------------------ 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index e8fd7be7b..3a4912160 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -878,15 +878,14 @@ def get_object_names( def discover_catalog_entry( self, engine: Engine, # noqa: ARG002 - inspected: Inspector, + inspected: Inspector, # noqa: ARG002 schema_name: str | None, table_name: str, is_view: bool, # noqa: FBT001 *, - reflect_indices: bool = True, - reflected_columns: list[reflection.ReflectedColumn] | None = None, - reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None, - reflected_indices: list[reflection.ReflectedIndex] | None = None, + reflected_columns: list[reflection.ReflectedColumn], + reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None, + reflected_indices: list[reflection.ReflectedIndex], ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. @@ -909,33 +908,20 @@ def discover_catalog_entry( # Detect key properties possible_primary_keys: list[list[str]] = [] - pk_def = reflected_pk or inspected.get_pk_constraint( - table_name, - schema=schema_name, - ) - if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr] - possible_primary_keys.append(pk_def["constrained_columns"]) + if reflected_pk and "constrained_columns" in reflected_pk: + possible_primary_keys.append(reflected_pk["constrained_columns"]) # An element of the columns list is ``None`` if it's an expression and is # returned in the ``expressions`` list of the reflected index. 
- if reflect_indices: - indexes = reflected_indices or inspected.get_indexes( - table_name, - schema=schema_name, - ) - possible_primary_keys.extend( - index_def["column_names"] # type: ignore[misc] - for index_def in indexes - if index_def.get("unique", False) - ) + possible_primary_keys.extend( + index_def["column_names"] # type: ignore[misc] + for index_def in reflected_indices + if index_def.get("unique", False) + ) key_properties = next(iter(possible_primary_keys), []) # Initialize columns list - columns = reflected_columns or inspected.get_columns( - table_name, - schema=schema_name, - ) properties = [ th.Property( name=column["name"], @@ -944,7 +930,7 @@ def discover_catalog_entry( required=column["name"] in key_properties, description=column.get("comment"), ) - for column in columns + for column in reflected_columns ] schema = th.PropertiesList(*properties).to_dict() @@ -1026,10 +1012,9 @@ def discover_catalog_entries( schema_name, table, is_view, - reflect_indices=reflect_indices, reflected_columns=columns[schema, table], reflected_pk=primary_keys.get((schema, table)), - reflected_indices=indices.get((schema, table)), + reflected_indices=indices.get((schema, table), []), ).to_dict() for schema, table in columns ) From edd2d995fcf69940cedc6239604ede1adade7470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 11:51:41 -0600 Subject: [PATCH 7/8] Backwards-compatible API --- singer_sdk/connectors/sql.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 3a4912160..6b3c3788e 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -883,9 +883,9 @@ def discover_catalog_entry( table_name: str, is_view: bool, # noqa: FBT001 *, - reflected_columns: list[reflection.ReflectedColumn], - reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None, - reflected_indices: list[reflection.ReflectedIndex], + reflected_columns: list[reflection.ReflectedColumn] | None = None, + reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None, + reflected_indices: list[reflection.ReflectedIndex] | None = None, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. 
@@ -906,6 +906,10 @@ def discover_catalog_entry( # Initialize unique stream name unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name + # Backwards-compatibility + reflected_columns = reflected_columns or [] + reflected_indices = reflected_indices or [] + # Detect key properties possible_primary_keys: list[list[str]] = [] if reflected_pk and "constrained_columns" in reflected_pk: From 304e2062d3128df378ff6b9a250b866acc84630f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Tue, 3 Dec 2024 16:44:49 -0600 Subject: [PATCH 8/8] Deprecate `SQLConnector.get_object_names` --- samples/sample_tap_bigquery/__init__.py | 22 ---------------------- singer_sdk/connectors/sql.py | 8 +++++++- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/samples/sample_tap_bigquery/__init__.py b/samples/sample_tap_bigquery/__init__.py index ad0d3c3cb..a91f372eb 100644 --- a/samples/sample_tap_bigquery/__init__.py +++ b/samples/sample_tap_bigquery/__init__.py @@ -13,28 +13,6 @@ def get_sqlalchemy_url(self, config: dict) -> str: # noqa: PLR6301 """Concatenate a SQLAlchemy URL for use in connecting to the source.""" return f"bigquery://{config['project_id']}" - def get_object_names( - self, - engine, - inspected, - schema_name: str, - ) -> list[tuple[str, bool]]: - """Return discoverable object names.""" - # Bigquery inspections returns table names in the form - # `schema_name.table_name` which later results in the project name - # override due to specifics in behavior of sqlalchemy-bigquery - # - # Let's strip `schema_name` prefix on the inspection - - return [ - (table_name.split(".")[-1], is_view) - for (table_name, is_view) in super().get_object_names( - engine, - inspected, - schema_name, - ) - ] - class BigQueryStream(SQLStream): """Stream class for BigQuery streams.""" diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 6b3c3788e..3302a249f 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -18,6 +18,7 @@ from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers._compat import SingerSDKDeprecationWarning from singer_sdk.helpers._util import dump_json, load_json from singer_sdk.helpers.capabilities import TargetLoadMethods @@ -848,7 +849,12 @@ def get_schema_names( # noqa: PLR6301 """ return inspected.get_schema_names() - def get_object_names( + @deprecated( + "This method is deprecated.", + category=SingerSDKDeprecationWarning, + stacklevel=1, + ) + def get_object_names( # pragma: no cover self, engine: Engine, # noqa: ARG002 inspected: Inspector,
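
Illustrative sketch (not part of the patch series above): the changes replace per-table Inspector calls (`get_columns`, `get_pk_constraint`, `get_indexes`) with SQLAlchemy 2.0's schema-wide `get_multi_*` reflection methods, so discovery issues a handful of queries per schema rather than several per table. The snippet below shows that pattern in isolation; it assumes SQLAlchemy 2.x and uses an in-memory SQLite database with made-up `users` and `active_users` objects purely for demonstration.

import sqlalchemy as sa
from sqlalchemy.engine import reflection

# In-memory SQLite database, used purely for illustration.
engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)"))
    conn.execute(sa.text("CREATE VIEW active_users AS SELECT id, email FROM users"))

inspector = sa.inspect(engine)

# One reflection round per schema instead of one per table. Each call returns
# a mapping of (schema, object_name) -> reflected metadata; the default schema
# is keyed by None. Passing reflection.ObjectKind.ANY_VIEW instead would select
# the view, mirroring the (TABLE, ANY_VIEW) pairs iterated in the patches.
columns = inspector.get_multi_columns(kind=reflection.ObjectKind.TABLE)
primary_keys = inspector.get_multi_pk_constraint(kind=reflection.ObjectKind.TABLE)
indexes = inspector.get_multi_indexes(kind=reflection.ObjectKind.TABLE)

for (schema, table), cols in columns.items():
    pk = primary_keys.get((schema, table)) or {}
    unique_index_columns = [
        index["column_names"]
        for index in indexes.get((schema, table), [])
        if index.get("unique")
    ]
    print(
        table,
        [col["name"] for col in cols],
        pk.get("constrained_columns"),
        unique_index_columns,
    )

The `(schema, object_name)` tuples keying the `get_multi_*` results are what the final `discover_catalog_entries` looks up for each object before building a `CatalogEntry`, falling back to an empty primary key or index list when a table has none.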