diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index bce8012c9a7bd..579bfd8206a70 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,7 +3,6 @@ import logging import os import time -from enum import Enum from typing import Any, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin @@ -51,14 +50,6 @@ FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" -class FivetranConnectorSetupStateType(Enum): - """Enum representing each setup state for a connector in Fivetran's ontology.""" - - INCOMPLETE = "incomplete" - CONNECTED = "connected" - BROKEN = "broken" - - class FivetranResource(ConfigurableResource): """This class exposes methods on top of the Fivetran REST API.""" @@ -651,31 +642,24 @@ def fetch_fivetran_workspace_data( group_id = group["id"] destination_details = client.get_destination_details(destination_id=group_id) - destination_id = destination_details["id"] - - destinations_by_id[destination_id] = FivetranDestination.from_destination_details( + destination = FivetranDestination.from_destination_details( destination_details=destination_details ) + destinations_by_id[destination.id] = destination connectors_details = client.get_connectors_for_group(group_id=group_id)["items"] for connector_details in connectors_details: - if connector_details["status"]["setup_state"] in ( - FivetranConnectorSetupStateType.INCOMPLETE, - FivetranConnectorSetupStateType.BROKEN, - ): - continue - - connector_id = connector_details["id"] - schema_config_details = client.get_schema_config_for_connector( - connector_id=connector_id - ) - - connectors_by_id[connector_id] = FivetranConnector.from_api_details( + connector = FivetranConnector.from_api_details( connector_details=connector_details, destination_details=destination_details, - schema_config_details=schema_config_details, + schema_config_details=client.get_schema_config_for_connector( + connector_id=connector_details["id"] + ), ) + if not connector.has_bad_status: + connectors_by_id[connector.id] = connector + return FivetranWorkspaceData( connectors_by_id=connectors_by_id, destinations_by_id=destinations_by_id ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 41af40db0a43b..baf65b93035f6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,3 +1,4 @@ +from enum import Enum from typing import Any, List, Mapping, NamedTuple, Optional, Sequence from dagster._core.definitions.asset_key import AssetKey @@ -19,6 +20,14 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +class FivetranConnectorSetupStateType(Enum): + """Enum representing each setup state for a connector in Fivetran's ontology.""" + + INCOMPLETE = "incomplete" + CONNECTED = "connected" + BROKEN = "broken" + + @whitelist_for_serdes @record class FivetranConnector: @@ -29,6 +38,7 @@ class FivetranConnector: service: str schema_config: "FivetranSchemaConfig" destination_id: str + has_bad_status: bool @property def url(self) -> str: @@ -49,6 +59,11 @@ def from_api_details( schema_config_details=schema_config_details ), destination_id=destination_details["id"], + has_bad_status=connector_details["status"]["setup_state"] + in ( + FivetranConnectorSetupStateType.INCOMPLETE, + FivetranConnectorSetupStateType.BROKEN, + ), )