From 9e1a519196cbe4f51e40ab854d0325dba949859c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 11:42:24 -0500 Subject: [PATCH] [7/n][dagster-fivetran] Implement load_fivetran_asset_specs --- .../dagster_fivetran/__init__.py | 1 + .../dagster_fivetran/resources.py | 28 +++++++++++++++++ .../dagster_fivetran/translator.py | 2 ++ .../experimental/test_asset_specs.py | 31 ++++++++++++++++++- 4 files changed, 61 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 962a739de8c5c..58ea1649585b1 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -12,6 +12,7 @@ FivetranResource as FivetranResource, FivetranWorkspace as FivetranWorkspace, fivetran_resource as fivetran_resource, + load_fivetran_asset_specs as load_fivetran_asset_specs, ) from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator from dagster_fivetran.types import FivetranOutput as FivetranOutput diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 6221a37430a68..aa63557934379 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -20,6 +20,7 @@ ) from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource from dagster._record import record @@ -683,6 +684,33 @@ def fetch_fivetran_workspace_data( return FivetranWorkspaceData.from_content_data(connectors + destinations) +@experimental +def load_fivetran_asset_specs( + workspace: FivetranWorkspace, + dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, +) -> Sequence[AssetSpec]: + """Returns a list of AssetSpecs representing the Fivetran content in the workspace. + + Args: + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use + to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + + Returns: + List[AssetSpec]: The set of assets representing the Tableau content in the workspace. + """ + with workspace.process_config_and_initialize_cm() as initialized_workspace: + return check.is_list( + FivetranWorkspaceDefsLoader( + workspace=initialized_workspace, + translator_cls=dagster_fivetran_translator, + ) + .build_defs() + .assets, + AssetSpec, + ) + + @record class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]): workspace: FivetranWorkspace diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 2bcf970cb1dca..1f1d87b03df13 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -24,6 +24,7 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +@whitelist_for_serdes class FivetranContentType(Enum): """Enum representing each object in Fivetran's ontology.""" @@ -42,6 +43,7 @@ class FivetranContentData: properties: Mapping[str, Any] +@whitelist_for_serdes @record class FivetranWorkspaceData: """A record representing all content in a Fivetran workspace. diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index 8798b9f2422e0..9a0154665f2c9 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -1,5 +1,7 @@ import responses -from dagster_fivetran import FivetranWorkspace +from dagster._config.field_utils import EnvVar +from dagster._core.test_utils import environ +from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs def test_fetch_fivetran_workspace_data( @@ -13,3 +15,30 @@ def test_fetch_fivetran_workspace_data( actual_workspace_data = resource.fetch_fivetran_workspace_data() assert len(actual_workspace_data.connectors_by_id) == 1 assert len(actual_workspace_data.destinations_by_id) == 1 + + +def test_translator_spec(fetch_workspace_data_api_mocks: responses.RequestsMock) -> None: + account_id = "fake_account_id" + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + with environ({"FIVETRAN_API_KEY": api_key, "FIVETRAN_API_SECRET": api_secret}): + resource = FivetranWorkspace( + account_id=account_id, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + all_assets = load_fivetran_asset_specs(resource) + all_assets_keys = [asset.key for asset in all_assets] + + # 4 tables for the connector + assert len(all_assets) == 4 + assert len(all_assets_keys) == 4 + + # Sanity check outputs, translator tests cover details here + first_asset_key = next(key for key in all_assets_keys) + assert first_asset_key.path == [ + "schema_name_in_destination_1", + "table_name_in_destination_1", + ]