Skip to content

Commit

Permalink
[dagster-fivetran] add relation identifier metadata to fivetran assets (
Browse files Browse the repository at this point in the history
#23672)

## Summary

Pull `database` config from Fivetran destinations when loading assets
from a live Fivetran instance. Join this with known schema and table
info to produce a full `dagster/relation_identifier`.

<img width="425" alt="Screenshot 2024-08-15 at 1 14 14 PM"
src="https://github.com/user-attachments/assets/c8c44e50-4ab7-4cad-8c23-4ffdfa4f4fcb">

## Test Plan

Unit test, tested with live Fivetran instance.
  • Loading branch information
benpankow authored Aug 15, 2024
1 parent 606cf70 commit d581365
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class FivetranConnectionMetadata(
("connector_id", str),
("connector_url", str),
("schemas", Mapping[str, Any]),
("database", Optional[str]),
],
)
):
Expand All @@ -253,7 +254,11 @@ def build_asset_defn_metadata(
if table["enabled"]:
table_name = table["name_in_destination"]
schema_table_meta[f"{schema_name}.{table_name}"] = metadata_for_table(
table, self.connector_url
table,
self.connector_url,
database=self.database,
schema=schema_name,
table=table_name,
)
else:
schema_table_meta[self.name] = {}
Expand Down Expand Up @@ -369,6 +374,9 @@ def _get_connectors(self) -> Sequence[FivetranConnectionMetadata]:
for group in groups:
group_id = group["id"]

group_details = self._fivetran_instance.get_destination_details(group_id)
database = group_details.get("config", {}).get("database")

connectors = self._fivetran_instance.make_request(
"GET", f"groups/{group_id}/connectors"
)["items"]
Expand All @@ -393,6 +401,7 @@ def _get_connectors(self) -> Sequence[FivetranConnectionMetadata]:
connector_id=connector_id,
connector_url=connector_url,
schemas=schemas,
database=database,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ def resync_and_poll(
)
return FivetranOutput(connector_details=final_details, schema_config=schema_config)

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
return self.make_request("GET", f"destinations/{destination_id}")


@dagster_maintained_resource
@resource(config_schema=FivetranResource.to_config_schema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dagster._check as check
from dagster import AssetMaterialization, MetadataValue
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

from dagster_fivetran.types import FivetranOutput
Expand All @@ -19,9 +20,16 @@ def get_fivetran_logs_url(connector_details: Mapping[str, Any]) -> str:


def metadata_for_table(
table_data: Mapping[str, Any], connector_url: str, include_column_info: bool = False
table_data: Mapping[str, Any],
connector_url: str,
database: Optional[str],
schema: Optional[str],
table: Optional[str],
include_column_info: bool = False,
) -> RawMetadataMapping:
metadata: Dict[str, MetadataValue] = {"connector_url": MetadataValue.url(connector_url)}
column_schema = None
relation_identifier = None
if table_data.get("columns"):
columns = check.dict_elem(table_data, "columns")
table_columns = sorted(
Expand All @@ -32,10 +40,18 @@ def metadata_for_table(
],
key=lambda col: col.name,
)
metadata["table_schema"] = MetadataValue.table_schema(TableSchema(table_columns))
column_schema = TableSchema(columns=table_columns)

if include_column_info:
metadata["column_info"] = MetadataValue.json(columns)

if database and schema and table:
relation_identifier = ".".join([database, schema, table])
metadata = {
**TableMetadataSet(column_schema=column_schema, relation_identifier=relation_identifier),
**metadata,
}

return metadata


Expand All @@ -57,6 +73,9 @@ def _table_data_to_materialization(
table_data,
get_fivetran_connector_url(fivetran_output.connector_details),
include_column_info=True,
database=None,
schema=schema_name,
table=table_name,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
io_manager,
)
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.execution.with_resources import with_resources
from dagster._core.instance_for_test import environ
Expand All @@ -32,6 +31,7 @@
get_sample_connector_response,
get_sample_connectors_response,
get_sample_connectors_response_multiple,
get_sample_destination_details_response,
get_sample_groups_response,
get_sample_sync_response,
get_sample_update_response,
Expand All @@ -56,7 +56,7 @@ def test_load_from_instance(
connector_to_asset_key_fn,
multiple_connectors,
destination_ids,
):
) -> None:
with environ({"FIVETRAN_API_KEY": "some_key", "FIVETRAN_API_SECRET": "some_secret"}):
load_calls = []

Expand Down Expand Up @@ -88,6 +88,13 @@ def load_input(self, context: InputContext) -> Any:
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
rsps.add(
method=rsps.GET,
url=ft_resource.api_base_url + "destinations/some_group",
json=(get_sample_destination_details_response()),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
rsps.add(
method=rsps.GET,
url=ft_resource.api_base_url + "groups/some_group/connectors",
Expand Down Expand Up @@ -148,7 +155,9 @@ def load_input(self, context: InputContext) -> Any:
if connector_to_asset_key_fn:
tables = {
connector_to_asset_key_fn(
FivetranConnectionMetadata("some_service.some_name", "", "=", []),
FivetranConnectionMetadata(
"some_service.some_name", "", "=", {}, database="example_database"
),
".".join(t.path),
)
for t in tables
Expand All @@ -157,7 +166,9 @@ def load_input(self, context: InputContext) -> Any:
# Set up a downstream asset to consume the xyz output table
xyz_asset_key = (
connector_to_asset_key_fn(
FivetranConnectionMetadata("some_service.some_name", "", "=", []),
FivetranConnectionMetadata(
"some_service.some_name", "", "=", {}, database="example_database"
),
"abc.xyz",
)
if connector_to_asset_key_fn
Expand All @@ -168,7 +179,7 @@ def load_input(self, context: InputContext) -> Any:
def downstream_asset(xyz):
return

all_assets = [downstream_asset] + ft_assets
all_assets = [downstream_asset] + ft_assets # type: ignore

if destination_ids:
# if destination_ids is truthy then we should skip the API call to get groups
Expand All @@ -178,8 +189,8 @@ def downstream_asset(xyz):

# Check schema metadata is added correctly to asset def
assert any(
out.metadata.get("table_schema")
== MetadataValue.table_schema(
metadata.get("dagster/column_schema")
== (
TableSchema(
columns=[
TableColumn(name="column_1", type="any"),
Expand All @@ -188,8 +199,12 @@ def downstream_asset(xyz):
]
)
)
for out in ft_assets[0].node_def.output_defs
for key, metadata in ft_assets[0].metadata_by_key.items()
)
for key, metadata in ft_assets[0].metadata_by_key.items():
assert metadata.get("dagster/relation_identifier") == (
"example_database." + ".".join(key.path[-2:])
)

assert ft_assets[0].keys == tables
assert all(
Expand Down Expand Up @@ -242,7 +257,8 @@ def downstream_asset(xyz):
]
assert len(asset_materializations) == 3
asset_keys = set(
mat.event_specific_data.materialization.asset_key for mat in asset_materializations
mat.event_specific_data.materialization.asset_key # type: ignore
for mat in asset_materializations
)
assert asset_keys == tables

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,7 @@ def get_sample_connectors_response_multiple():
},
]
}


def get_sample_destination_details_response():
return {"data": {"config": {"database": "example_database"}}}

0 comments on commit d581365

Please sign in to comment.