Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(taps): Improved discovery performance for SQL taps #2793

Merged
merged 8 commits into from
Dec 3, 2024
22 changes: 0 additions & 22 deletions samples/sample_tap_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,6 @@ def get_sqlalchemy_url(self, config: dict) -> str: # noqa: PLR6301
"""Concatenate a SQLAlchemy URL for use in connecting to the source."""
return f"bigquery://{config['project_id']}"

def get_object_names(
    self,
    engine,  # SQLAlchemy Engine — passed through to the parent implementation
    inspected,  # SQLAlchemy Inspector — passed through to the parent implementation
    schema_name: str,
) -> list[tuple[str, bool]]:
    """Return discoverable object names.

    Delegates to the parent connector's discovery, then normalizes the
    returned names (see the note below).

    Args:
        engine: SQLAlchemy engine for the BigQuery connection.
        inspected: SQLAlchemy inspector bound to the engine.
        schema_name: Name of the schema (BigQuery dataset) being inspected.

    Returns:
        A list of ``(object_name, is_view)`` tuples, with any
        ``schema_name.`` prefix stripped from each object name.
    """
    # BigQuery inspection returns table names in the form
    # `schema_name.table_name`, which later causes the project name to be
    # overridden due to specifics in the behavior of sqlalchemy-bigquery.
    #
    # Strip the `schema_name` prefix during inspection: keep only the
    # segment after the last ".", preserving the is_view flag unchanged.

    return [
        (table_name.split(".")[-1], is_view)
        for (table_name, is_view) in super().get_object_names(
            engine,
            inspected,
            schema_name,
        )
    ]


class BigQueryStream(SQLStream):
"""Stream class for BigQuery streams."""
Expand Down
110 changes: 68 additions & 42 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.engine import reflection

from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._compat import SingerSDKDeprecationWarning
from singer_sdk.helpers._util import dump_json, load_json
from singer_sdk.helpers.capabilities import TargetLoadMethods

Expand Down Expand Up @@ -847,7 +849,12 @@
"""
return inspected.get_schema_names()

def get_object_names(
@deprecated(
"This method is deprecated.",
category=SingerSDKDeprecationWarning,
stacklevel=1,
)
def get_object_names( # pragma: no cover
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
Expand All @@ -874,15 +881,17 @@
return [(t, False) for t in table_names] + [(v, True) for v in view_names]

# TODO: maybe this should be split into smaller parts?
def discover_catalog_entry(

Check warning on line 884 in singer_sdk/connectors/sql.py

View workflow job for this annotation

GitHub Actions / Check API Changes

SQLConnector.discover_catalog_entry(reflect_indices)

Parameter was removed: `` -> ``
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
schema_name: str,
inspected: Inspector, # noqa: ARG002
schema_name: str | None,
table_name: str,
is_view: bool, # noqa: FBT001
*,
reflect_indices: bool = True,
reflected_columns: list[reflection.ReflectedColumn] | None = None,
reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None,
reflected_indices: list[reflection.ReflectedIndex] | None = None,
) -> CatalogEntry:
"""Create `CatalogEntry` object for the given table or a view.

Expand All @@ -893,45 +902,47 @@
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`
reflect_indices: Whether to reflect indices
reflected_columns: List of reflected columns
reflected_pk: Reflected primary key
reflected_indices: List of reflected indices

Returns:
`CatalogEntry` object for the given table or a view
"""
# Initialize unique stream name
unique_stream_id = f"{schema_name}-{table_name}"
unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name

# Backwards-compatibility
reflected_columns = reflected_columns or []
reflected_indices = reflected_indices or []

# Detect key properties
possible_primary_keys: list[list[str]] = []
pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr]
possible_primary_keys.append(pk_def["constrained_columns"])
if reflected_pk and "constrained_columns" in reflected_pk:
possible_primary_keys.append(reflected_pk["constrained_columns"])

# An element of the columns list is ``None`` if it's an expression and is
# returned in the ``expressions`` list of the reflected index.
if reflect_indices:
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in reflected_indices
if index_def.get("unique", False)
)

key_properties = next(iter(possible_primary_keys), None)
key_properties = next(iter(possible_primary_keys), [])

# Initialize columns list
table_schema = th.PropertiesList()
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(column_def["type"])
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
nullable=is_nullable,
required=column_name in key_properties if key_properties else False,
),
properties = [
th.Property(
name=column["name"],
wrapped=th.CustomType(self.to_jsonschema_type(column["type"])),
nullable=column.get("nullable", False),
required=column["name"] in key_properties,
description=column.get("comment"),
)
schema = table_schema.to_dict()
for column in reflected_columns
]
schema = th.PropertiesList(*properties).to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
Expand Down Expand Up @@ -983,25 +994,40 @@
result: list[dict] = []
engine = self._engine
inspected = sa.inspect(engine)
object_kinds = (
(reflection.ObjectKind.TABLE, False),
(reflection.ObjectKind.ANY_VIEW, True),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The regression in MeltanoLabs/tap-postgres#528 was probably caused by ignoring views in the updated discovery process.

See MeltanoLabs/tap-postgres#535 (comment).

)
for schema_name in self.get_schema_names(engine, inspected):
if schema_name in exclude_schemas:
continue

# Iterate through each table and view
for table_name, is_view in self.get_object_names(
engine,
inspected,
schema_name,
):
catalog_entry = self.discover_catalog_entry(
engine,
inspected,
schema_name,
table_name,
is_view,
reflect_indices=reflect_indices,
primary_keys = inspected.get_multi_pk_constraint(schema=schema_name)

if reflect_indices:
indices = inspected.get_multi_indexes(schema=schema_name)
else:
indices = {}

for object_kind, is_view in object_kinds:
columns = inspected.get_multi_columns(
schema=schema_name,
kind=object_kind,
)

result.extend(
self.discover_catalog_entry(
engine,
inspected,
schema_name,
table,
is_view,
reflected_columns=columns[schema, table],
reflected_pk=primary_keys.get((schema, table)),
reflected_indices=indices.get((schema, table), []),
).to_dict()
for schema, table in columns
)
result.append(catalog_entry.to_dict())

return result

Expand Down
Loading