Skip to content

Commit

Permalink
[dagster-fivetran] Support fetch_column_metadata in FivetranWorkspace…
Browse files Browse the repository at this point in the history
….sync_and_poll
  • Loading branch information
maximearmstrong committed Dec 9, 2024
1 parent 8a0b091 commit 58b5e88
Showing 1 changed file with 84 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
Expand All @@ -26,7 +27,10 @@
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.utils import imap
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
Expand Down Expand Up @@ -59,6 +63,9 @@
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
DEFAULT_MAX_THREADPOOL_WORKERS = 10

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

Expand Down Expand Up @@ -1029,6 +1036,9 @@ def sync_and_poll(
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
Expand All @@ -1040,26 +1050,81 @@ def sync_and_poll(
)

materialized_asset_keys = set()
for materialization in self._generate_materialization(
fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
):
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
# Otherwise, emit a runtime AssetMaterialization.
if materialization.asset_key in context.selected_asset_keys:
yield MaterializeResult(
asset_key=materialization.asset_key, metadata=materialization.metadata
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}. "
f"Yielding a materialization event."
)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
_map_fn: Callable[[AssetMaterialization], AssetMaterialization] = (
lambda materialization: self._fetch_and_attach_col_metadata(
connector_id, materialization
)
if fetch_column_metadata
else materialization
)
with ThreadPoolExecutor(
max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
thread_name_prefix=f"fivetran_{connector_id}",
) as executor:
for materialization in imap(
executor=executor,
iterable=self._generate_materialization(
fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
),
func=_map_fn,
):
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
# Otherwise, emit a runtime AssetMaterialization.
if materialization.asset_key in context.selected_asset_keys:
yield MaterializeResult(
asset_key=materialization.asset_key,
metadata=materialization.metadata
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}."
f"Yielding a materialization event."
)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
materialization.
"""
try:
schema_source_name = materialization.metadata["schema_source_name"].value
table_source_name = materialization.metadata["table_source_name"].value

table_conn_data = self.get_client().get_columns_for_table(
connector_id=connector_id,
schema_name=schema_source_name,
table_name=table_source_name,
)
columns = check.dict_elem(table_conn_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return materialization.with_metadata(
{
**materialization.metadata,
**TableMetadataSet(column_schema=TableSchema(table_columns)),
}
)
except Exception as e:
self._log.warning(
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
exc_info=True,
)
return materialization


@experimental
Expand Down

0 comments on commit 58b5e88

Please sign in to comment.