Skip to content

Commit

Permalink
[9/n][dagster-fivetran] Implement sync and poll methods in FivetranCl…
Browse files Browse the repository at this point in the history
…ient
  • Loading branch information
maximearmstrong committed Nov 13, 2024
1 parent 079dff2 commit c152a82
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import logging
import os
import time
from typing import Any, Mapping, Optional, Sequence, Tuple, Type
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -469,11 +470,13 @@ def __init__(
api_secret: str,
request_max_retries: int,
request_retry_delay: float,
disable_schedule_on_trigger: bool,
):
self.api_key = api_key
self.api_secret = api_secret
self.request_max_retries = request_max_retries
self.request_retry_delay = request_retry_delay
self.disable_schedule_on_trigger = disable_schedule_on_trigger

@property
def _auth(self) -> HTTPBasicAuth:
Expand Down Expand Up @@ -592,6 +595,246 @@ def get_groups(self) -> Mapping[str, Any]:
"""
return self._make_request("GET", "groups")

# TODO: update
def update_connector(
    self, connector_id: str, properties: Optional[Mapping[str, Any]] = None
) -> Mapping[str, Any]:
    """Updates properties of a Fivetran Connector.

    The properties are serialized to JSON and sent as the body of a ``PATCH``
    request against the connector endpoint.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        properties (Optional[Mapping[str, Any]]): The properties to be updated. For a
            comprehensive list of properties, see the
            [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector).

    Returns:
        Mapping[str, Any]: Parsed json data representing the API response.
    """
    payload = json.dumps(properties)
    return self._make_connector_request(method="PATCH", endpoint=connector_id, data=payload)

# TODO: update
def update_schedule_type(
    self, connector_id: str, schedule_type: Optional[str] = None
) -> Mapping[str, Any]:
    """Updates the schedule type property of the connector to either "auto" or "manual".

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to
            turn it off). Any other value, including the default ``None``, is rejected
            through ``check.failed``.

    Returns:
        Mapping[str, Any]: Parsed json data representing the API response.
    """
    allowed_schedule_types = ("auto", "manual")
    if schedule_type not in allowed_schedule_types:
        check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'")
    new_properties = {"schedule_type": schedule_type}
    return self.update_connector(connector_id, properties=new_properties)

def start_sync(self, connector_id: str) -> None:
    """Initiates a sync of a Fivetran connector.

    Builds the ``POST <connector_id>/force`` request and hands it to
    ``_start_sync``, which is responsible for actually issuing it.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
    """
    force_endpoint = f"{connector_id}/force"
    trigger_sync = partial(
        self._make_connector_request, method="POST", endpoint=force_endpoint
    )
    self._start_sync(request_fn=trigger_sync, connector_id=connector_id)

def start_resync(
    self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None
) -> None:
    """Initiates a historical resync for a Fivetran connector.

    When ``resync_parameters`` is provided, the request targets the
    ``<connector_id>/schemas/tables/resync`` endpoint with the parameters as a
    JSON body; otherwise it targets ``<connector_id>/resync`` with no body.
    The request is handed to ``_start_sync``, which issues it.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        resync_parameters (Optional[Mapping[str, Sequence[str]]]): Optional resync parameters
            to send to the Fivetran API. An example payload can be found here:
            https://fivetran.com/docs/rest-api/connectors#request_7
    """
    if resync_parameters is None:
        endpoint = f"{connector_id}/resync"
        payload = None
    else:
        endpoint = f"{connector_id}/schemas/tables/resync"
        payload = json.dumps(resync_parameters)

    trigger_resync = partial(
        self._make_connector_request, method="POST", endpoint=endpoint, data=payload
    )
    self._start_sync(request_fn=trigger_resync, connector_id=connector_id)

def _start_sync(self, request_fn: Callable, connector_id: str) -> None:
    """Validates the connector, optionally disables its schedule, then triggers a sync.

    Args:
        request_fn (Callable): Zero-argument callable that issues the sync (or resync)
            request to the Fivetran API.
        connector_id (str): The Fivetran Connector ID.

    Raises:
        Failure: Raised by ``FivetranConnector.assert_syncable`` when the connector is
            paused or not fully set up. In that case nothing is modified in Fivetran.
    """
    connector = FivetranConnector.from_connector_details(
        connector_details=self.get_connector_details(connector_id)
    )
    # Validate before touching the schedule so that a connector which cannot be
    # synced is not left switched to "manual" as a side effect of a failed call.
    connector.assert_syncable()

    if self.disable_schedule_on_trigger:
        self._log.info("Disabling Fivetran sync schedule.")
        self.update_schedule_type(connector_id, "manual")

    request_fn()

    # The connector fetched above is reused for the log link instead of fetching
    # the details again.
    # NOTE(review): assumes `connector.url` is unaffected by triggering a sync — confirm.
    self._log.info(
        f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran"
        " UI: " + connector.url
    )

def poll_sync(
    self,
    connector_id: str,
    initial_last_sync_completion: datetime.datetime,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
    """Given a Fivetran connector and the timestamp at which the previous sync completed, poll
    until the next sync completes.

    The previous sync completion time is necessary because the only way to tell when a sync
    completes is when this value changes.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        initial_last_sync_completion (datetime.datetime): The timestamp of the last completed
            sync (successful or otherwise) for this connector, prior to running this method.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time (in seconds) that will be waited
            before this operation is timed out. A falsy value (``None`` or ``0``) means it
            never times out.

    Returns:
        Mapping[str, Any]: Parsed json data representing the API response.

    Raises:
        Failure: If the timeout elapses before a new sync completion is observed, or if the
            sync that completed did not succeed.
    """
    poll_start = datetime.datetime.now()
    while True:
        connector = FivetranConnector.from_connector_details(
            connector_details=self.get_connector_details(connector_id)
        )
        (
            curr_last_sync_completion,
            curr_last_sync_succeeded,
            curr_sync_state,
        ) = connector.sync_status
        self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]")

        # A completion timestamp newer than the initial one means a sync has finished.
        if curr_last_sync_completion > initial_last_sync_completion:
            break

        # Compute the elapsed time once so the check and the message agree.
        elapsed = datetime.datetime.now() - poll_start
        if poll_timeout and elapsed > datetime.timedelta(seconds=poll_timeout):
            raise Failure(
                f"Sync for connector '{connector_id}' timed out after " f"{elapsed}."
            )

        # Sleep for the configured time interval before polling again.
        time.sleep(poll_interval)

    # Fetch the final details once and reuse them for both the return value and
    # the failure metadata, rather than calling the API twice in a row.
    raw_connector_details = self.get_connector_details(connector_id)
    if not curr_last_sync_succeeded:
        failed_connector = FivetranConnector.from_connector_details(
            connector_details=raw_connector_details
        )
        raise Failure(
            f"Sync for connector '{connector_id}' failed!",
            metadata={
                "connector_details": MetadataValue.json(raw_connector_details),
                "log_url": MetadataValue.url(failed_connector.url),
            },
        )
    return raw_connector_details

def sync_and_poll(
    self,
    connector_id: str,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> FivetranOutput:
    """Initializes a sync operation for the given connector, and polls until it completes.

    Delegates to ``_sync_and_poll`` with ``start_sync`` as the trigger.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time that will be waited before this
            operation is timed out. By default, this will never time out.

    Returns:
        :py:class:`~FivetranOutput`:
            Object containing details about the connector and the tables it updates
    """
    polling_options = {"poll_interval": poll_interval, "poll_timeout": poll_timeout}
    return self._sync_and_poll(
        sync_fn=self.start_sync, connector_id=connector_id, **polling_options
    )

def resync_and_poll(
    self,
    connector_id: str,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
    resync_parameters: Optional[Mapping[str, Sequence[str]]] = None,
) -> FivetranOutput:
    """Initializes a historical resync operation for the given connector, and polls until it
    completes.

    Delegates to ``_sync_and_poll`` with ``start_resync`` (bound to ``resync_parameters``)
    as the trigger.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time that will be waited before this
            operation is timed out. By default, this will never time out.
        resync_parameters (Optional[Mapping[str, Sequence[str]]]): The payload to send to the
            Fivetran API. This should be a dictionary with schema names as the keys and a
            list of tables to resync as the values.

    Returns:
        :py:class:`~FivetranOutput`:
            Object containing details about the connector and the tables it updates
    """
    trigger_resync = partial(self.start_resync, resync_parameters=resync_parameters)
    polling_options = {"poll_interval": poll_interval, "poll_timeout": poll_timeout}
    return self._sync_and_poll(
        sync_fn=trigger_resync, connector_id=connector_id, **polling_options
    )

def _sync_and_poll(
    self,
    sync_fn: Callable,
    connector_id: str,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> FivetranOutput:
    """Triggers a sync through ``sync_fn`` and polls until it completes.

    The schema config and the last sync completion timestamp are captured before the
    sync is triggered; the timestamp is what ``poll_sync`` compares against.

    Args:
        sync_fn (Callable): Callable invoked with ``connector_id`` to start the sync.
        connector_id (str): The Fivetran Connector ID.
        poll_interval (float): The time (in seconds) waited between successive polls.
        poll_timeout (Optional[float]): The maximum time waited before timing out.

    Returns:
        FivetranOutput: The final connector details together with the schema config
        fetched before the sync started.
    """
    schema_config = self.get_schema_config_for_connector(connector_id)

    details_before_sync = self.get_connector_details(connector_id)
    connector_before_sync = FivetranConnector.from_connector_details(
        connector_details=details_before_sync
    )
    previous_completion, _succeeded, _state = connector_before_sync.sync_status

    sync_fn(connector_id)

    connector_details = self.poll_sync(
        connector_id,
        previous_completion,
        poll_interval=poll_interval,
        poll_timeout=poll_timeout,
    )
    return FivetranOutput(connector_details=connector_details, schema_config=schema_config)


class FivetranWorkspace(ConfigurableResource):
"""This class represents a Fivetran workspace and provides utilities
Expand All @@ -612,6 +855,13 @@ class FivetranWorkspace(ConfigurableResource):
default=0.25,
description="Time (in seconds) to wait between each request retry.",
)
disable_schedule_on_trigger: bool = Field(
default=True,
description=(
"Specifies if you would like any connector that is sync'd using this " # TODO: update description
"resource to be automatically taken off its Fivetran schedule."
),
)

