diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 2c9072934bfd6..a4fb0af2ce432 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -668,6 +668,61 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: " UI: " + connector.url ) + def poll_sync( + self, + connector_id: str, + previous_sync_completed_at: datetime, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> Mapping[str, Any]: + """Given a Fivetran connector and the timestamp at which the previous sync completed, poll + until the next sync completes. + + The previous sync completion time is necessary because the only way to tell when a sync + completes is when this value changes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync + (successful or otherwise) for this connector, prior to running this method. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + poll_start = datetime.now() + while True: + connector_details = self.get_connector_details(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=connector_details + ) + self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]") + + if connector.last_sync_completed_at > previous_sync_completed_at: + break + + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): + raise Failure( + f"Sync for connector '{connector_id}' timed out after " + f"{datetime.now() - poll_start}." + ) + + # Sleep for the configured time interval before polling again. + time.sleep(poll_interval) + + if not connector.is_last_sync_successful: + raise Failure( + f"Sync for connector '{connector_id}' failed!", + metadata={ + "connector_details": MetadataValue.json(connector_details), + "log_url": MetadataValue.url(connector.url), + }, + ) + return connector_details + @experimental class FivetranWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 253113051c7f5..09dc7597e4373 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,3 +1,4 @@ +from datetime import datetime from enum import Enum from typing import Any, List, Mapping, NamedTuple, Optional, Sequence @@ -7,9 +8,12 @@ from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method +from dagster._vendored.dateutil import parser from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table +MIN_TIME_STR = "0001-01-01 00:00:00+00" + class FivetranConnectorTableProps(NamedTuple): table: str @@ -46,7 +50,10 @@ class FivetranConnector: service: str group_id: str setup_state: str + sync_state: str paused: bool + succeeded_at: Optional[str] + failed_at: Optional[str] @property def url(self) -> str: @@ -64,6 +71,32 @@ def is_connected(self) -> bool: def is_paused(self) -> bool: return self.paused + @property + def last_sync_completed_at(self) -> datetime: + """Gets the datetime of the last completed sync of the Fivetran connector. + + Returns: + datetime.datetime: + The datetime of the last completed sync of the Fivetran connector. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return max(succeeded_at, failed_at) + + @property + def is_last_sync_successful(self) -> bool: + """Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not. + + Returns: + bool: + Whether the last completed sync of the Fivetran connector was successful or not. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return succeeded_at > failed_at + def validate_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that the connector is either paused or not fully set up. @@ -85,7 +118,10 @@ def from_connector_details( service=connector_details["service"], group_id=connector_details["group_id"], setup_state=connector_details["status"]["setup_state"], + sync_state=connector_details["status"]["sync_state"], paused=connector_details["paused"], + succeeded_at=connector_details.get("succeeded_at"), + failed_at=connector_details.get("failed_at"), ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 267b536263d13..86d10da45c82b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,4 @@ -from typing import Iterator +from typing import Any, Iterator, Mapping import pytest import responses @@ -8,6 +8,13 @@ FIVETRAN_CONNECTOR_ENDPOINT, ) +TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" +TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" + +TEST_ACCOUNT_ID = "test_account_id" +TEST_API_KEY = "test_api_key" +TEST_API_SECRET = "test_api_secret" + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups SAMPLE_GROUPS = { @@ -61,7 +68,7 @@ }, "config": {"property1": {}, "property2": {}}, "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:43:29.013729Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 360, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -75,7 +82,7 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:43:29.013729Z", + "created_at": "2024-12-01T15:41:29.013729Z", "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "string", "proxy_agent_id": "string", @@ -148,72 +155,79 @@ }, } + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connectors/connector-details -SAMPLE_CONNECTOR_DETAILS = { - "code": "Success", - "message": "Operation performed.", - "data": { - "id": "connector_id", - "service": "15five", - "schema": "schema.table", - "paused": False, - "status": { - "tasks": [ - { - "code": "resync_table_warning", - "message": "Resync Table Warning", - "details": "string", - } - ], - "warnings": [ +# The sample is parameterized to test the poll method +def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "id": "connector_id", + "service": "15five", + "schema": "schema.table", + "paused": False, + "status": { + "tasks": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string", + } + ], + "warnings": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string", + } + ], + "schema_status": "ready", + "update_state": "delayed", + "setup_state": "connected", + "sync_state": "scheduled", + "is_historical_sync": False, + "rescheduled_for": "2024-12-01T15:43:29.013729Z", + }, + "daily_sync_time": "14:00", + "succeeded_at": succeeded_at, + "sync_frequency": 1440, + "group_id": "my_group_destination_id", + "connected_by": "user_id", + "setup_tests": [ { - "code": "resync_table_warning", - "message": "Resync Table Warning", - "details": "string", + "title": "Test Title", + "status": "PASSED", + "message": "Test Passed", + "details": "Test Details", } ], - "schema_status": "ready", - "update_state": "delayed", - "setup_state": "connected", - "sync_state": "scheduled", - "is_historical_sync": False, - "rescheduled_for": "2024-12-01T15:43:29.013729Z", - }, - "daily_sync_time": "14:00", - "succeeded_at": "2024-03-17T12:31:40.870504Z", - "sync_frequency": 1440, - "group_id": "my_group_destination_id", - "connected_by": "user_id", - "setup_tests": [ - { - "title": "Test Title", - "status": "PASSED", - "message": "Test Passed", - "details": "Test Details", - } - ], - "source_sync_details": {}, - "service_version": 0, - "created_at": "2023-12-01T15:43:29.013729Z", - "failed_at": "2024-04-01T18:13:25.043659Z", - "private_link_id": "private_link_id", - "proxy_agent_id": "proxy_agent_id", - "networking_method": "Directly", - "connect_card": { - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", - "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...", + "source_sync_details": {}, + "service_version": 0, + "created_at": "2024-12-01T15:41:29.013729Z", + "failed_at": failed_at, + "private_link_id": "private_link_id", + "proxy_agent_id": "proxy_agent_id", + "networking_method": "Directly", + "connect_card": { + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", + "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...", + }, + "pause_after_trial": False, + "data_delay_threshold": 0, + "data_delay_sensitivity": "NORMAL", + "schedule_type": "auto", + "local_processing_agent_id": "local_processing_agent_id", + "connect_card_config": { + "redirect_uri": "https://your.site/path", + "hide_setup_guide": True, + }, + "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", + "config": {"api_key": "your_15five_api_key"}, }, - "pause_after_trial": False, - "data_delay_threshold": 0, - "data_delay_sensitivity": "NORMAL", - "schedule_type": "auto", - "local_processing_agent_id": "local_processing_agent_id", - "connect_card_config": {"redirect_uri": "https://your.site/path", "hide_setup_guide": True}, - "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", - "config": {"api_key": "your_15five_api_key"}, - }, -} + } + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config @@ -382,10 +396,6 @@ SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} -TEST_ACCOUNT_ID = "test_account_id" -TEST_API_KEY = "test_api_key" -TEST_API_SECRET = "test_api_secret" - @pytest.fixture(name="connector_id") def connector_id_fixture() -> str: @@ -452,13 +462,17 @@ def all_api_mocks_fixture( fetch_workspace_data_api_mocks.add( method=responses.GET, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + json=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + ), status=200, ) fetch_workspace_data_api_mocks.add( method=responses.PATCH, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + json=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + ), status=200, ) fetch_workspace_data_api_mocks.add( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 43b1bd744918e..a2ee8f6f5fa02 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,10 +1,20 @@ +import pytest import responses +from dagster import Failure +from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace +from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( + FIVETRAN_API_BASE, + FIVETRAN_API_VERSION, + FIVETRAN_CONNECTOR_ENDPOINT, TEST_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, + TEST_MAX_TIME_STR, + TEST_PREVIOUS_MAX_TIME_STR, + get_sample_connection_details, ) @@ -59,3 +69,42 @@ def test_basic_resource_request( client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url + + # poll calls + # Succeeded poll + all_api_mocks.calls.reset() + client.poll_sync( + connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) + ) + assert len(all_api_mocks.calls) == 1 + + # Timed out poll + all_api_mocks.calls.reset() + with pytest.raises(Failure, match=f"Sync for connector '{connector_id}' timed out"): + client.poll_sync( + connector_id=connector_id, + # The poll process will time out because the value of + # `FivetranConnector.last_sync_completed_at` does not change in the test + previous_sync_completed_at=parser.parse(TEST_MAX_TIME_STR), + poll_timeout=2, + poll_interval=1, + ) + + # Failed poll + all_api_mocks.calls.reset() + # Replace the mock API call and set `failed_at` as more recent that `succeeded_at` + all_api_mocks.replace( + method_or_response=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=get_sample_connection_details( + succeeded_at=TEST_PREVIOUS_MAX_TIME_STR, failed_at=TEST_MAX_TIME_STR + ), + status=200, + ) + with pytest.raises(Failure, match=f"Sync for connector '{connector_id}' failed!"): + client.poll_sync( + connector_id=connector_id, + previous_sync_completed_at=parser.parse(MIN_TIME_STR), + poll_timeout=2, + poll_interval=1, + )