Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 19, 2024
1 parent 017b5ce commit a2e3a43
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
TEST_WORKSPACE_ID,
get_job_details_sample,
)
from dagster_airbyte_tests.experimental.utils import optional_pytest_raise


def assert_token_call_and_split_calls(calls: responses.CallList):
Expand Down Expand Up @@ -161,13 +162,13 @@ def test_basic_resource_request(


@pytest.mark.parametrize(
"status",
"status, error_expected, exception_message",
[
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.CANCELLED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.FAILED,
TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE,
(AirbyteJobStatusType.SUCCEEDED, False, None),
(AirbyteJobStatusType.CANCELLED, True, "Job was cancelled"),
(AirbyteJobStatusType.ERROR, True, "Job failed"),
(AirbyteJobStatusType.FAILED, True, "Job failed"),
(TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, True, "unexpected state"),
],
ids=[
"job_status_succeeded",
Expand All @@ -178,7 +179,10 @@ def test_basic_resource_request(
],
)
def test_airbyte_sync_and_poll_client_job_status(
status: str, base_api_mocks: responses.RequestsMock
status: str,
error_expected: bool,
exception_message: str,
base_api_mocks: responses.RequestsMock,
) -> None:
resource = AirbyteCloudWorkspace(
workspace_id=TEST_WORKSPACE_ID,
Expand Down Expand Up @@ -210,45 +214,28 @@ def test_airbyte_sync_and_poll_client_job_status(
status=200,
)

if status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE:
base_api_mocks.add(
method=responses.DELETE,
url=test_job_api_url,
status=200,
json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED),
with optional_pytest_raise(
error_expected=error_expected, exception_cls=Failure, exception_message=exception_message
):
result = client.sync_and_poll(
connection_id=TEST_CONNECTION_ID, poll_interval=0, cancel_on_termination=False
)

if status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]:
with pytest.raises(Failure, match="Job failed"):
client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)

elif status == AirbyteJobStatusType.CANCELLED:
with pytest.raises(Failure, match="Job was cancelled"):
client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)

elif status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE:
with pytest.raises(Failure, match="unexpected state"):
client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)
assert_rest_api_call(
call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE
)

else:
result = client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)
if not error_expected:
assert result == AirbyteOutput(
job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED),
connection_details=SAMPLE_CONNECTION_DETAILS,
)


@pytest.mark.parametrize(
"n_polls, succeed_at_end",
"n_polls, error_expected",
[
(0, True),
(0, False),
(4, True),
(0, True),
(4, False),
(30, True),
(4, True),
(30, False),
],
ids=[
"sync_short_success",
Expand All @@ -259,7 +246,7 @@ def test_airbyte_sync_and_poll_client_job_status(
],
)
def test_airbyte_sync_and_poll_client_poll_process(
n_polls, succeed_at_end, base_api_mocks: responses.RequestsMock
n_polls: int, error_expected: bool, base_api_mocks: responses.RequestsMock
):
resource = AirbyteCloudWorkspace(
workspace_id=TEST_WORKSPACE_ID,
Expand Down Expand Up @@ -297,36 +284,38 @@ def _mock_interaction():
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=get_job_details_sample(
status=AirbyteJobStatusType.SUCCEEDED
if succeed_at_end
if not error_expected
else AirbyteJobStatusType.FAILED
),
status=200,
)
return client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0.1)

if succeed_at_end:
assert _mock_interaction() == AirbyteOutput(
with optional_pytest_raise(
error_expected=error_expected, exception_cls=Failure, exception_message="Job failed"
):
result = _mock_interaction()

if not error_expected:
assert result == AirbyteOutput(
job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED),
connection_details=SAMPLE_CONNECTION_DETAILS,
)
else:
with pytest.raises(Failure, match="Job failed"):
_mock_interaction()


@pytest.mark.parametrize(
"cancel_on_termination",
"cancel_on_termination, last_call_method",
[
True,
False,
(True, responses.DELETE),
(False, responses.GET),
],
ids=[
"cancel_on_termination_true",
"cancel_on_termination_false",
],
)
def test_airbyte_sync_and_poll_client_cancel_on_termination(
cancel_on_termination: bool, base_api_mocks: responses.RequestsMock
cancel_on_termination: bool, last_call_method: str, base_api_mocks: responses.RequestsMock
) -> None:
resource = AirbyteCloudWorkspace(
workspace_id=TEST_WORKSPACE_ID,
Expand Down Expand Up @@ -372,12 +361,7 @@ def test_airbyte_sync_and_poll_client_cancel_on_termination(
poll_interval=0,
cancel_on_termination=cancel_on_termination,
)
if cancel_on_termination:
assert_rest_api_call(
call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE
)
else:
# If we don't cancel on termination, the last call will be a call to fetch the job details
assert_rest_api_call(
call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.GET
)

assert_rest_api_call(
call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from contextlib import contextmanager
from typing import Optional, Type

import pytest


@contextmanager
def optional_pytest_raise(
error_expected: bool, exception_cls: Type[Exception], exception_message: Optional[str] = None
):
if error_expected:
kwargs = {}
if exception_message:
kwargs["match"] = exception_message
with pytest.raises(exception_cls, **kwargs):
yield
else:
yield

0 comments on commit a2e3a43

Please sign in to comment.