[10/n][dagster-fivetran] Implement fivetran_assets and build_fivetran_assets_definitions (#25944)

## Summary & Motivation

This PR implements the `fivetran_assets` decorator and the
`build_fivetran_assets_definitions` factory.
- `fivetran_assets` can be used to create all assets for a given
Fivetran connector, i.e. one asset per table in the connector.
- `build_fivetran_assets_definitions` can be used to create all Fivetran
assets defs, one per connector. It uses `fivetran_assets`.
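
The relationship between the decorator and the factory can be sketched without Dagster or Fivetran installed. Everything below is a hypothetical stand-in rather than the library's API: `TableSpec`, `ConnectorAssets`, `connector_assets` and `build_all_connector_assets` are illustrative names, and the real code reads the connector ID from spec metadata instead of a plain attribute.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass(frozen=True)
class TableSpec:
    """Hypothetical stand-in for an asset spec: one per connector table."""

    key: str
    connector_id: str


@dataclass
class ConnectorAssets:
    """Hypothetical stand-in for the definition produced for one connector."""

    name: str
    specs: List[TableSpec]
    fn: Callable[[], str]


def connector_assets(connector_id: str, all_specs: Sequence[TableSpec]):
    """Decorator: keep only the specs that belong to one connector."""
    selected = [s for s in all_specs if s.connector_id == connector_id]

    def wrap(fn: Callable[[], str]) -> ConnectorAssets:
        return ConnectorAssets(name=connector_id, specs=selected, fn=fn)

    return wrap


def build_all_connector_assets(all_specs: Sequence[TableSpec]) -> List[ConnectorAssets]:
    """Factory: apply the decorator once per distinct connector ID."""
    definitions = []
    for connector_id in sorted({s.connector_id for s in all_specs}):

        @connector_assets(connector_id, all_specs)
        def _sync() -> str:
            return "synced"

        definitions.append(_sync)
    return definitions


specs = [
    TableSpec("crm.accounts", "connector_a"),
    TableSpec("crm.contacts", "connector_a"),
    TableSpec("billing.invoices", "connector_b"),
]
definitions = build_all_connector_assets(specs)
for d in definitions:
    print(d.name, [s.key for s in d.specs])
```

The decorator filters the full spec list at decoration time, so each definition created in the loop holds only its own connector's tables.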

Both the asset decorator and factory use `load_fivetran_asset_specs`.
This is motivated by the current implementation of `dagster-dbt`,
`dagster-dlt` and `dagster-sling` - each leverages an asset decorator
that loads the asset specs by itself.

To avoid calling the Fivetran API each time `load_fivetran_asset_specs`
is called, it is cached using `functools.lru_cache`.
`load_fivetran_asset_specs` uses state-backed defs, so reloading the
code doesn't make additional calls to the Fivetran API. Without the
cache, however, calling `load_fivetran_asset_specs` N times in a script
would make N calls to the Fivetran API.
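
The effect of this caching can be illustrated with the standard library alone. This is a minimal sketch under stated assumptions: `FakeWorkspace`, `fetch_tables`, `load_specs_uncached` and `load_specs_cached` are hypothetical names that simulate an API client, and the diff itself applies Dagster's `cached_method` to `load_asset_specs` rather than the plain `lru_cache` shown here.

```python
from functools import lru_cache


class FakeWorkspace:
    """Hypothetical stand-in for a workspace client; counts simulated API calls."""

    def __init__(self) -> None:
        self.api_calls = 0

    def fetch_tables(self):
        # Each call represents one round trip to the remote API.
        self.api_calls += 1
        return ("crm.accounts", "crm.contacts")


def load_specs_uncached(workspace: FakeWorkspace):
    return workspace.fetch_tables()


@lru_cache(maxsize=None)
def load_specs_cached(workspace: FakeWorkspace):
    # The workspace instance is hashable by identity, so it can be the cache key.
    return workspace.fetch_tables()


uncached_ws = FakeWorkspace()
for _ in range(3):
    load_specs_uncached(uncached_ws)

cached_ws = FakeWorkspace()
for _ in range(3):
    load_specs_cached(cached_ws)

print(uncached_ws.api_calls, cached_ws.api_calls)
```

The uncached loader hits the simulated API on every call, while the cached one hits it once per workspace instance.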

The goals here are:
- make the Fivetran integration as similar as possible to the other ELT
integrations by using the same patterns, e.g. the asset decorator
- make the user experience as simple as possible, so that users don't
have to manage the asset specs or the number of calls to the Fivetran API.

## How I Tested These Changes

Additional unit tests with BK.

## Changelog

[dagster-fivetran] The `fivetran_assets` decorator is added. It can be
used with the `FivetranWorkspace` resource and
`DagsterFivetranTranslator` translator to load Fivetran tables for a
given connector as assets in Dagster. The
`build_fivetran_assets_definitions` factory can be used to create assets
for all the connectors in your Fivetran workspace.
maximearmstrong authored and cmpadden committed Dec 5, 2024
1 parent 33a44b4 commit 4680714
Showing 7 changed files with 363 additions and 4 deletions.
@@ -1,7 +1,9 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_decorator import fivetran_assets as fivetran_assets
from dagster_fivetran.asset_defs import (
build_fivetran_assets as build_fivetran_assets,
build_fivetran_assets_definitions as build_fivetran_assets_definitions,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
)
from dagster_fivetran.ops import (
@@ -14,7 +16,10 @@
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.translator import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
FivetranConnectorTableProps as FivetranConnectorTableProps,
)
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

@@ -0,0 +1,115 @@
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet


@experimental
def fivetran_assets(
*,
connector_id: str,
workspace: FivetranWorkspace,
name: Optional[str] = None,
group_name: Optional[str] = None,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to sync the tables of a given Fivetran connector.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
name (Optional[str], optional): The name of the op.
group_name (Optional[str], optional): The name of the asset group.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Examples:
Sync the tables of a Fivetran connector:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
Sync the tables of a Fivetran connector with a custom translator:
.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
fivetran_assets
)
import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes
class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator(),
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
"""
return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
)
@@ -19,12 +19,14 @@
)

from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
OpExecutionContext,
_check as check,
multi_asset,
)
from dagster._annotations import experimental
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
@@ -41,10 +43,12 @@
from dagster._core.utils import imap
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.asset_decorator import fivetran_assets
from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource, FivetranWorkspace
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranMetadataSet,
FivetranSchemaConfig,
)
from dagster_fivetran.utils import (
@@ -725,3 +729,114 @@ def load_assets_from_fivetran_instance(
fetch_column_metadata=fetch_column_metadata,
translator=translator,
)


# -----------------------
# Reworked assets factory
# -----------------------


@experimental
def build_fivetran_assets_definitions(
*,
workspace: FivetranWorkspace,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetsDefinition]:
"""The list of AssetsDefinition for all connectors in the Fivetran workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace.
Examples:
Sync the tables of a Fivetran connector:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace)
defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
Sync the tables of a Fivetran connector with a custom translator:
.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
build_fivetran_assets_definitions
)
import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes
class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_assets = build_fivetran_assets_definitions(
workspace=fivetran_workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator()
)
defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

all_asset_specs = workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
)

connector_ids = {
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in all_asset_specs
}

_asset_fns = []
for connector_id in connector_ids:

@fivetran_assets(
connector_id=connector_id,
workspace=workspace,
name=connector_id,
group_name=connector_id,
dagster_fivetran_translator=dagster_fivetran_translator,
)
def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)

_asset_fns.append(_asset_fn)

return _asset_fns
@@ -4,15 +4,17 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from urllib.parse import urljoin

import requests
from dagster import (
AssetExecutionContext,
Definitions,
Failure,
InitResourceContext,
MetadataValue,
OpExecutionContext,
__version__,
_check as check,
get_dagster_logger,
@@ -890,6 +892,49 @@ def fetch_fivetran_workspace_data(
schema_configs_by_connector_id=schema_configs_by_connector_id,
)

@cached_method
def load_asset_specs(
self,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Examples:
Loading the asset specs for a given Fivetran workspace:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace})
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
raise NotImplementedError()


@experimental
def load_fivetran_asset_specs(
@@ -5,6 +5,7 @@
from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
@@ -248,6 +249,14 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
return data


class FivetranMetadataSet(NamespacedMetadataSet):
connector_id: Optional[str] = None

@classmethod
def namespace(cls) -> str:
return "dagster-fivetran"


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
@@ -275,8 +284,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
table=table_name,
)

augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}

return AssetSpec(
key=AssetKey(props.table.split(".")),
metadata=metadata,
metadata=augmented_metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
@@ -14,6 +14,7 @@
TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups