diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index a2ee8f6f5fa02..93e7273942576 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -2,13 +2,15 @@ import responses from dagster import Failure from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( FIVETRAN_API_BASE, FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT, + SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, + SAMPLE_SUCCESS_MESSAGE, TEST_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, @@ -108,3 +110,74 @@ def test_basic_resource_request( poll_timeout=2, poll_interval=1, ) + + +@pytest.mark.parametrize( + "n_polls, succeed_at_end", + [(0, True), (0, False), (4, True), (4, False), (30, True)], +) +def test_sync_and_poll(n_polls, succeed_at_end, connector_id): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET + ) + client = resource.get_client() + + test_connector_endpoint = ( + f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}" + ) + + test_succeeded_at = TEST_MAX_TIME_STR + test_failed_at = TEST_PREVIOUS_MAX_TIME_STR + # Set `failed_at` as more recent that `succeeded_at` if the sync and poll process is expected to fail + if not succeed_at_end: + test_succeeded_at, test_failed_at = test_failed_at, test_succeeded_at + + # Create mock responses to mock poll behavior + def _mock_interaction(): + with responses.RequestsMock() as response: + response.add( + responses.GET, + f"{test_connector_endpoint}/schemas", + json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, + ) + response.add(responses.PATCH, test_connector_endpoint, json=SAMPLE_SUCCESS_MESSAGE) + response.add( + responses.POST, f"{test_connector_endpoint}/force", json=SAMPLE_SUCCESS_MESSAGE + ) + # initial state + response.add( + responses.GET, + test_connector_endpoint, + json=get_sample_connection_details( + succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR + ), + ) + # n polls before updating + for _ in range(n_polls): + response.add( + responses.GET, + test_connector_endpoint, + json=get_sample_connection_details( + succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR + ), + ) + # final state will be updated + response.add( + responses.GET, + test_connector_endpoint, + json=get_sample_connection_details( + succeeded_at=test_succeeded_at, failed_at=test_failed_at + ), + ) + return client.sync_and_poll(connector_id, poll_interval=0.1) + + if succeed_at_end: + assert _mock_interaction() == FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=test_succeeded_at, failed_at=test_failed_at + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + else: + with pytest.raises(Failure, match="failed!"): + _mock_interaction()