Skip to content

Commit

Permalink
Update materialization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 27, 2024
1 parent c92f0f4 commit 3b478fc
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Any, Callable, Optional, Type
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY


@experimental
Expand Down Expand Up @@ -105,9 +106,12 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
group_name=group_name,
can_subset=True,
specs=[
spec
spec.merge_attributes(
metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator}
)
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator()
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
Definitions,
Failure,
InitResourceContext,
Expand Down Expand Up @@ -37,16 +38,19 @@
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranConnectorTableProps,
FivetranDestination,
FivetranMetadataSet,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_table_name,
get_fivetran_connector_url,
get_fivetran_logs_url,
get_translator_from_fivetran_assets,
metadata_for_table,
)

FIVETRAN_API_BASE = "https://api.fivetran.com"
Expand Down Expand Up @@ -942,6 +946,7 @@ def sync_and_poll(
):
# TODO: Add docstrings
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
Expand All @@ -952,26 +957,70 @@ def sync_and_poll(
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)
connector = FivetranConnector.from_connector_details(
connector_details=fivetran_output.connector_details
)
schema_config = FivetranSchemaConfig.from_schema_config_details(
schema_config_details=fivetran_output.schema_config
)

materialized_asset_keys = set()
for schema_source_name, schema in schema_config.schemas.items():
if not schema.enabled:
continue

# TODO: Create new asset materialization fn with assets and not asset key prefix
for materialization in generate_materializations(
fivetran_output,
asset_key_prefix=[],
):
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
for table_source_name, table in schema.tables.items():
if not table.enabled:
continue

asset_key = dagster_fivetran_translator.get_asset_spec(
props=FivetranConnectorTableProps(
table=get_fivetran_connector_table_name(
schema_name=schema.name_in_destination,
table_name=table.name_in_destination,
),
connector_id=connector.id,
name=connector.name,
connector_url=connector.url,
schema_config=schema_config,
database=None,
service=None,
)
).key

materialization = AssetMaterialization(
asset_key=asset_key,
description=f"Table generated via Fivetran sync: {schema.name}.{table.name}",
metadata={
**metadata_for_table(
table,
get_fivetran_connector_url(fivetran_output.connector_details),
include_column_info=True,
database=None,
schema=schema.name,
table=table.name,
),
"schema_source_name": schema_source_name,
"table_source_name": table_source_name,
},
)
materialized_asset_keys.add(materialization.asset_key)
if not materialization:
continue

else:
yield materialization
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}"
)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence

import dagster._check as check
from dagster import AssetMaterialization, MetadataValue
from dagster import (
AssetMaterialization,
AssetsDefinition,
DagsterInvariantViolationError,
MetadataValue,
)
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

from dagster_fivetran.types import FivetranOutput

if TYPE_CHECKING:
from dagster_fivetran import DagsterFivetranTranslator

# Metadata key under which the translator used to build Fivetran asset specs is
# attached to each spec, so that it can later be read back from the resulting
# assets definition (see `get_translator_from_fivetran_assets`).
DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator"


def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str:
service = connector_details["service"]
Expand All @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str:
return f"{schema_name}.{table_name}"


def get_translator_from_fivetran_assets(
    fivetran_assets: AssetsDefinition,
) -> "DagsterFivetranTranslator":
    """Return the translator stored in the metadata of a Fivetran assets definition.

    The translator is looked up under ``DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY``
    in the metadata of the first asset key of ``fivetran_assets``.

    Args:
        fivetran_assets: The assets definition to read the translator from.

    Returns:
        The translator object found in the first asset's metadata.

    Raises:
        DagsterInvariantViolationError: If the assets definition exposes no asset
            metadata at all, or if the first asset's metadata does not contain
            the translator key.
    """
    metadata_by_key = fivetran_assets.metadata_by_key or {}

    # Iterate the None-safe mapping and use a default for `next`, so that an
    # assets definition without any metadata yields a descriptive error rather
    # than an AttributeError or a bare StopIteration (which surfaces as an opaque
    # RuntimeError when this helper is called from inside a generator).
    first_asset_key = next(iter(metadata_by_key), None)
    if first_asset_key is None:
        raise DagsterInvariantViolationError(
            "Expected to find fivetran translator metadata on the given assets definition,"
            " but it has no asset metadata."
            " Did you pass in assets that weren't generated by @fivetran_assets?"
        )

    first_metadata = metadata_by_key.get(first_asset_key, {})
    dagster_fivetran_translator = first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY)
    if dagster_fivetran_translator is None:
        raise DagsterInvariantViolationError(
            f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()},"
            " but did not. Did you pass in assets that weren't generated by @fivetran_assets?"
        )
    return dagster_fivetran_translator


def metadata_for_table(
table_data: Mapping[str, Any],
connector_url: str,
Expand Down

0 comments on commit 3b478fc

Please sign in to comment.