diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 58ea1649585b1..14b96afe5b1ea 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -1,7 +1,9 @@ from dagster._core.libraries import DagsterLibraryRegistry +from dagster_fivetran.asset_decorator import fivetran_assets as fivetran_assets from dagster_fivetran.asset_defs import ( build_fivetran_assets as build_fivetran_assets, + build_fivetran_assets_definitions as build_fivetran_assets_definitions, load_assets_from_fivetran_instance as load_assets_from_fivetran_instance, ) from dagster_fivetran.ops import ( @@ -14,7 +16,10 @@ fivetran_resource as fivetran_resource, load_fivetran_asset_specs as load_fivetran_asset_specs, ) -from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator +from dagster_fivetran.translator import ( + DagsterFivetranTranslator as DagsterFivetranTranslator, + FivetranConnectorTableProps as FivetranConnectorTableProps, +) from dagster_fivetran.types import FivetranOutput as FivetranOutput from dagster_fivetran.version import __version__ as __version__ diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py new file mode 100644 index 0000000000000..86c8929421dac --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -0,0 +1,115 @@ +from typing import Any, Callable, Optional + +from dagster import AssetsDefinition, multi_asset +from dagster._annotations import experimental + +from dagster_fivetran.resources import FivetranWorkspace +from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet + + +@experimental +def fivetran_assets( + *, + connector_id: str, + 
workspace: FivetranWorkspace, + name: Optional[str] = None, + group_name: Optional[str] = None, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, +) -> Callable[[Callable[..., Any]], AssetsDefinition]: + """Create a definition for how to sync the tables of a given Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + name (Optional[str], optional): The name of the op. + group_name (Optional[str], optional): The name of the asset group. + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. + + Examples: + Sync the tables of a Fivetran connector: + + .. code-block:: python + from dagster_fivetran import FivetranWorkspace, fivetran_assets + + import dagster as dg + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets( + connector_id="fivetran_connector_id", + name="fivetran_connector_id", + group_name="fivetran_connector_id", + workspace=fivetran_workspace, + ) + def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + defs = dg.Definitions( + assets=[fivetran_connector_assets], + resources={"fivetran": fivetran_workspace}, + ) + + Sync the tables of a Fivetran connector with a custom translator: + + .. 
code-block:: python + from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, + fivetran_assets + ) + + import dagster as dg + from dagster._core.definitions.asset_spec import replace_attributes + + class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: + asset_spec = super().get_asset_spec(props) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + ) + + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets( + connector_id="fivetran_connector_id", + name="fivetran_connector_id", + group_name="fivetran_connector_id", + workspace=fivetran_workspace, + dagster_fivetran_translator=CustomDagsterFivetranTranslator(), + ) + def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + defs = dg.Definitions( + assets=[fivetran_connector_assets], + resources={"fivetran": fivetran_workspace}, + ) + + """ + return multi_asset( + name=name, + group_name=group_name, + can_subset=True, + specs=[ + spec + for spec in workspace.load_asset_specs( + dagster_fivetran_translator=dagster_fivetran_translator + or DagsterFivetranTranslator() + ) + if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id + ], + ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index aa530331ec2f8..74d5752ea72c2 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -19,12 +19,14 @@ ) from dagster import ( + AssetExecutionContext, AssetKey, 
AssetsDefinition, OpExecutionContext, _check as check, multi_asset, ) +from dagster._annotations import experimental from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, @@ -41,10 +43,12 @@ from dagster._core.utils import imap from dagster._utils.log import get_dagster_logger -from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource +from dagster_fivetran.asset_decorator import fivetran_assets +from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource, FivetranWorkspace from dagster_fivetran.translator import ( DagsterFivetranTranslator, FivetranConnectorTableProps, + FivetranMetadataSet, FivetranSchemaConfig, ) from dagster_fivetran.utils import ( @@ -725,3 +729,114 @@ def load_assets_from_fivetran_instance( fetch_column_metadata=fetch_column_metadata, translator=translator, ) + + +# ----------------------- +# Reworked assets factory +# ----------------------- + + +@experimental +def build_fivetran_assets_definitions( + *, + workspace: FivetranWorkspace, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, +) -> Sequence[AssetsDefinition]: + """The list of AssetsDefinition for all connectors in the Fivetran workspace. + + Args: + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. + + Returns: + List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace. + + Examples: + Sync the tables of a Fivetran connector: + + .. 
code-block:: python + from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions + + import dagster as dg + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace) + + defs = dg.Definitions( + assets=[*fivetran_assets], + resources={"fivetran": fivetran_workspace}, + ) + + Sync the tables of a Fivetran connector with a custom translator: + + .. code-block:: python + from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, + build_fivetran_assets_definitions + ) + + import dagster as dg + from dagster._core.definitions.asset_spec import replace_attributes + + class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: + asset_spec = super().get_asset_spec(props) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + ) + + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + fivetran_assets = build_fivetran_assets_definitions( + workspace=fivetran_workspace, + dagster_fivetran_translator=CustomDagsterFivetranTranslator() + ) + + defs = dg.Definitions( + assets=[*fivetran_assets], + resources={"fivetran": fivetran_workspace}, + ) + + """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + + all_asset_specs = workspace.load_asset_specs( + dagster_fivetran_translator=dagster_fivetran_translator + ) + + connector_ids = { + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in all_asset_specs + } + + _asset_fns = [] + for connector_id in connector_ids: + + @fivetran_assets( + 
connector_id=connector_id, + workspace=workspace, + name=connector_id, + group_name=connector_id, + dagster_fivetran_translator=dagster_fivetran_translator, + ) + def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + _asset_fns.append(_asset_fn) + + return _asset_fns diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 4fa288f9daa38..363a67ac0b983 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,15 +4,17 @@ import time from datetime import datetime, timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union from urllib.parse import urljoin import requests from dagster import ( + AssetExecutionContext, Definitions, Failure, InitResourceContext, MetadataValue, + OpExecutionContext, __version__, _check as check, get_dagster_logger, @@ -890,6 +892,49 @@ def fetch_fivetran_workspace_data( schema_configs_by_connector_id=schema_configs_by_connector_id, ) + @cached_method + def load_asset_specs( + self, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, + ) -> Sequence[AssetSpec]: + """Returns a list of AssetSpecs representing the Fivetran content in the workspace. + + Args: + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. + + Returns: + List[AssetSpec]: The set of assets representing the Fivetran content in the workspace. + + Examples: + Loading the asset specs for a given Fivetran workspace: + + .. 
code-block:: python + from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs + + import dagster as dg + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + fivetran_specs = fivetran_workspace.load_asset_specs() + defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}) + """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + + return load_fivetran_asset_specs( + workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__ + ) + + def sync_and_poll( + self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + ): + raise NotImplementedError() + @experimental def load_fivetran_asset_specs( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 09dc7597e4373..e19e422235d6f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -5,6 +5,7 @@ from dagster import Failure from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method @@ -248,6 +249,14 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa return data +class FivetranMetadataSet(NamespacedMetadataSet): + connector_id: Optional[str] = None + + @classmethod + def namespace(cls) -> str: + return "dagster-fivetran" + + class DagsterFivetranTranslator: """Translator class which converts a 
`FivetranConnectorTableProps` object into AssetSpecs. Subclass this class to implement custom logic for each type of Fivetran content. @@ -275,8 +284,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: table=table_name, ) + augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)} + return AssetSpec( key=AssetKey(props.table.split(".")), - metadata=metadata, + metadata=augmented_metadata, kinds={"fivetran", *({props.service} if props.service else set())}, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 691af75950039..7fcdc69cc9cc2 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -14,6 +14,7 @@ TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" +TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id" # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index b36cf38cb40a8..6c1a83a7d7a0a 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -2,9 +2,12 @@ from dagster._config.field_utils import EnvVar from dagster._core.test_utils import environ from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs +from dagster_fivetran.asset_defs import build_fivetran_assets_definitions +from dagster_fivetran.translator import 
FivetranMetadataSet from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, + TEST_ANOTHER_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, ) @@ -45,3 +48,67 @@ def test_translator_spec( "schema_name_in_destination_1", "table_name_in_destination_1", ] + + first_asset_metadata = next(asset.metadata for asset in all_assets) + assert FivetranMetadataSet.extract(first_asset_metadata).connector_id == "connector_id" + + +def test_cached_load_spec_single_resource( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # load asset specs a first time + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs a second time, no additional calls are made + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + +def test_cached_load_spec_multiple_resources( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + another_workspace = FivetranWorkspace( + account_id=TEST_ANOTHER_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # load asset specs with a resource + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs with another resource, + # additional calls are made to load its specs + another_workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + 4 + + +def test_cached_load_spec_with_asset_factory( + fetch_workspace_data_api_mocks: 
responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # build_fivetran_assets_definitions calls workspace.load_asset_specs to get the connector IDs, + # then fivetran_assets calls it again once per connector ID; the cached result should add no API calls + build_fivetran_assets_definitions(workspace=resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4