diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 962a739de8c5c..58ea1649585b1 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -12,6 +12,7 @@ FivetranResource as FivetranResource, FivetranWorkspace as FivetranWorkspace, fivetran_resource as fivetran_resource, + load_fivetran_asset_specs as load_fivetran_asset_specs, ) from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator from dagster_fivetran.types import FivetranOutput as FivetranOutput diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index bf96dc253c6f8..6ca2d8e65788e 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -19,6 +19,7 @@ ) from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource from dagster._record import record @@ -671,6 +672,33 @@ def fetch_fivetran_workspace_data( return FivetranWorkspaceData.from_content_data(connectors + destinations) +@experimental +def load_fivetran_asset_specs( + workspace: FivetranWorkspace, + dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, +) -> Sequence[AssetSpec]: + """Returns a list of AssetSpecs representing the Fivetran content in the workspace. + + Args: + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use + to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + + Returns: + List[AssetSpec]: The set of assets representing the Tableau content in the workspace. + """ + with workspace.process_config_and_initialize_cm() as initialized_workspace: + return check.is_list( + FivetranWorkspaceDefsLoader( + workspace=initialized_workspace, + translator_cls=dagster_fivetran_translator, + ) + .build_defs() + .assets, + AssetSpec, + ) + + @record class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]): workspace: FivetranWorkspace diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 49eb9a2ad363a..724541893eff4 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -19,6 +19,7 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +@whitelist_for_serdes class FivetranContentType(Enum): """Enum representing each object in Fivetran's ontology.""" @@ -37,6 +38,7 @@ class FivetranContentData: properties: Mapping[str, Any] +@whitelist_for_serdes @record class FivetranWorkspaceData: """A record representing all content in a Fivetran workspace. diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index 0f83e4144e5a3..64d36a61204a6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -1,7 +1,9 @@ import uuid from typing import Callable -from dagster_fivetran import FivetranWorkspace +from dagster._config.field_utils import EnvVar +from dagster._core.test_utils import environ +from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None: @@ -15,3 +17,31 @@ def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> actual_workspace_data = resource.fetch_fivetran_workspace_data() assert len(actual_workspace_data.connectors_by_id) == 1 assert len(actual_workspace_data.destinations_by_id) == 1 + + +def test_translator_spec(workspace_data_api_mocks_fn: Callable) -> None: + account_id = "fake_account_id" + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + with environ({"FIVETRAN_API_KEY": api_key, "FIVETRAN_API_SECRET": api_secret}): + resource = FivetranWorkspace( + account_id=account_id, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + with workspace_data_api_mocks_fn(include_sync_endpoints=False): + all_assets = load_fivetran_asset_specs(resource) + all_assets_keys = [asset.key for asset in all_assets] + + # 4 tables for the connector + assert len(all_assets) == 4 + assert len(all_assets_keys) == 4 + + # Sanity check outputs, translator tests cover details here + first_asset_key = next(key for key in all_assets_keys) + assert first_asset_key.path == [ + "schema_name_in_destination_1", + "table_name_in_destination_1", + ]