diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 579bfd8206a70..43e91d6dd856a 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -33,6 +33,7 @@ DagsterFivetranTranslator, FivetranConnector, FivetranDestination, + FivetranSchemaConfig, FivetranWorkspaceData, ) from dagster_fivetran.types import FivetranOutput @@ -632,8 +633,8 @@ def fetch_fivetran_workspace_data( FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. """ connectors_by_id = {} - destinations_by_id = {} + schema_configs_by_connector_id = {} client = self.get_client() groups = client.get_groups()["items"] @@ -649,19 +650,28 @@ def fetch_fivetran_workspace_data( connectors_details = client.get_connectors_for_group(group_id=group_id)["items"] for connector_details in connectors_details: - connector = FivetranConnector.from_api_details( + connector = FivetranConnector.from_connector_details( connector_details=connector_details, - destination_details=destination_details, - schema_config_details=client.get_schema_config_for_connector( - connector_id=connector_details["id"] - ), ) - if not connector.has_bad_status: - connectors_by_id[connector.id] = connector + if connector.has_bad_setup_state: + continue + + connectors_by_id[connector.id] = connector + + schema_config_details = client.get_schema_config_for_connector( + connector_id=connector.id + ) + schema_config = FivetranSchemaConfig.from_schema_config_details( + schema_config_details=schema_config_details + ) + + schema_configs_by_connector_id[connector.id] = schema_config return FivetranWorkspaceData( - connectors_by_id=connectors_by_id, destinations_by_id=destinations_by_id + connectors_by_id=connectors_by_id, + destinations_by_id=destinations_by_id, + schema_configs_by_connector_id=schema_configs_by_connector_id, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index baf65b93035f6..9a28107d7a7b2 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -20,6 +20,7 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +@whitelist_for_serdes class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" @@ -36,34 +37,32 @@ class FivetranConnector: id: str name: str service: str - schema_config: "FivetranSchemaConfig" - destination_id: str - has_bad_status: bool + group_id: str + setup_state: FivetranConnectorSetupStateType @property def url(self) -> str: return f"https://fivetran.com/dashboard/connectors/{self.service}/{self.name}" + @property + def destination_id(self) -> str: + return self.group_id + + @property + def has_bad_setup_state(self) -> bool: + return self.setup_state is not FivetranConnectorSetupStateType.CONNECTED + @classmethod - def from_api_details( + def from_connector_details( cls, connector_details: Mapping[str, Any], - destination_details: Mapping[str, Any], - schema_config_details: Mapping[str, Any], ) -> "FivetranConnector": return cls( id=connector_details["id"], name=connector_details["schema"], service=connector_details["service"], - schema_config=FivetranSchemaConfig.from_schema_config_details( - schema_config_details=schema_config_details - ), - destination_id=destination_details["id"], - has_bad_status=connector_details["status"]["setup_state"] - in ( - FivetranConnectorSetupStateType.INCOMPLETE, - FivetranConnectorSetupStateType.BROKEN, - ), + group_id=connector_details["group_id"], + setup_state=FivetranConnectorSetupStateType(connector_details["status"]["setup_state"]), ) @@ -155,6 +154,7 @@ class FivetranWorkspaceData: connectors_by_id: Mapping[str, FivetranConnector] destinations_by_id: Mapping[str, FivetranDestination] + schema_configs_by_connector_id: Mapping[str, FivetranSchemaConfig] @cached_method def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]: @@ -163,10 +163,11 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa """ data: List[FivetranConnectorTableProps] = [] - for connector_id, connector in self.connectors_by_id.items(): + for connector in self.connectors_by_id.values(): destination = self.destinations_by_id[connector.destination_id] + schema_config = self.schema_configs_by_connector_id[connector.id] - for schema in connector.schema_config.schemas.values(): + for schema in schema_config.schemas.values(): if schema.enabled: for table in schema.tables.values(): if table.enabled: @@ -176,10 +177,10 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa schema_name=schema.name_in_destination, table_name=table.name_in_destination, ), - connector_id=connector_id, + connector_id=connector.id, name=connector.name, connector_url=connector.url, - schema_config=connector.schema_config, + schema_config=schema_config, database=destination.database, service=destination.service, )