Skip to content

Commit

Permalink
Move fetch_column_metadata to FivetranEventIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 12, 2024
1 parent 70dd373 commit 2c5543b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from typing import TYPE_CHECKING, Generic, Iterator, Union

from dagster import AssetMaterialization, MaterializeResult
from dagster import (
AssetExecutionContext,
AssetMaterialization,
MaterializeResult,
OpExecutionContext,
_check as check,
)
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from typing_extensions import TypeVar

if TYPE_CHECKING:
Expand All @@ -11,6 +20,48 @@
T = TypeVar("T", bound=FivetranEventType)


def fetch_column_metadata(
fivetran_workspace: "FivetranWorkspace",
connector_id: str,
materialization: Union[AssetMaterialization, MaterializeResult],
) -> AssetMaterialization:
"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
materialization.
"""
schema_source_name = materialization.metadata["schema_source_name"].value
table_source_name = materialization.metadata["table_source_name"].value
client = fivetran_workspace.get_client()

try:
table_conn_data = client.get_columns_for_table(
connector_id=connector_id,
schema_name=schema_source_name,
table_name=table_source_name,
)
columns = check.dict_elem(table_conn_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return materialization.with_metadata(
{
**materialization.metadata,
**TableMetadataSet(column_schema=TableSchema(table_columns)),
}
)
except Exception as e:
client._log.warning( # noqa
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
exc_info=True,
)
return materialization


class FivetranEventIterator(Generic[T], Iterator):
"""A wrapper around an iterator of Fivetran events which contains additional methods for
post-processing the events, such as fetching column metadata.
Expand All @@ -20,12 +71,32 @@ def __init__(
self,
events: Iterator[T],
fivetran_workspace: "FivetranWorkspace",
context: Union[OpExecutionContext, AssetExecutionContext],
) -> None:
self._inner_iterator = events
self._fivetran_workspace = fivetran_workspace
self._context = context

def __next__(self) -> T:
return next(self._inner_iterator)

def __iter__(self) -> "FivetranEventIterator[T]":
return self

@experimental
@public
def fetch_column_metadata(self) -> "FivetranEventIterator":
"""Fetches column metadata for each table synced by the Sling CLI.
Retrieves the column schema and lineage for each target table.
Returns:
SlingEventIterator: An iterator of Dagster events with column metadata attached.
"""

def _fetch_column_metadata() -> Iterator[T]:
raise NotImplementedError()

return FivetranEventIterator[T](
_fetch_column_metadata(), self._fivetran_workspace, self._context
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -1029,7 +1027,7 @@ def sync_and_poll(
or AssetMaterialization.
"""
return FivetranEventIterator(
events=self._sync_and_poll(context=context), fivetran_workspace=self
events=self._sync_and_poll(context=context), fivetran_workspace=self, context=context
)

def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]):
Expand Down Expand Up @@ -1067,44 +1065,6 @@ def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContex
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
materialization.
"""
try:
schema_source_name = materialization.metadata["schema_source_name"].value
table_source_name = materialization.metadata["table_source_name"].value

table_conn_data = self.get_client().get_columns_for_table(
connector_id=connector_id,
schema_name=schema_source_name,
table_name=table_source_name,
)
columns = check.dict_elem(table_conn_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return materialization.with_metadata(
{
**materialization.metadata,
**TableMetadataSet(column_schema=TableSchema(table_columns)),
}
)
except Exception as e:
self._log.warning(
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
exc_info=True,
)
return materialization


@experimental
def load_fivetran_asset_specs(
Expand Down

0 comments on commit 2c5543b

Please sign in to comment.