Update fetch_column_metadata; add test
maximearmstrong committed Dec 12, 2024
1 parent 1c98a76 · commit 741a7d2
Showing 6 changed files with 261 additions and 57 deletions.
==== changed file ====
@@ -1,4 +1,5 @@
-from typing import TYPE_CHECKING, Iterator, Union
+from concurrent.futures import ThreadPoolExecutor
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Union

from dagster import (
AssetExecutionContext,
@@ -9,57 +10,58 @@
)
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
-from dagster._core.definitions.metadata.table import TableColumn, TableSchema
+from dagster._core.utils import imap
from typing_extensions import TypeVar

+from dagster_fivetran.translator import FivetranMetadataSet
+from dagster_fivetran.utils import get_column_schema_for_columns, get_fivetran_connector_table_name

if TYPE_CHECKING:
from dagster_fivetran.resources import FivetranWorkspace


FivetranEventType = Union[AssetMaterialization, MaterializeResult]
T = TypeVar("T", bound=FivetranEventType)

+DEFAULT_MAX_THREADPOOL_WORKERS = 10

-def fetch_column_metadata(

+def _fetch_column_metadata(
+materialization: FivetranEventType,
fivetran_workspace: "FivetranWorkspace",
-connector_id: str,
-materialization: Union[AssetMaterialization, MaterializeResult],
-) -> AssetMaterialization:
-"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
-materialization.
-"""
-schema_source_name = materialization.metadata["schema_source_name"].value
-table_source_name = materialization.metadata["table_source_name"].value
+) -> Dict[str, Any]:
+"""Subroutine to fetch column metadata for a given table from the Fivetran API."""
+materialization_metadata = check.not_none(materialization.metadata)
+connector_id = check.not_none(
+FivetranMetadataSet.extract(materialization_metadata).connector_id
+)
+schema_name = check.not_none(
+FivetranMetadataSet.extract(materialization_metadata).destination_schema_name
+)
+table_name = check.not_none(
+FivetranMetadataSet.extract(materialization_metadata).destination_table_name
+)

client = fivetran_workspace.get_client()

+metadata = {}
try:
-table_conn_data = client.get_columns_for_table(
+table_conn_data = client.get_columns_config_for_table(
connector_id=connector_id,
-schema_name=schema_source_name,
-table_name=table_source_name,
+schema_name=schema_name,
+table_name=table_name,
)

columns = check.dict_elem(table_conn_data, "columns")
-table_columns = sorted(
-[
-TableColumn(name=col["name_in_destination"], type="")
-for col in columns.values()
-if "name_in_destination" in col and col.get("enabled")
-],
-key=lambda col: col.name,
-)
-return materialization.with_metadata(
-{
-**materialization.metadata,
-**TableMetadataSet(column_schema=TableSchema(table_columns)),
-}
-)
+metadata = {**TableMetadataSet(column_schema=get_column_schema_for_columns(columns))}
except Exception as e:
client._log.warning( # noqa
-"An error occurred while fetching column metadata for table %s",
+f"An error occurred while fetching column metadata for table "
+f"{get_fivetran_connector_table_name(schema_name=schema_name, table_name=table_name)}."
+"Column metadata will not be included in the event.\n\n"
+f"Exception: {e}",
exc_info=True,
)
-return materialization
+return metadata
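
Note (not part of the diff): a minimal sketch of the mapping _fetch_column_metadata returns. Spreading a TableMetadataSet yields Dagster's namespaced metadata key for the column schema; the column name below is made up.

from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

# Hypothetical column schema, shaped like the output of get_column_schema_for_columns:
# only enabled columns, empty types, sorted by destination name.
column_schema = TableSchema(
    columns=[TableColumn(name="column_name_in_destination_1", type="")]
)
metadata = {**TableMetadataSet(column_schema=column_schema)}
# metadata == {"dagster/column_schema": TableSchema(...)}; _attach_metadata merges this
# mapping into the event's existing metadata.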


class FivetranEventIterator(Iterator[T]):
@@ -86,17 +88,62 @@ def __iter__(self) -> "FivetranEventIterator[T]":
@experimental
@public
def fetch_column_metadata(self) -> "FivetranEventIterator":
"""Fetches column metadata for each table synced by the Sling CLI.
"""Fetches column metadata for each table synced with the Fivetran API.
Retrieves the column schema and lineage for each target table.
Retrieves the column schema for each destination table.
Returns:
SlingEventIterator: An iterator of Dagster events with column metadata attached.
FivetranEventIterator: An iterator of Dagster events with column metadata attached.
"""
fetch_metadata_fn: Callable[
[FivetranEventType],
Dict[str, Any],
] = lambda materialization: _fetch_column_metadata(
materialization=materialization,
fivetran_workspace=self._fivetran_workspace,
)

return self._attach_metadata(fetch_metadata_fn)

def _attach_metadata(
self,
fn: Callable[[FivetranEventType], Dict[str, Any]],
) -> "FivetranEventIterator":
"""Runs a threaded task to attach metadata to each event in the iterator.
def _fetch_column_metadata() -> Iterator[T]:
raise NotImplementedError()
Args:
fn (Callable[[Union[AssetMaterialization, MaterializeResult]], Dict[str, Any]]):
A function which takes a FivetranEventType and returns
a dictionary of metadata to attach to the event.
Returns:
Iterator[Union[AssetMaterialization, MaterializeResult]]:
A set of corresponding Dagster events for Fivetran tables, with any metadata output
by the function attached, yielded in the order they are emitted by the Fivetran API.
"""

return FivetranEventIterator[T](
_fetch_column_metadata(), self._fivetran_workspace, self._context
def _map_fn(event: FivetranEventType) -> FivetranEventType:
return event._replace(metadata={**check.is_dict(event.metadata), **fn(event)})

def _threadpool_wrap_map_fn() -> Iterator[FivetranEventType]:
assets_def = self._context.assets_def
connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

with ThreadPoolExecutor(
max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
thread_name_prefix=f"fivetran_{connector_id}",
) as executor:
yield from imap(
executor=executor,
iterable=self._inner_iterator,
func=_map_fn,
)

return FivetranEventIterator(
events=_threadpool_wrap_map_fn(),
fivetran_workspace=self._fivetran_workspace,
context=self._context,
)
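
Note (not part of this commit): a minimal usage sketch. Since sync_and_poll now returns a FivetranEventIterator, column metadata can be attached by chaining fetch_column_metadata(), which fans the per-table API calls out over a thread pool of up to DEFAULT_MAX_THREADPOOL_WORKERS workers. The function name below and the asset/op decorator and resource wiring (omitted) are placeholders, assumed to follow the existing dagster-fivetran workspace API.

from dagster import AssetExecutionContext
from dagster_fivetran.resources import FivetranWorkspace


def run_fivetran_sync(context: AssetExecutionContext, fivetran: FivetranWorkspace):
    # sync_and_poll yields AssetMaterialization/MaterializeResult events; chaining
    # fetch_column_metadata re-yields them with the column schema metadata attached.
    yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()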
==== changed file ====
@@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
-from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
+from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
@@ -60,8 +60,6 @@
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

-DEFAULT_MAX_THREADPOOL_WORKERS = 10

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

@@ -970,11 +968,11 @@ def _generate_materialization(
schema_config_details=fivetran_output.schema_config
)

-for schema_source_name, schema in schema_config.schemas.items():
+for schema in schema_config.schemas.values():
if not schema.enabled:
continue

-for table_source_name, table in schema.tables.items():
+for table in schema.tables.values():
if not table.enabled:
continue

@@ -1007,14 +1005,17 @@ def _generate_materialization(
schema=schema.name_in_destination,
table=table.name_in_destination,
),
"schema_source_name": schema_source_name,
"table_source_name": table_source_name,
**FivetranMetadataSet(
connector_id=connector.id,
destination_schema_name=schema.name_in_destination,
destination_table_name=table.name_in_destination,
),
},
)

def sync_and_poll(
self, context: Union[OpExecutionContext, AssetExecutionContext]
-) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
+) -> FivetranEventIterator[Union[AssetMaterialization, MaterializeResult]]:
"""Executes a sync and poll process to materialize Fivetran assets.
Args:
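Note (not part of the diff): _generate_materialization now spreads FivetranMetadataSet entries into each event's metadata in place of the raw schema_source_name / table_source_name keys, and _fetch_column_metadata reads them back with FivetranMetadataSet.extract. A rough round-trip sketch with made-up values:

from dagster_fivetran.translator import FivetranMetadataSet

# Written by _generate_materialization (illustrative values):
metadata = {
    **FivetranMetadataSet(
        connector_id="some_connector_id",
        destination_schema_name="schema_name_in_destination_1",
        destination_table_name="table_name_in_destination_1",
    ),
}

# Read back by _fetch_column_metadata:
extracted = FivetranMetadataSet.extract(metadata)
assert extracted.destination_table_name == "table_name_in_destination_1"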
==== changed file ====
@@ -251,6 +251,8 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa

class FivetranMetadataSet(NamespacedMetadataSet):
connector_id: Optional[str] = None
+destination_schema_name: Optional[str] = None
+destination_table_name: Optional[str] = None

@classmethod
def namespace(cls) -> str:
@@ -284,7 +286,14 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
table=table_name,
)

-augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}
+augmented_metadata = {
+**metadata,
+**FivetranMetadataSet(
+connector_id=props.connector_id,
+destination_schema_name=schema_name,
+destination_table_name=table_name,
+),
+}

return AssetSpec(
key=AssetKey(props.table.split(".")),
==== changed file ====
@@ -61,15 +61,7 @@ def metadata_for_table(
table_name = None
if table_data.get("columns"):
columns = check.dict_elem(table_data, "columns")
-table_columns = sorted(
-[
-TableColumn(name=col["name_in_destination"], type="")
-for col in columns.values()
-if "name_in_destination" in col and col.get("enabled")
-],
-key=lambda col: col.name,
-)
-column_schema = TableSchema(columns=table_columns)
+column_schema = get_column_schema_for_columns(columns=columns)

if include_column_info:
metadata["column_info"] = MetadataValue.json(columns)
@@ -84,6 +76,18 @@
return metadata


+def get_column_schema_for_columns(columns: Mapping[str, Any]):
+table_columns = sorted(
+[
+TableColumn(name=col["name_in_destination"], type="")
+for col in columns.values()
+if "name_in_destination" in col and col.get("enabled")
+],
+key=lambda col: col.name,
+)
+return TableSchema(columns=table_columns)
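
Note (not part of the diff): the extracted helper in isolation, with a columns payload shaped like the Fivetran columns config used in the tests (values made up). Disabled columns and columns missing name_in_destination are dropped; the result is sorted by destination name, with empty types since the columns config does not carry them.

from dagster_fivetran.utils import get_column_schema_for_columns

columns = {
    "property1": {"name_in_destination": "column_name_in_destination_2", "enabled": True},
    "property2": {"name_in_destination": "column_name_in_destination_1", "enabled": True},
    "property3": {"name_in_destination": "disabled_column", "enabled": False},  # dropped
}
schema = get_column_schema_for_columns(columns=columns)
# TableSchema(columns=[TableColumn(name="column_name_in_destination_1", type=""),
#                      TableColumn(name="column_name_in_destination_2", type="")])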


def _table_data_to_materialization(
fivetran_output: FivetranOutput,
asset_key_prefix: Sequence[str],
==== changed file ====
@@ -20,6 +20,8 @@

TEST_SCHEMA_NAME = "schema_name_in_destination_1"
TEST_TABLE_NAME = "table_name_in_destination_1"
+TEST_SECOND_SCHEMA_NAME = "schema_name_in_destination_2"
+TEST_SECOND_TABLE_NAME = "table_name_in_destination_2"
TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1"

# Taken from Fivetran API documentation
@@ -343,7 +345,7 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination_1",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
==== changed file (diff not loaded) ====
