diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 119695e23b365..4fa288f9daa38 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -749,6 +749,36 @@ def sync_and_poll( poll_timeout=poll_timeout, ) + def resync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + resync_parameters: Optional[Mapping[str, Sequence[str]]] = None, + ) -> FivetranOutput: + """Initializes a historical resync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API. + This should be a dictionary with schema names as the keys and a list of tables + to resync as the values. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=partial(self.start_resync, resync_parameters=resync_parameters), + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + def _sync_and_poll( self, sync_fn: Callable, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index dad8c12264953..9c4fb9594253b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -111,17 +111,44 @@ def test_basic_resource_request( @pytest.mark.parametrize( - "n_polls, succeed_at_end", - [(0, True), (0, False), (4, True), (4, False), (30, True)], - ids=["short_success", "short_failure", "medium_success", "medium_failure", "long_success"], + "method, n_polls, succeed_at_end", + [ + ("sync_and_poll", 0, True), + ("sync_and_poll", 0, False), + ("sync_and_poll", 4, True), + ("sync_and_poll", 4, False), + ("sync_and_poll", 30, True), + ("resync_and_poll", 0, True), + ("resync_and_poll", 0, False), + ("resync_and_poll", 4, True), + ("resync_and_poll", 4, False), + ("resync_and_poll", 30, True), + ], + ids=[ + "sync_short_success", + "sync_short_failure", + "sync_medium_success", + "sync_medium_failure", + "sync_long_success", + "resync_short_success", + "resync_short_failure", + "resync_medium_success", + "resync_medium_failure", + "resync_long_success", + ], ) -def test_sync_and_poll(n_polls, succeed_at_end, connector_id): +def test_sync_and_poll_methods(method, n_polls, succeed_at_end, connector_id): resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) client = resource.get_client() test_connector_api_url = get_fivetran_connector_api_url(connector_id) + test_sync_api_url = ( + f"{test_connector_api_url}/force" + if method == "sync_and_poll" + else f"{test_connector_api_url}/resync" + ) test_succeeded_at = TEST_MAX_TIME_STR test_failed_at = TEST_PREVIOUS_MAX_TIME_STR @@ -139,9 +166,7 @@ def _mock_interaction(): json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, ) response.add(responses.PATCH, test_connector_api_url, json=SAMPLE_SUCCESS_MESSAGE) - response.add( - responses.POST, f"{test_connector_api_url}/force", json=SAMPLE_SUCCESS_MESSAGE - ) + response.add(responses.POST, test_sync_api_url, json=SAMPLE_SUCCESS_MESSAGE) # initial state response.add( responses.GET, @@ -167,7 +192,8 @@ def _mock_interaction(): succeeded_at=test_succeeded_at, failed_at=test_failed_at ), ) - return client.sync_and_poll(connector_id, poll_interval=0.1) + test_method = getattr(client, method) + return test_method(connector_id, poll_interval=0.1) if succeed_at_end: assert _mock_interaction() == FivetranOutput(