From 0e4f2ddaa3b79d4f1a39290c3cf8ee95ba9a369f Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 13 Nov 2024 17:19:39 -0500 Subject: [PATCH] [9/n][dagster-fivetran] Implement sync and poll methods in FivetranClient --- .../dagster_fivetran/resources.py | 253 +++++++++++++++++- .../dagster_fivetran/translator.py | 47 +++- 2 files changed, 298 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 08e4fea7f0283..5e9adc22b9436 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,7 +3,8 @@ import logging import os import time -from typing import Any, Mapping, Optional, Sequence, Tuple, Type +from functools import partial +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin import requests @@ -469,11 +470,13 @@ def __init__( api_secret: str, request_max_retries: int, request_retry_delay: float, + disable_schedule_on_trigger: bool, ): self.api_key = api_key self.api_secret = api_secret self.request_max_retries = request_max_retries self.request_retry_delay = request_retry_delay + self.disable_schedule_on_trigger = disable_schedule_on_trigger @property def _auth(self) -> HTTPBasicAuth: @@ -592,6 +595,246 @@ def get_groups(self) -> Mapping[str, Any]: """ return self._make_request("GET", "groups") + # TODO: update + def update_connector( + self, connector_id: str, properties: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + """Updates properties of a Fivetran Connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of + properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector). + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + return self._make_connector_request( + method="PATCH", endpoint=connector_id, data=json.dumps(properties) + ) + + # TODO: update + def update_schedule_type( + self, connector_id: str, schedule_type: Optional[str] = None + ) -> Mapping[str, Any]: + """Updates the schedule type property of the connector to either "auto" or "manual". + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to + turn it off). + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + if schedule_type not in ["auto", "manual"]: + check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'") + return self.update_connector(connector_id, properties={"schedule_type": schedule_type}) + + def start_sync(self, connector_id: str) -> None: + """Initiates a sync of a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + + Returns: + Dict[str, Any]: Parsed json data representing the connector details API response after + the sync is started. + """ + request_fn = partial( + self._make_connector_request, method="POST", endpoint=f"{connector_id}/force" + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + + def start_resync( + self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None + ) -> None: + """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. + An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 + + Returns: + Dict[str, Any]: Parsed json data representing the connector details API response after + the resync is started. + """ + request_fn = partial( + self._make_connector_request, + method="POST", + endpoint=( + f"{connector_id}/schemas/tables/resync" + if resync_parameters is not None + else f"{connector_id}/resync" + ), + data=json.dumps(resync_parameters) if resync_parameters is not None else None, + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + + def _start_sync(self, request_fn: Callable, connector_id: str) -> None: + if self.disable_schedule_on_trigger: + self._log.info("Disabling Fivetran sync schedule.") + self.update_schedule_type(connector_id, "manual") + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + connector.assert_syncable() + request_fn() + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + self._log.info( + f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" + " UI: " + connector.url + ) + + def poll_sync( + self, + connector_id: str, + initial_last_sync_completion: datetime.datetime, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> Mapping[str, Any]: + """Given a Fivetran connector and the timestamp at which the previous sync completed, poll + until the next sync completes. + + The previous sync completion time is necessary because the only way to tell when a sync + completes is when this value changes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync + (successful or otherwise) for this connector, prior to running this method. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will waited before this operation is timed + out. By default, this will never time out. + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + poll_start = datetime.datetime.now() + while True: + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + ( + curr_last_sync_completion, + curr_last_sync_succeeded, + curr_sync_state, + ) = connector.sync_status + self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]") + + if curr_last_sync_completion > initial_last_sync_completion: + break + + if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( + seconds=poll_timeout + ): + raise Failure( + f"Sync for connector '{connector_id}' timed out after " + f"{datetime.datetime.now() - poll_start}." + ) + + # Sleep for the configured time interval before polling again. + time.sleep(poll_interval) + + raw_connector_details = self.get_connector_details(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + if not curr_last_sync_succeeded: + raise Failure( + f"Sync for connector '{connector_id}' failed!", + metadata={ + "connector_details": MetadataValue.json(raw_connector_details), + "log_url": MetadataValue.url(connector.url), + }, + ) + return raw_connector_details + + def sync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + """Initializes a sync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will waited before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=self.start_sync, + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def resync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + resync_parameters: Optional[Mapping[str, Sequence[str]]] = None, + ) -> FivetranOutput: + """Initializes a historical resync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API. + This should be a dictionary with schema names as the keys and a list of tables + to resync as the values. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=partial(self.start_resync, resync_parameters=resync_parameters), + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def _sync_and_poll( + self, + sync_fn: Callable, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + schema_config_details = self.get_schema_config_for_connector(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + init_last_sync_timestamp, _, _ = connector.sync_status + sync_fn(connector_id) + final_details = self.poll_sync( + connector_id, + init_last_sync_timestamp, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + return FivetranOutput(connector_details=final_details, schema_config=schema_config_details) + class FivetranWorkspace(ConfigurableResource): """This class represents a Fivetran workspace and provides utilities @@ -612,6 +855,13 @@ class FivetranWorkspace(ConfigurableResource): default=0.25, description="Time (in seconds) to wait between each request retry.", ) + disable_schedule_on_trigger: bool = Field( + default=True, + description=( + "Specifies if you would like any connector that is sync'd using this " # TODO: update description + "resource to be automatically taken off its Fivetran schedule." + ), + ) _client: FivetranClient = PrivateAttr(default=None) @@ -621,6 +871,7 @@ def get_client(self) -> FivetranClient: api_secret=self.api_secret, request_max_retries=self.request_max_retries, request_retry_delay=self.request_retry_delay, + disable_schedule_on_trigger=self.disable_schedule_on_trigger, ) def fetch_fivetran_workspace_data( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 554ae463fd768..1a51805f59a18 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,14 +1,19 @@ +from datetime import datetime from enum import Enum -from typing import Any, List, Mapping, NamedTuple, Optional, Sequence +from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple +from dagster import Failure from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method +from dagster._vendored.dateutil import parser from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table +MIN_TIME_STR = "0001-01-01 00:00:00+00" + class FivetranConnectorTableProps(NamedTuple): table: str @@ -38,6 +43,10 @@ class FivetranConnector: service: str group_id: str setup_state: str + sync_state: str + paused: bool + succeeded_at: Optional[str] + failed_at: Optional[str] @property def url(self) -> str: @@ -51,6 +60,38 @@ def destination_id(self) -> str: def is_connected(self) -> bool: return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value + @property + def is_paused(self) -> bool: + return self.paused + + @property + def sync_status(self) -> Tuple[datetime, bool, str]: + """Gets details about the status of the Fivetran connector. + + Returns: + Tuple[datetime.datetime, bool, str]: + Tuple representing the timestamp of the last completed sync, if it succeeded, and + the currently reported sync status. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return ( + max(succeeded_at, failed_at), + succeeded_at > failed_at, + self.sync_state, + ) + + def assert_syncable(self) -> bool: + """Confirms that the connector can be sync. Will raise a Failure in the event that + the connector is either paused or not fully set up. + """ + if self.is_paused: + raise Failure(f"Connector '{self.id}' cannot be synced as it is currently paused.") + if not self.is_connected: + raise Failure(f"Connector '{self.id}' cannot be synced as it has not been setup") + return True + @classmethod def from_connector_details( cls, @@ -62,6 +103,10 @@ def from_connector_details( service=connector_details["service"], group_id=connector_details["group_id"], setup_state=connector_details["status"]["setup_state"], + sync_state=connector_details["status"]["sync_state"], + paused=connector_details["paused"], + succeeded_at=connector_details.get("succeeded_at"), + failed_at=connector_details.get("failed_at"), )