From 190eef7cc4da64c24ccd7d519322704b9078dc1f Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 16:48:28 -0500 Subject: [PATCH 1/3] [dagster-fivetran] Implement resync_and_poll method in FivetranClient --- .../dagster_fivetran/resources.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 119695e23b365..4fa288f9daa38 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -749,6 +749,36 @@ def sync_and_poll( poll_timeout=poll_timeout, ) + def resync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + resync_parameters: Optional[Mapping[str, Sequence[str]]] = None, + ) -> FivetranOutput: + """Initializes a historical resync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API. + This should be a dictionary with schema names as the keys and a list of tables + to resync as the values. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=partial(self.start_resync, resync_parameters=resync_parameters), + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + def _sync_and_poll( self, sync_fn: Callable, From 0b4c08888833da00fd45bb4bca7fbc365bd53fed Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 18:44:38 -0500 Subject: [PATCH 2/3] Add tests --- .../experimental/test_resources.py | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index dad8c12264953..6c0dbb9135daa 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -111,17 +111,33 @@ def test_basic_resource_request( @pytest.mark.parametrize( - "n_polls, succeed_at_end", - [(0, True), (0, False), (4, True), (4, False), (30, True)], + "method, n_polls, succeed_at_end", + [ + ("sync_and_poll", 0, True), + ("sync_and_poll", 0, False), + ("sync_and_poll", 4, True), + ("sync_and_poll", 4, False), + ("sync_and_poll", 30, True), + ("resync_and_poll", 0, True), + ("resync_and_poll", 0, False), + ("resync_and_poll", 4, True), + ("resync_and_poll", 4, False), + ("resync_and_poll", 30, True), + ], ids=["short_success", "short_failure", "medium_success", "medium_failure", "long_success"], ) -def test_sync_and_poll(n_polls, succeed_at_end, connector_id): +def test_sync_and_poll_methods(method, n_polls, succeed_at_end, connector_id): resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) client = resource.get_client() test_connector_api_url = get_fivetran_connector_api_url(connector_id) + test_sync_api_url = ( + f"{test_connector_api_url}/force" + if method == "sync_and_poll" + else f"{test_connector_api_url}/resync" + ) test_succeeded_at = TEST_MAX_TIME_STR test_failed_at = TEST_PREVIOUS_MAX_TIME_STR @@ -139,9 +155,7 @@ def _mock_interaction(): json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, ) response.add(responses.PATCH, test_connector_api_url, json=SAMPLE_SUCCESS_MESSAGE) - response.add( - responses.POST, f"{test_connector_api_url}/force", json=SAMPLE_SUCCESS_MESSAGE - ) + response.add(responses.POST, test_sync_api_url, json=SAMPLE_SUCCESS_MESSAGE) # initial state response.add( responses.GET, @@ -167,7 +181,8 @@ def _mock_interaction(): succeeded_at=test_succeeded_at, failed_at=test_failed_at ), ) - return client.sync_and_poll(connector_id, poll_interval=0.1) + test_method = getattr(client, method) + return test_method(connector_id, poll_interval=0.1) if succeed_at_end: assert _mock_interaction() == FivetranOutput( From cd041b01121d782b297ad066221d0ec52186f307 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 08:05:33 -0500 Subject: [PATCH 3/3] Add test ids --- .../experimental/test_resources.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 6c0dbb9135daa..9c4fb9594253b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -124,7 +124,18 @@ def test_basic_resource_request( ("resync_and_poll", 4, False), ("resync_and_poll", 30, True), ], - ids=["short_success", "short_failure", "medium_success", "medium_failure", "long_success"], + ids=[ + "sync_short_success", + "sync_short_failure", + "sync_medium_success", + "sync_medium_failure", + "sync_long_success", + "resync_short_success", + "resync_short_failure", + "resync_medium_success", + "resync_medium_failure", + "resync_long_success", + ], ) def test_sync_and_poll_methods(method, n_polls, succeed_at_end, connector_id): resource = FivetranWorkspace(