Extend existing API
edgarrmondragon committed Dec 3, 2024
1 parent a567254 commit bea370d
Showing 1 changed file with 37 additions and 85 deletions: singer_sdk/connectors/sql.py
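
This change folds the private `_discover_catalog_entry_from_inspected` helper back into the public `discover_catalog_entry`, which now accepts optional pre-reflected metadata (`reflected_columns`, `reflected_pk`, `reflected_indices`) and falls back to per-table inspector calls when they are omitted. A minimal sketch of the extended call, assuming a hypothetical `MyConnector` subclass of `SQLConnector` and a made-up `users` table:

# Sketch only: MyConnector, the database URL, and the schema/table names are
# hypothetical; the reflected_* keyword arguments are the ones this commit adds.
import sqlalchemy as sa

connector = MyConnector(config={"sqlalchemy_url": "postgresql://localhost/demo"})
engine = connector._engine
inspected = sa.inspect(engine)

entry = connector.discover_catalog_entry(
    engine,
    inspected,
    "public",  # schema_name (now Optional)
    "users",   # table_name
    False,     # is_view
    reflected_columns=inspected.get_columns("users", schema="public"),
    reflected_pk=inspected.get_pk_constraint("users", schema="public"),
    reflected_indices=inspected.get_indexes("users", schema="public"),
)
print(entry.to_dict())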
@@ -879,11 +879,14 @@ def discover_catalog_entry(
         self,
         engine: Engine,  # noqa: ARG002
         inspected: Inspector,
-        schema_name: str,
+        schema_name: str | None,
         table_name: str,
         is_view: bool,  # noqa: FBT001
         *,
         reflect_indices: bool = True,
+        reflected_columns: list[reflection.ReflectedColumn] | None = None,
+        reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None,
+        reflected_indices: list[reflection.ReflectedIndex] | None = None,
     ) -> CatalogEntry:
         """Create `CatalogEntry` object for the given table or a view.
@@ -894,104 +897,45 @@ def discover_catalog_entry(
             table_name: Name of the table or a view
             is_view: Flag whether this object is a view, returned by `get_object_names`
             reflect_indices: Whether to reflect indices
+            reflected_columns: List of reflected columns
+            reflected_pk: Reflected primary key
+            reflected_indices: List of reflected indices
 
         Returns:
             `CatalogEntry` object for the given table or a view
         """
         # Initialize unique stream name
-        unique_stream_id = f"{schema_name}-{table_name}"
+        unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name
 
         # Detect key properties
         possible_primary_keys: list[list[str]] = []
-        pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
+        pk_def = reflected_pk or inspected.get_pk_constraint(
+            table_name,
+            schema=schema_name,
+        )
         if pk_def and "constrained_columns" in pk_def:  # type: ignore[redundant-expr]
             possible_primary_keys.append(pk_def["constrained_columns"])
 
         # An element of the columns list is ``None`` if it's an expression and is
         # returned in the ``expressions`` list of the reflected index.
         if reflect_indices:
+            indexes = reflected_indices or inspected.get_indexes(
+                table_name,
+                schema=schema_name,
+            )
             possible_primary_keys.extend(
                 index_def["column_names"]  # type: ignore[misc]
-                for index_def in inspected.get_indexes(table_name, schema=schema_name)
+                for index_def in indexes
                 if index_def.get("unique", False)
             )
 
         key_properties = next(iter(possible_primary_keys), None)
 
-        # Initialize columns list
-        table_schema = th.PropertiesList()
-        for column_def in inspected.get_columns(table_name, schema=schema_name):
-            column_name = column_def["name"]
-            is_nullable = column_def.get("nullable", False)
-            jsonschema_type: dict = self.to_jsonschema_type(column_def["type"])
-            table_schema.append(
-                th.Property(
-                    name=column_name,
-                    wrapped=th.CustomType(jsonschema_type),
-                    nullable=is_nullable,
-                    required=column_name in key_properties if key_properties else False,
-                ),
-            )
-        schema = table_schema.to_dict()
-
-        # Initialize available replication methods
-        addl_replication_methods: list[str] = [""]  # By default an empty list.
-        # Notes regarding replication methods:
-        # - 'INCREMENTAL' replication must be enabled by the user by specifying
-        #   a replication_key value.
-        # - 'LOG_BASED' replication must be enabled by the developer, according
-        #   to source-specific implementation capabilities.
-        replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))
-
-        # Create the catalog entry object
-        return CatalogEntry(
-            tap_stream_id=unique_stream_id,
-            stream=unique_stream_id,
-            table=table_name,
-            key_properties=key_properties,
-            schema=Schema.from_dict(schema),
-            is_view=is_view,
-            replication_method=replication_method,
-            metadata=MetadataMapping.get_standard_metadata(
-                schema_name=schema_name,
-                schema=schema,
-                replication_method=replication_method,
-                key_properties=key_properties,
-                valid_replication_keys=None,  # Must be defined by user
-            ),
-            database=None,  # Expects single-database context
-            row_count=None,
-            stream_alias=None,
-            replication_key=None,  # Must be defined by user
-        )
-
-    def _discover_catalog_entry_from_inspected(
-        self,
-        *,
-        table_name: str,
-        schema_name: str | None,
-        columns: list[reflection.ReflectedColumn],
-        primary_key: reflection.ReflectedPrimaryKeyConstraint | None,
-        indices: list[reflection.ReflectedIndex],
-        is_view: bool = False,
-    ) -> CatalogEntry:
-        unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name
-
-        # Detect key properties
-        possible_primary_keys: list[list[str]] = []
-        if primary_key and "constrained_columns" in primary_key:
-            possible_primary_keys.append(primary_key["constrained_columns"])
-
-        # Get key property candidates from indices
-        possible_primary_keys.extend(
-            index_def["column_names"]  # type: ignore[misc]
-            for index_def in indices
-            if index_def.get("unique", False)
-        )
-
-        key_properties = next(iter(possible_primary_keys), [])
-
         # Initialize columns list
+        columns = reflected_columns or inspected.get_columns(
+            table_name,
+            schema=schema_name,
+        )
         properties = [
             th.Property(
                 name=column["name"],
@@ -1005,7 +949,12 @@ def _discover_catalog_entry_from_inspected(
         schema = th.PropertiesList(*properties).to_dict()
 
         # Initialize available replication methods
-        addl_replication_methods: list[str] = []  # By default an empty list.
+        addl_replication_methods: list[str] = [""]  # By default an empty list.
+        # Notes regarding replication methods:
+        # - 'INCREMENTAL' replication must be enabled by the user by specifying
+        #   a replication_key value.
+        # - 'LOG_BASED' replication must be enabled by the developer, according
+        #   to source-specific implementation capabilities.
         replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))
 
         # Create the catalog entry object
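
For readers puzzling over the `next(reversed(...))` idiom restored above: it simply selects the last element of the candidate list, so whatever a developer appends last takes precedence. A trivial illustration with made-up values:

# Illustration only: for a list, next(reversed(lst)) is equivalent to lst[-1],
# so the most recently appended replication method wins.
methods = ["FULL_TABLE", "LOG_BASED"]  # hypothetical developer-enabled LOG_BASED
assert next(reversed(methods)) == "LOG_BASED" == methods[-1]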
@@ -1071,13 +1020,16 @@ def discover_catalog_entries(
             )
 
             result.extend(
-                self._discover_catalog_entry_from_inspected(
-                    table_name=table,
-                    schema_name=schema,
-                    columns=columns[schema, table],
-                    primary_key=primary_keys.get((schema, table)),
-                    indices=indices.get((schema, table), []),
-                    is_view=is_view,
+                self.discover_catalog_entry(
+                    engine,
+                    inspected,
+                    schema_name,
+                    table,
+                    is_view,
+                    reflect_indices=reflect_indices,
+                    reflected_columns=columns[schema, table],
+                    reflected_pk=primary_keys.get((schema, table)),
+                    reflected_indices=indices.get((schema, table)),
                 ).to_dict()
                 for schema, table in columns
             )
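
The `columns`, `primary_keys`, and `indices` mappings consumed here come from SQLAlchemy 2.0's bulk reflection API and are keyed by `(schema, table)` tuples, which is why a single round of reflection can feed every catalog entry. A self-contained sketch of that lookup pattern against a throwaway SQLite database (the table is a stand-in; only the lookup pattern mirrors the diff above):

# Requires SQLAlchemy 2.0+ for the get_multi_* Inspector methods.
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)"))

inspected = sa.inspect(engine)
columns = inspected.get_multi_columns()          # {(schema, table): [ReflectedColumn, ...]}
primary_keys = inspected.get_multi_pk_constraint()
indices = inspected.get_multi_indexes()

for schema, table in columns:
    print(table, [c["name"] for c in columns[schema, table]])
    print(primary_keys.get((schema, table)))     # may be None
    print(indices.get((schema, table), []))      # may be absent for index-less tables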