From 1c98a760b68a7b02ba4050ae5e5c94944a9aae30 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 10 Dec 2024 18:14:41 -0500 Subject: [PATCH] Move fetch_column_metadata to FivetranEventIterator --- .../fivetran_event_iterator.py | 73 ++++++++++++++++++- .../dagster_fivetran/resources.py | 42 +---------- 2 files changed, 73 insertions(+), 42 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py index aa24c23cd061f..ae17f37f91273 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py @@ -1,6 +1,15 @@ from typing import TYPE_CHECKING, Iterator, Union -from dagster import AssetMaterialization, MaterializeResult +from dagster import ( + AssetExecutionContext, + AssetMaterialization, + MaterializeResult, + OpExecutionContext, + _check as check, +) +from dagster._annotations import experimental, public +from dagster._core.definitions.metadata.metadata_set import TableMetadataSet +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from typing_extensions import TypeVar if TYPE_CHECKING: @@ -11,6 +20,48 @@ T = TypeVar("T", bound=FivetranEventType) +def fetch_column_metadata( + fivetran_workspace: "FivetranWorkspace", + connector_id: str, + materialization: Union[AssetMaterialization, MaterializeResult], +) -> AssetMaterialization: + """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the + materialization. + """ + schema_source_name = materialization.metadata["schema_source_name"].value + table_source_name = materialization.metadata["table_source_name"].value + client = fivetran_workspace.get_client() + + try: + table_conn_data = client.get_columns_for_table( + connector_id=connector_id, + schema_name=schema_source_name, + table_name=table_source_name, + ) + columns = check.dict_elem(table_conn_data, "columns") + table_columns = sorted( + [ + TableColumn(name=col["name_in_destination"], type="") + for col in columns.values() + if "name_in_destination" in col and col.get("enabled") + ], + key=lambda col: col.name, + ) + return materialization.with_metadata( + { + **materialization.metadata, + **TableMetadataSet(column_schema=TableSchema(table_columns)), + } + ) + except Exception as e: + client._log.warning( # noqa + "An error occurred while fetching column metadata for table %s", + f"Exception: {e}", + exc_info=True, + ) + return materialization + + class FivetranEventIterator(Iterator[T]): """A wrapper around an iterator of Fivetran events which contains additional methods for post-processing the events, such as fetching column metadata. @@ -20,12 +71,32 @@ def __init__( self, events: Iterator[T], fivetran_workspace: "FivetranWorkspace", + context: Union[OpExecutionContext, AssetExecutionContext], ) -> None: self._inner_iterator = events self._fivetran_workspace = fivetran_workspace + self._context = context def __next__(self) -> T: return next(self._inner_iterator) def __iter__(self) -> "FivetranEventIterator[T]": return self + + @experimental + @public + def fetch_column_metadata(self) -> "FivetranEventIterator": + """Fetches column metadata for each table synced by the Sling CLI. + + Retrieves the column schema and lineage for each target table. + + Returns: + SlingEventIterator: An iterator of Dagster events with column metadata attached. + """ + + def _fetch_column_metadata() -> Iterator[T]: + raise NotImplementedError() + + return FivetranEventIterator[T]( + _fetch_column_metadata(), self._fivetran_workspace, self._context + ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index cf1ff020eca5e..9f8782c857f4c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -26,8 +26,6 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader -from dagster._core.definitions.metadata.metadata_set import TableMetadataSet -from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.resource_definition import dagster_maintained_resource from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method @@ -1029,7 +1027,7 @@ def sync_and_poll( or AssetMaterialization. """ return FivetranEventIterator( - events=self._sync_and_poll(context=context), fivetran_workspace=self + events=self._sync_and_poll(context=context), fivetran_workspace=self, context=context ) def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]): @@ -1067,44 +1065,6 @@ def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContex if unmaterialized_asset_keys: context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") - def _fetch_and_attach_col_metadata( - self, connector_id: str, materialization: AssetMaterialization - ) -> AssetMaterialization: - """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the - materialization. - """ - try: - schema_source_name = materialization.metadata["schema_source_name"].value - table_source_name = materialization.metadata["table_source_name"].value - - table_conn_data = self.get_client().get_columns_for_table( - connector_id=connector_id, - schema_name=schema_source_name, - table_name=table_source_name, - ) - columns = check.dict_elem(table_conn_data, "columns") - table_columns = sorted( - [ - TableColumn(name=col["name_in_destination"], type="") - for col in columns.values() - if "name_in_destination" in col and col.get("enabled") - ], - key=lambda col: col.name, - ) - return materialization.with_metadata( - { - **materialization.metadata, - **TableMetadataSet(column_schema=TableSchema(table_columns)), - } - ) - except Exception as e: - self._log.warning( - "An error occurred while fetching column metadata for table %s", - f"Exception: {e}", - exc_info=True, - ) - return materialization - @experimental def load_fivetran_asset_specs(