Skip to content

Commit

Permalink
[dagster-fivetran] Implement base poll method in FivetranClient (#26060)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR reworks the legacy poll method and implements it in the
`FivetranClient`:

-  `poll_sync` is added based on legacy `poll_sync`
- the way of handling the connector sync status has been updated 
  - Logic has been reworked and moved to `FivetranConnector` properties 

Tests mock the request API calls and make sure that all calls are made.

## How I Tested These Changes

Additional unit tests with BK
  • Loading branch information
maximearmstrong authored and cmpadden committed Dec 5, 2024
1 parent 85290cb commit f9f53fc
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,61 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id:
" UI: " + connector.url
)

def poll_sync(
    self,
    connector_id: str,
    previous_sync_completed_at: datetime,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
    """Block until the connector reports a sync that completed after a known timestamp.

    The connector details are fetched repeatedly. A new sync is considered complete as
    soon as the connector's last completion time is strictly later than
    ``previous_sync_completed_at``; a change in that value is the signal this method
    relies on to detect completion.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        previous_sync_completed_at (datetime.datetime): Completion time of the most recent
            sync (successful or not) observed before calling this method.
        poll_interval (float): Seconds to sleep between two consecutive polls.
        poll_timeout (float): Maximum number of seconds to keep polling. A falsy value
            (``None`` or ``0``) disables the timeout, so polling continues indefinitely.

    Returns:
        Dict[str, Any]: The connector details from the poll that observed the
        completed, successful sync.

    Raises:
        Failure: If the timeout elapses before a new sync completes, or if the sync
            that completed was not successful.
    """
    poll_start = datetime.now()
    while True:
        details = self.get_connector_details(connector_id)
        connector = FivetranConnector.from_connector_details(connector_details=details)
        self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

        # A completion time newer than the reference means the awaited sync is over;
        # decide right away whether it ended well or not.
        if connector.last_sync_completed_at > previous_sync_completed_at:
            if connector.is_last_sync_successful:
                return details
            raise Failure(
                f"Sync for connector '{connector_id}' failed!",
                metadata={
                    "connector_details": MetadataValue.json(details),
                    "log_url": MetadataValue.url(connector.url),
                },
            )

        # The timeout is only enforced when a truthy value was provided.
        if poll_timeout:
            timed_out = datetime.now() > poll_start + timedelta(seconds=poll_timeout)
            if timed_out:
                raise Failure(
                    f"Sync for connector '{connector_id}' timed out after "
                    f"{datetime.now() - poll_start}."
                )

        # Wait for the configured interval before the next poll.
        time.sleep(poll_interval)


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

Expand All @@ -7,9 +8,12 @@
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser

from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table

MIN_TIME_STR = "0001-01-01 00:00:00+00"


class FivetranConnectorTableProps(NamedTuple):
table: str
Expand Down Expand Up @@ -46,7 +50,10 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
sync_state: str
paused: bool
succeeded_at: Optional[str]
failed_at: Optional[str]

@property
def url(self) -> str:
Expand All @@ -64,6 +71,32 @@ def is_connected(self) -> bool:
def is_paused(self) -> bool:
return self.paused

@property
def last_sync_completed_at(self) -> datetime:
    """Return when the connector last finished a sync, whether it succeeded or failed.

    Both ``succeeded_at`` and ``failed_at`` are parsed, with ``MIN_TIME_STR`` standing in
    for a missing value, and the later of the two is returned.

    Returns:
        datetime.datetime:
            The datetime of the last completed sync of the Fivetran connector.
    """
    completion_times = [
        parser.parse(raw_timestamp or MIN_TIME_STR)
        for raw_timestamp in (self.succeeded_at, self.failed_at)
    ]
    return max(completion_times)

@property
def is_last_sync_successful(self) -> bool:
    """Tell whether the most recently completed sync of the connector succeeded.

    ``succeeded_at`` and ``failed_at`` are parsed, with ``MIN_TIME_STR`` standing in for
    a missing value. The sync counts as successful only when the success timestamp is
    strictly later than the failure timestamp, so equal timestamps (including the case
    where both are missing) yield ``False``.

    Returns:
        bool:
            Whether the last completed sync of the Fivetran connector was successful or not.
    """
    last_success = parser.parse(self.succeeded_at or MIN_TIME_STR)
    last_failure = parser.parse(self.failed_at or MIN_TIME_STR)

    return last_failure < last_success

def validate_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
the connector is either paused or not fully set up.
Expand All @@ -85,7 +118,10 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
sync_state=connector_details["status"]["sync_state"],
paused=connector_details["paused"],
succeeded_at=connector_details.get("succeeded_at"),
failed_at=connector_details.get("failed_at"),
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator
from typing import Any, Iterator, Mapping

import pytest
import responses
Expand All @@ -8,6 +8,13 @@
FIVETRAN_CONNECTOR_ENDPOINT,
)

TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z"
TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z"

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups
SAMPLE_GROUPS = {
Expand Down Expand Up @@ -61,7 +68,7 @@
},
"config": {"property1": {}, "property2": {}},
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 360,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -75,7 +82,7 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:43:29.013729Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "string",
"proxy_agent_id": "string",
Expand Down Expand Up @@ -148,72 +155,79 @@
},
}


# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/connectors/connector-details
SAMPLE_CONNECTOR_DETAILS = {
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "connector_id",
"service": "15five",
"schema": "schema.table",
"paused": False,
"status": {
"tasks": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"warnings": [
# The sample is parameterized to test the poll method
def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[str, Any]:
return {
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "connector_id",
"service": "15five",
"schema": "schema.table",
"paused": False,
"status": {
"tasks": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"warnings": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"schema_status": "ready",
"update_state": "delayed",
"setup_state": "connected",
"sync_state": "scheduled",
"is_historical_sync": False,
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": succeeded_at,
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
"title": "Test Title",
"status": "PASSED",
"message": "Test Passed",
"details": "Test Details",
}
],
"schema_status": "ready",
"update_state": "delayed",
"setup_state": "connected",
"sync_state": "scheduled",
"is_historical_sync": False,
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
"title": "Test Title",
"status": "PASSED",
"message": "Test Passed",
"details": "Test Details",
}
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2023-12-01T15:43:29.013729Z",
"failed_at": "2024-04-01T18:13:25.043659Z",
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
"connect_card": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU",
"uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...",
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": failed_at,
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
"connect_card": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU",
"uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...",
},
"pause_after_trial": False,
"data_delay_threshold": 0,
"data_delay_sensitivity": "NORMAL",
"schedule_type": "auto",
"local_processing_agent_id": "local_processing_agent_id",
"connect_card_config": {
"redirect_uri": "https://your.site/path",
"hide_setup_guide": True,
},
"hybrid_deployment_agent_id": "hybrid_deployment_agent_id",
"config": {"api_key": "your_15five_api_key"},
},
"pause_after_trial": False,
"data_delay_threshold": 0,
"data_delay_sensitivity": "NORMAL",
"schedule_type": "auto",
"local_processing_agent_id": "local_processing_agent_id",
"connect_card_config": {"redirect_uri": "https://your.site/path", "hide_setup_guide": True},
"hybrid_deployment_agent_id": "hybrid_deployment_agent_id",
"config": {"api_key": "your_15five_api_key"},
},
}
}


# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config
Expand Down Expand Up @@ -382,10 +396,6 @@

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand Down Expand Up @@ -452,13 +462,17 @@ def all_api_mocks_fixture(
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
Expand Down
Loading

0 comments on commit f9f53fc

Please sign in to comment.