Skip to content

Commit

Permalink
[11/n][dagster-fivetran] Implement materialization method in Fivetran…
Browse files Browse the repository at this point in the history
…Workspace (#25961)

## Summary & Motivation

This PR implements `FivetranWorkspace.sync_and_poll`, the
materialization method for Fivetran assets. This method:
- calls `FivetranClient.sync_and_poll`
- takes the `FivetranOutput` returned by `FivetranClient.sync_and_poll`
  and generates the asset materializations
- yields a `MaterializeResult` for each expected asset and an
  `AssetMaterialization` for each unexpected asset
  - A connector table that was not in the connector at definitions-loading
    time can appear in the `FivetranOutput`, e.g. when the table was added
    after definitions-loading time and before the sync.
- logs a warning for each unmaterialized table
  - A connector table can exist at definitions-loading time but be
    missing from the `FivetranOutput`, e.g. when the table was deleted
    after definitions-loading time and before the sync.

Can be leveraged like:

```python
from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


@fivetran_assets(
    connector_id="connector_id",
    name="connector_id",
    group_name="connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
    yield from fivetran.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[fivetran_connector_assets],
    resources={"fivetran": fivetran_workspace},
)
```

## How I Tested These Changes

Additional tests with BK

Tested with a live Fivetran instance:

<img width="1251" alt="Screenshot 2024-11-26 at 5 46 31 PM"
src="https://github.com/user-attachments/assets/e4253119-045f-4ac7-8b98-eb805e24a843">

## Changelog

[dagster-fivetran]
  • Loading branch information
maximearmstrong authored Dec 5, 2024
1 parent a0f9496 commit f6a914b
Show file tree
Hide file tree
Showing 5 changed files with 405 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY


@experimental
Expand Down Expand Up @@ -100,15 +101,18 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
)
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
spec.merge_attributes(
metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator}
)
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
Definitions,
Failure,
InitResourceContext,
MaterializeResult,
MetadataValue,
OpExecutionContext,
__version__,
Expand All @@ -25,7 +27,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
Expand All @@ -36,12 +38,20 @@
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranConnectorTableProps,
FivetranDestination,
FivetranMetadataSet,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url
from dagster_fivetran.utils import (
get_fivetran_connector_table_name,
get_fivetran_connector_url,
get_fivetran_logs_url,
get_translator_from_fivetran_assets,
metadata_for_table,
)

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
Expand Down Expand Up @@ -832,6 +842,7 @@ class FivetranWorkspace(ConfigurableResource):

_client: FivetranClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> FivetranClient:
return FivetranClient(
api_key=self.api_key,
Expand Down Expand Up @@ -929,10 +940,108 @@ def load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
def _generate_materialization(
    self,
    fivetran_output: FivetranOutput,
    dagster_fivetran_translator: DagsterFivetranTranslator,
) -> Iterator[AssetMaterialization]:
    """Yield one ``AssetMaterialization`` per enabled table found in a sync output.

    Args:
        fivetran_output (FivetranOutput): The output of a connector sync. Its
            ``connector_details`` and ``schema_config`` are parsed to find the tables.
        dagster_fivetran_translator (DagsterFivetranTranslator): The translator used to
            derive the asset key of each table.

    Yields:
        AssetMaterialization: One event for each enabled table of each enabled schema,
            carrying the table metadata plus the schema and table source names.
    """
    connector = FivetranConnector.from_connector_details(
        connector_details=fivetran_output.connector_details
    )
    schema_config = FivetranSchemaConfig.from_schema_config_details(
        schema_config_details=fivetran_output.schema_config
    )

    for schema_source_name, schema in schema_config.schemas.items():
        # Disabled schemas are skipped entirely, including all of their tables.
        if not schema.enabled:
            continue

        for table_source_name, table in schema.tables.items():
            if not table.enabled:
                continue

            # Only the key of the translated spec is needed here.
            # NOTE(review): database/service are passed as None — presumably the asset
            # key does not depend on them; confirm against the translator.
            asset_key = dagster_fivetran_translator.get_asset_spec(
                props=FivetranConnectorTableProps(
                    table=get_fivetran_connector_table_name(
                        schema_name=schema.name_in_destination,
                        table_name=table.name_in_destination,
                    ),
                    connector_id=connector.id,
                    name=connector.name,
                    connector_url=connector.url,
                    schema_config=schema_config,
                    database=None,
                    service=None,
                )
            ).key

            yield AssetMaterialization(
                asset_key=asset_key,
                description=(
                    f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}"
                ),
                metadata={
                    **metadata_for_table(
                        as_dict(table),
                        get_fivetran_connector_url(fivetran_output.connector_details),
                        include_column_info=True,
                        database=None,
                        schema=schema.name_in_destination,
                        table=table.name_in_destination,
                    ),
                    "schema_source_name": schema_source_name,
                    "table_source_name": table_source_name,
                },
            )

