diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 0987ec770054a..6221a37430a68 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,11 +4,12 @@ import os import time from enum import Enum -from typing import Any, Mapping, Optional, Sequence, Tuple +from typing import Any, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin import requests from dagster import ( + Definitions, Failure, InitResourceContext, MetadataValue, @@ -19,7 +20,9 @@ ) from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource +from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr @@ -27,6 +30,7 @@ from requests.exceptions import RequestException from dagster_fivetran.translator import ( + DagsterFivetranTranslator, FivetranContentData, FivetranContentType, FivetranWorkspaceData, @@ -43,6 +47,8 @@ # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 +FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" + class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" @@ -599,6 +605,7 @@ class FivetranWorkspace(ConfigurableResource): to interact with Fivetran APIs. """ + account_id: str = Field(description="The Fivetran account ID.") api_key: str = Field(description="The Fivetran API key to use for this resource.") api_secret: str = Field(description="The Fivetran API secret to use for this resource.") request_max_retries: int = Field( @@ -674,3 +681,26 @@ def fetch_fivetran_workspace_data( ) return FivetranWorkspaceData.from_content_data(connectors + destinations) + + +@record +class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]): + workspace: FivetranWorkspace + translator_cls: Type[DagsterFivetranTranslator] + + @property + def defs_key(self) -> str: + return f"{FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX}/{self.workspace.account_id}" + + def fetch_state(self) -> FivetranWorkspaceData: + return self.workspace.fetch_fivetran_workspace_data() + + def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions: + translator = self.translator_cls() + + all_asset_specs = [ + translator.get_asset_spec(props) + for props in state.to_fivetran_connector_table_props_data() + ] + + return Definitions(assets=all_asset_specs) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 7c14b24e0ca4c..82b44013d6a67 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -380,6 +380,7 @@ }, } +TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index 1f9577b1fa5f4..c558f694f25a5 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -1,13 +1,19 @@ import responses from dagster_fivetran import FivetranWorkspace -from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET +from dagster_fivetran_tests.experimental.conftest import ( + TEST_ACCOUNT_ID, + TEST_API_KEY, + TEST_API_SECRET, +) def test_fetch_fivetran_workspace_data( fetch_workspace_data_api_mocks: responses.RequestsMock, ) -> None: - resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET) + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET + ) actual_workspace_data = resource.fetch_fivetran_workspace_data() assert len(actual_workspace_data.connectors_by_id) == 1 diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 28a02c5cdc15b..4a7b9db5298c2 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,7 +1,11 @@ import responses from dagster_fivetran import FivetranWorkspace -from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET +from dagster_fivetran_tests.experimental.conftest import ( + TEST_ACCOUNT_ID, + TEST_API_KEY, + TEST_API_SECRET, +) def test_basic_resource_request( @@ -10,7 +14,9 @@ def test_basic_resource_request( group_id: str, all_api_mocks: responses.RequestsMock, ) -> None: - resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET) + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET + ) client = resource.get_client() client.get_connector_details(connector_id=connector_id) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py index ae78aee3ef18d..a437c30cdc340 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py @@ -2,13 +2,19 @@ from dagster_fivetran import FivetranWorkspace -from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET +from dagster_fivetran_tests.experimental.conftest import ( + TEST_ACCOUNT_ID, + TEST_API_KEY, + TEST_API_SECRET, +) def test_fivetran_workspace_data_to_fivetran_connector_table_props_data( fetch_workspace_data_api_mocks: Callable, ) -> None: - resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET) + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET + ) actual_workspace_data = resource.fetch_fivetran_workspace_data() table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()