diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index c519248e752d9..b341f33a9b9f9 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -2,6 +2,7 @@ import logging import os import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union @@ -26,7 +27,10 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader +from dagster._core.definitions.metadata.metadata_set import TableMetadataSet +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._core.utils import imap from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser @@ -59,6 +63,9 @@ FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/" FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" +DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata" +DEFAULT_MAX_THREADPOOL_WORKERS = 10 + # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 @@ -1029,6 +1036,9 @@ def sync_and_poll( assets_def = context.assets_def dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory + fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY) + connector_id = next( check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) for spec in assets_def.specs @@ -1040,26 +1050,81 @@ def sync_and_poll( ) materialized_asset_keys = set() - for materialization in self._generate_materialization( - fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator - ): - # Scan through all tables actually created, if it was expected then emit a MaterializeResult. - # Otherwise, emit a runtime AssetMaterialization. - if materialization.asset_key in context.selected_asset_keys: - yield MaterializeResult( - asset_key=materialization.asset_key, metadata=materialization.metadata - ) - materialized_asset_keys.add(materialization.asset_key) - else: - context.log.warning( - f"An unexpected asset was materialized: {materialization.asset_key}. " - f"Yielding a materialization event." - ) - yield materialization - unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys - if unmaterialized_asset_keys: - context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") + _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = ( + lambda materialization: self._fetch_and_attach_col_metadata( + connector_id, materialization + ) + if fetch_column_metadata + else materialization + ) + with ThreadPoolExecutor( + max_workers=DEFAULT_MAX_THREADPOOL_WORKERS, + thread_name_prefix=f"fivetran_{connector_id}", + ) as executor: + for materialization in imap( + executor=executor, + iterable=self._generate_materialization( + fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator + ), + func=_map_fn, + ): + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. + if materialization.asset_key in context.selected_asset_keys: + yield MaterializeResult( + asset_key=materialization.asset_key, + metadata=materialization.metadata + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}." + f"Yielding a materialization event." + ) + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") + + def _fetch_and_attach_col_metadata( + self, connector_id: str, materialization: AssetMaterialization + ) -> AssetMaterialization: + """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the + materialization. + """ + try: + schema_source_name = materialization.metadata["schema_source_name"].value + table_source_name = materialization.metadata["table_source_name"].value + + table_conn_data = self.get_client().get_columns_for_table( + connector_id=connector_id, + schema_name=schema_source_name, + table_name=table_source_name, + ) + columns = check.dict_elem(table_conn_data, "columns") + table_columns = sorted( + [ + TableColumn(name=col["name_in_destination"], type="") + for col in columns.values() + if "name_in_destination" in col and col.get("enabled") + ], + key=lambda col: col.name, + ) + return materialization.with_metadata( + { + **materialization.metadata, + **TableMetadataSet(column_schema=TableSchema(table_columns)), + } + ) + except Exception as e: + self._log.warning( + "An error occurred while fetching column metadata for table %s", + f"Exception: {e}", + exc_info=True, + ) + return materialization @experimental