_client: FivetranClient = PrivateAttr(default=None)

Expand All @@ -621,6 +871,7 @@ def get_client(self) -> FivetranClient:
api_secret=self.api_secret,
request_max_retries=self.request_max_retries,
request_retry_delay=self.request_retry_delay,
disable_schedule_on_trigger=self.disable_schedule_on_trigger,
)

def fetch_fivetran_workspace_data(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple

from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser

from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table

MIN_TIME_STR = "0001-01-01 00:00:00+00"


class FivetranConnectorTableProps(NamedTuple):
table: str
Expand Down Expand Up @@ -38,6 +43,10 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
sync_state: str
paused: bool
succeeded_at: Optional[str]
failed_at: Optional[str]

@property
def url(self) -> str:
Expand All @@ -51,6 +60,38 @@ def destination_id(self) -> str:
def is_connected(self) -> bool:
return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value

@property
def is_paused(self) -> bool:
    """Returns the connector's ``paused`` flag."""
    return self.paused

@property
def sync_status(self) -> Tuple[datetime, bool, str]:
    """Gets details about the status of the Fivetran connector.

    A missing ``succeeded_at`` or ``failed_at`` value is replaced with
    ``MIN_TIME_STR`` before parsing, so both timestamps can always be compared.

    Returns:
        Tuple[datetime.datetime, bool, str]:
            Tuple representing the timestamp of the last completed sync, if it succeeded, and
            the currently reported sync status.
    """
    last_success = parser.parse(self.succeeded_at or MIN_TIME_STR)
    last_failure = parser.parse(self.failed_at or MIN_TIME_STR)

    # The most recent of the two timestamps is when the last sync completed; that
    # sync succeeded only if the success timestamp is strictly the later one.
    last_completion = max(last_success, last_failure)
    last_sync_succeeded = last_success > last_failure
    return last_completion, last_sync_succeeded, self.sync_state

def assert_syncable(self) -> bool:
    """Confirms that the connector can be synced.

    Returns:
        bool: ``True`` when the connector is neither paused nor missing its setup.

    Raises:
        Failure: If the connector is paused, or if it is not fully set up. The paused
            check takes precedence.
    """
    failure_reason = None
    if self.is_paused:
        failure_reason = f"Connector '{self.id}' cannot be synced as it is currently paused."
    elif not self.is_connected:
        failure_reason = f"Connector '{self.id}' cannot be synced as it has not been setup"

    if failure_reason is not None:
        raise Failure(failure_reason)
    return True

@classmethod
def from_connector_details(
cls,
Expand All @@ -62,6 +103,10 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
sync_state=connector_details["status"]["sync_state"],
paused=connector_details["paused"],
succeeded_at=connector_details.get("succeeded_at"),
failed_at=connector_details.get("failed_at"),
)


Expand Down

0 comments on commit c152a82

Please sign in to comment.