diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 374390b64..e8fd7be7b 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -879,11 +879,14 @@ def discover_catalog_entry( self, engine: Engine, # noqa: ARG002 inspected: Inspector, - schema_name: str, + schema_name: str | None, table_name: str, is_view: bool, # noqa: FBT001 *, reflect_indices: bool = True, + reflected_columns: list[reflection.ReflectedColumn] | None = None, + reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None, + reflected_indices: list[reflection.ReflectedIndex] | None = None, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. @@ -894,104 +897,45 @@ def discover_catalog_entry( table_name: Name of the table or a view is_view: Flag whether this object is a view, returned by `get_object_names` reflect_indices: Whether to reflect indices + reflected_columns: List of reflected columns + reflected_pk: Reflected primary key + reflected_indices: List of reflected indices Returns: `CatalogEntry` object for the given table or a view """ # Initialize unique stream name - unique_stream_id = f"{schema_name}-{table_name}" + unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name # Detect key properties possible_primary_keys: list[list[str]] = [] - pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) + pk_def = reflected_pk or inspected.get_pk_constraint( + table_name, + schema=schema_name, + ) if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr] possible_primary_keys.append(pk_def["constrained_columns"]) # An element of the columns list is ``None`` if it's an expression and is # returned in the ``expressions`` list of the reflected index. if reflect_indices: + indexes = reflected_indices or inspected.get_indexes( + table_name, + schema=schema_name, + ) possible_primary_keys.extend( index_def["column_names"] # type: ignore[misc] - for index_def in inspected.get_indexes(table_name, schema=schema_name) + for index_def in indexes if index_def.get("unique", False) ) - key_properties = next(iter(possible_primary_keys), None) - - # Initialize columns list - table_schema = th.PropertiesList() - for column_def in inspected.get_columns(table_name, schema=schema_name): - column_name = column_def["name"] - is_nullable = column_def.get("nullable", False) - jsonschema_type: dict = self.to_jsonschema_type(column_def["type"]) - table_schema.append( - th.Property( - name=column_name, - wrapped=th.CustomType(jsonschema_type), - nullable=is_nullable, - required=column_name in key_properties if key_properties else False, - ), - ) - schema = table_schema.to_dict() - - # Initialize available replication methods - addl_replication_methods: list[str] = [""] # By default an empty list. - # Notes regarding replication methods: - # - 'INCREMENTAL' replication must be enabled by the user by specifying - # a replication_key value. - # - 'LOG_BASED' replication must be enabled by the developer, according - # to source-specific implementation capabilities. - replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) - - # Create the catalog entry object - return CatalogEntry( - tap_stream_id=unique_stream_id, - stream=unique_stream_id, - table=table_name, - key_properties=key_properties, - schema=Schema.from_dict(schema), - is_view=is_view, - replication_method=replication_method, - metadata=MetadataMapping.get_standard_metadata( - schema_name=schema_name, - schema=schema, - replication_method=replication_method, - key_properties=key_properties, - valid_replication_keys=None, # Must be defined by user - ), - database=None, # Expects single-database context - row_count=None, - stream_alias=None, - replication_key=None, # Must be defined by user - ) - - def _discover_catalog_entry_from_inspected( - self, - *, - table_name: str, - schema_name: str | None, - columns: list[reflection.ReflectedColumn], - primary_key: reflection.ReflectedPrimaryKeyConstraint | None, - indices: list[reflection.ReflectedIndex], - is_view: bool = False, - ) -> CatalogEntry: - unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name - - # Detect key properties - possible_primary_keys: list[list[str]] = [] - if primary_key and "constrained_columns" in primary_key: - possible_primary_keys.append(primary_key["constrained_columns"]) - - # Get key property candidates from indices - possible_primary_keys.extend( - index_def["column_names"] # type: ignore[misc] - for index_def in indices - if index_def.get("unique", False) - ) - key_properties = next(iter(possible_primary_keys), []) # Initialize columns list + columns = reflected_columns or inspected.get_columns( + table_name, + schema=schema_name, + ) properties = [ th.Property( name=column["name"], @@ -1005,7 +949,12 @@ def _discover_catalog_entry_from_inspected( schema = th.PropertiesList(*properties).to_dict() # Initialize available replication methods - addl_replication_methods: list[str] = [] # By default an empty list. + addl_replication_methods: list[str] = [""] # By default an empty list. + # Notes regarding replication methods: + # - 'INCREMENTAL' replication must be enabled by the user by specifying + # a replication_key value. + # - 'LOG_BASED' replication must be enabled by the developer, according + # to source-specific implementation capabilities. replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) # Create the catalog entry object @@ -1071,13 +1020,16 @@ def discover_catalog_entries( ) result.extend( - self._discover_catalog_entry_from_inspected( - table_name=table, - schema_name=schema, - columns=columns[schema, table], - primary_key=primary_keys.get((schema, table)), - indices=indices.get((schema, table), []), - is_view=is_view, + self.discover_catalog_entry( + engine, + inspected, + schema_name, + table, + is_view, + reflect_indices=reflect_indices, + reflected_columns=columns[schema, table], + reflected_pk=primary_keys.get((schema, table)), + reflected_indices=indices.get((schema, table)), ).to_dict() for schema, table in columns )