Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 21, 2024
1 parent 683db31 commit 3b2685e
Showing 1 changed file with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
import responses
from dagster import Failure
from dagster._vendored.dateutil import parser
from dagster_fivetran import FivetranWorkspace
from dagster_fivetran import FivetranOutput, FivetranWorkspace
from dagster_fivetran.translator import MIN_TIME_STR

from dagster_fivetran_tests.experimental.conftest import (
FIVETRAN_API_BASE,
FIVETRAN_API_VERSION,
FIVETRAN_CONNECTOR_ENDPOINT,
SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
SAMPLE_SUCCESS_MESSAGE,
TEST_ACCOUNT_ID,
TEST_API_KEY,
TEST_API_SECRET,
Expand Down Expand Up @@ -108,3 +110,74 @@ def test_basic_resource_request(
poll_timeout=2,
poll_interval=1,
)


@pytest.mark.parametrize(
"n_polls, succeed_at_end",
[(0, True), (0, False), (4, True), (4, False), (30, True)],
)
def test_sync_and_poll(n_polls, succeed_at_end, connector_id):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET
)
client = resource.get_client()

test_connector_endpoint = (
f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}"
)

test_succeeded_at = TEST_MAX_TIME_STR
test_failed_at = TEST_PREVIOUS_MAX_TIME_STR
# Set `failed_at` as more recent that `succeeded_at` if the sync and poll process is expected to fail
if not succeed_at_end:
test_succeeded_at, test_failed_at = test_failed_at, test_succeeded_at

# Create mock responses to mock poll behavior
def _mock_interaction():
with responses.RequestsMock() as response:
response.add(
responses.GET,
f"{test_connector_endpoint}/schemas",
json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
)
response.add(responses.PATCH, test_connector_endpoint, json=SAMPLE_SUCCESS_MESSAGE)
response.add(
responses.POST, f"{test_connector_endpoint}/force", json=SAMPLE_SUCCESS_MESSAGE
)
# initial state
response.add(
responses.GET,
test_connector_endpoint,
json=get_sample_connection_details(
succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR
),
)
# n polls before updating
for _ in range(n_polls):
response.add(
responses.GET,
test_connector_endpoint,
json=get_sample_connection_details(
succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR
),
)
# final state will be updated
response.add(
responses.GET,
test_connector_endpoint,
json=get_sample_connection_details(
succeeded_at=test_succeeded_at, failed_at=test_failed_at
),
)
return client.sync_and_poll(connector_id, poll_interval=0.1)

if succeed_at_end:
assert _mock_interaction() == FivetranOutput(
connector_details=get_sample_connection_details(
succeeded_at=test_succeeded_at, failed_at=test_failed_at
)["data"],
schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"],
)
else:
with pytest.raises(Failure, match="failed!"):
_mock_interaction()

0 comments on commit 3b2685e

Please sign in to comment.