Skip to content

Commit

Permalink
[dagster-fivetran] Implement base resync method in FivetranClient (da…
Browse files Browse the repository at this point in the history
…gster-io#26059)

## Summary & Motivation

This PR reworks legacy resync method and implements it in the
`FivetranClient`:

-  `start_resync` is added based on legacy `start_resync`
- `start_resync` leverages `_start_sync` introduced in dagster-io#25911 
- a [resync in
Fivetran](https://fivetran.com/docs/rest-api/api-reference/connectors/resync-connector)
is historical data sync - the endpoint and result is different, but
logic around how to call and handle a resync is the same as a sync.
- a resync can be done with or without resync parameters, using a
different endpoint.

Tests mock the request API calls and make sure that all calls are made.

## How I Tested These Changes

Additional unit tests with BK
  • Loading branch information
maximearmstrong authored and pskinnerthyme committed Dec 16, 2024
1 parent 38466d9 commit b217974
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,29 @@ def start_sync(self, connector_id: str) -> None:
)
self._start_sync(request_fn=request_fn, connector_id=connector_id)

def start_resync(
self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None
) -> None:
"""Initiates a historical sync of all data for multiple schema tables within a Fivetran connector.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API.
An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7
"""
request_fn = partial(
self._make_connector_request,
method="POST",
endpoint=(
f"{connector_id}/schemas/tables/resync"
if resync_parameters is not None
else f"{connector_id}/resync"
),
data=json.dumps(resync_parameters) if resync_parameters is not None else None,
)
self._start_sync(request_fn=request_fn, connector_id=connector_id)

def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None:
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,16 @@ def all_api_mocks_fixture(
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ def test_basic_resource_request(
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url

# resync calls
all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters=None)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url

# resync calls with parameters
all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]})
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url

0 comments on commit b217974

Please sign in to comment.