def sync_and_poll(
    self, context: Union[OpExecutionContext, AssetExecutionContext]
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
    """Sync the connector behind the executing assets and report what was materialized.

    The connector ID and the translator are both read from the assets definition
    attached to ``context``.

    Args:
        context (Union[OpExecutionContext, AssetExecutionContext]): The execution context
            from within `@fivetran_assets`.

    Returns:
        Iterator[Union[AssetMaterialization, MaterializeResult]]: A ``MaterializeResult``
            for every synced table whose asset key is selected in ``context``, and an
            ``AssetMaterialization`` for every synced table that is not.
    """
    fivetran_assets_def = context.assets_def
    translator = get_translator_from_fivetran_assets(fivetran_assets_def)

    # The connector ID is taken from the first spec only.
    connector_id = next(
        check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
        for spec in fivetran_assets_def.specs
    )

    fivetran_output = self.get_client().sync_and_poll(connector_id=connector_id)

    reported_keys = set()
    for event in self._generate_materialization(
        fivetran_output=fivetran_output, dagster_fivetran_translator=translator
    ):
        # A table that was synced but is not part of the selection is surfaced as a
        # runtime AssetMaterialization together with a warning.
        if event.asset_key not in context.selected_asset_keys:
            context.log.warning(
                f"An unexpected asset was materialized: {event.asset_key}. "
                f"Yielding a materialization event."
            )
            yield event
            continue

        # Expected table: emit a MaterializeResult and remember its key.
        yield MaterializeResult(asset_key=event.asset_key, metadata=event.metadata)
        reported_keys.add(event.asset_key)

    # Selected assets for which the sync output contained no enabled table.
    unmaterialized_asset_keys = context.selected_asset_keys - reported_keys
    if unmaterialized_asset_keys:
        context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence

import dagster._check as check
from dagster import AssetMaterialization, MetadataValue
from dagster import (
AssetMaterialization,
AssetsDefinition,
DagsterInvariantViolationError,
MetadataValue,
)
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

from dagster_fivetran.types import FivetranOutput

if TYPE_CHECKING:
from dagster_fivetran import DagsterFivetranTranslator

DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator"


def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str:
service = connector_details["service"]
Expand All @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str:
return f"{schema_name}.{table_name}"


def get_translator_from_fivetran_assets(
    fivetran_assets: AssetsDefinition,
) -> "DagsterFivetranTranslator":
    """Return the translator stored in the metadata of a Fivetran assets definition.

    The translator is looked up under ``DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY`` in the
    metadata of the first asset key of ``fivetran_assets``.

    Args:
        fivetran_assets (AssetsDefinition): The assets definition to inspect.

    Returns:
        DagsterFivetranTranslator: The translator found in the asset metadata.

    Raises:
        DagsterInvariantViolationError: If the definition has no asset metadata at all,
            or if the first asset's metadata does not contain a translator.
    """
    metadata_by_key = fivetran_assets.metadata_by_key or {}

    # Iterate the guarded mapping (not the raw attribute) and supply a default so that a
    # missing or empty mapping produces the invariant error below rather than an
    # AttributeError or a bare StopIteration.
    first_asset_key = next(iter(metadata_by_key), None)
    if first_asset_key is None:
        raise DagsterInvariantViolationError(
            "Expected to find fivetran translator metadata, but the assets definition has no"
            " asset keys. Did you pass in assets that weren't generated by @fivetran_assets?"
        )

    first_metadata = metadata_by_key[first_asset_key]
    dagster_fivetran_translator = first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY)
    if dagster_fivetran_translator is None:
        raise DagsterInvariantViolationError(
            f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()},"
            " but did not. Did you pass in assets that weren't generated by @fivetran_assets?"
        )
    return dagster_fivetran_translator


def metadata_for_table(
table_data: Mapping[str, Any],
connector_url: str,
Expand Down
Loading

0 comments on commit f6a914b

Please sign in to comment.