Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10/n][dagster-fivetran] Implement fivetran_assets and build_fivetran_assets_definitions #25944

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_decorator import fivetran_assets as fivetran_assets
from dagster_fivetran.asset_defs import (
build_fivetran_assets as build_fivetran_assets,
build_fivetran_assets_definitions as build_fivetran_assets_definitions,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
)
from dagster_fivetran.ops import (
Expand All @@ -14,7 +16,10 @@
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.translator import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
FivetranConnectorTableProps as FivetranConnectorTableProps,
)
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet


@experimental
def fivetran_assets(
*,
connector_id: str,
workspace: FivetranWorkspace,
name: Optional[str] = None,
group_name: Optional[str] = None,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to sync the tables of a given Fivetran connector.

Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
name (Optional[str], optional): The name of the op.
group_name (Optional[str], optional): The name of the asset group.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.

Examples:
Sync the tables of a Fivetran connector:

.. code-block:: python
from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)

defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)

Sync the tables of a Fivetran connector with a custom translator:

.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
fivetran_assets
)

import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes

class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)


fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator(),
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)

defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)

"""
return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
)

from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
OpExecutionContext,
_check as check,
multi_asset,
)
from dagster._annotations import experimental
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
Expand All @@ -41,10 +43,12 @@
from dagster._core.utils import imap
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.asset_decorator import fivetran_assets
from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource, FivetranWorkspace
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranMetadataSet,
FivetranSchemaConfig,
)
from dagster_fivetran.utils import (
Expand Down Expand Up @@ -725,3 +729,114 @@ def load_assets_from_fivetran_instance(
fetch_column_metadata=fetch_column_metadata,
translator=translator,
)


# -----------------------
# Reworked assets factory
# -----------------------


@experimental
def build_fivetran_assets_definitions(
*,
workspace: FivetranWorkspace,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetsDefinition]:
"""The list of AssetsDefinition for all connectors in the Fivetran workspace.

Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.

Returns:
List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace.

Examples:
Sync the tables of a Fivetran connector:

.. code-block:: python
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_assets = build_fivetran_assets_definitions(workspace=workspace)

defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)

Sync the tables of a Fivetran connector with a custom translator:

.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
build_fivetran_assets_definitions
)

import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes

class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)


fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_assets = build_fivetran_assets_definitions(
workspace=workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator()
)

defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)

"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

all_asset_specs = workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
)

connector_ids = {
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in all_asset_specs
}

_asset_fns = []
for connector_id in connector_ids:

@fivetran_assets(
connector_id=connector_id,
workspace=workspace,
name=connector_id,
group_name=connector_id,
dagster_fivetran_translator=dagster_fivetran_translator,
)
def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
maximearmstrong marked this conversation as resolved.
Show resolved Hide resolved

_asset_fns.append(_asset_fn)

return _asset_fns
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from urllib.parse import urljoin

import requests
from dagster import (
AssetExecutionContext,
Definitions,
Failure,
InitResourceContext,
MetadataValue,
OpExecutionContext,
__version__,
_check as check,
get_dagster_logger,
Expand Down Expand Up @@ -890,6 +892,49 @@ def fetch_fivetran_workspace_data(
schema_configs_by_connector_id=schema_configs_by_connector_id,
)

@cached_method
def load_asset_specs(
self,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.

Args:
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.

Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.

Examples:
Loading the asset specs for a given Fivetran workspace:

.. code-block:: python
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
raise NotImplementedError()


@experimental
def load_fivetran_asset_specs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -248,6 +249,14 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
return data


class FivetranMetadataSet(NamespacedMetadataSet):
connector_id: Optional[str] = None

@classmethod
def namespace(cls) -> str:
return "dagster-fivetran"


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
Expand Down Expand Up @@ -275,8 +284,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
table=table_name,
)

augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}

return AssetSpec(
key=AssetKey(props.table.split(".")),
metadata=metadata,
metadata=augmented_metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups
Expand Down
Loading