diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 37a618041eb66..30e88ec95be59 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -28,6 +28,7 @@ TEST_WORKSPACE_ID, get_job_details_sample, ) +from dagster_airbyte_tests.experimental.utils import optional_pytest_raise def assert_token_call_and_split_calls(calls: responses.CallList): @@ -161,13 +162,13 @@ def test_basic_resource_request( @pytest.mark.parametrize( - "status", + "status, error_expected, exception_message", [ - AirbyteJobStatusType.SUCCEEDED, - AirbyteJobStatusType.CANCELLED, - AirbyteJobStatusType.ERROR, - AirbyteJobStatusType.FAILED, - TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, + (AirbyteJobStatusType.SUCCEEDED, False, None), + (AirbyteJobStatusType.CANCELLED, True, "Job was cancelled"), + (AirbyteJobStatusType.ERROR, True, "Job failed"), + (AirbyteJobStatusType.FAILED, True, "Job failed"), + (TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, True, "unexpected state"), ], ids=[ "job_status_succeeded", @@ -178,7 +179,10 @@ def test_basic_resource_request( ], ) def test_airbyte_sync_and_poll_client_job_status( - status: str, base_api_mocks: responses.RequestsMock + status: str, + error_expected: bool, + exception_message: str, + base_api_mocks: responses.RequestsMock, ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -210,31 +214,14 @@ def test_airbyte_sync_and_poll_client_job_status( status=200, ) - if status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: - base_api_mocks.add( - method=responses.DELETE, - url=test_job_api_url, - status=200, - json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED), + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message=exception_message + ): + result = client.sync_and_poll( + connection_id=TEST_CONNECTION_ID, poll_interval=0, cancel_on_termination=False ) - if status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]: - with pytest.raises(Failure, match="Job failed"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - - elif status == AirbyteJobStatusType.CANCELLED: - with pytest.raises(Failure, match="Job was cancelled"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - - elif status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: - with pytest.raises(Failure, match="unexpected state"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE - ) - - else: - result = client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + if not error_expected: assert result == AirbyteOutput( job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), connection_details=SAMPLE_CONNECTION_DETAILS, @@ -242,13 +229,13 @@ def test_airbyte_sync_and_poll_client_job_status( @pytest.mark.parametrize( - "n_polls, succeed_at_end", + "n_polls, error_expected", [ - (0, True), (0, False), - (4, True), + (0, True), (4, False), - (30, True), + (4, True), + (30, False), ], ids=[ "sync_short_success", @@ -259,7 +246,7 @@ def test_airbyte_sync_and_poll_client_job_status( ], ) def test_airbyte_sync_and_poll_client_poll_process( - n_polls, succeed_at_end, base_api_mocks: responses.RequestsMock + n_polls: int, error_expected: bool, base_api_mocks: responses.RequestsMock ): resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -297,28 +284,30 @@ def _mock_interaction(): url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", json=get_job_details_sample( status=AirbyteJobStatusType.SUCCEEDED - if succeed_at_end + if not error_expected else AirbyteJobStatusType.FAILED ), status=200, ) return client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0.1) - if succeed_at_end: - assert _mock_interaction() == AirbyteOutput( + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message="Job failed" + ): + result = _mock_interaction() + + if not error_expected: + assert result == AirbyteOutput( job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), connection_details=SAMPLE_CONNECTION_DETAILS, ) - else: - with pytest.raises(Failure, match="Job failed"): - _mock_interaction() @pytest.mark.parametrize( - "cancel_on_termination", + "cancel_on_termination, last_call_method", [ - True, - False, + (True, responses.DELETE), + (False, responses.GET), ], ids=[ "cancel_on_termination_true", @@ -326,7 +315,7 @@ def _mock_interaction(): ], ) def test_airbyte_sync_and_poll_client_cancel_on_termination( - cancel_on_termination: bool, base_api_mocks: responses.RequestsMock + cancel_on_termination: bool, last_call_method: str, base_api_mocks: responses.RequestsMock ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -372,12 +361,7 @@ def test_airbyte_sync_and_poll_client_cancel_on_termination( poll_interval=0, cancel_on_termination=cancel_on_termination, ) - if cancel_on_termination: - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE - ) - else: - # If we don't cancel on termination, the last call will be a call to fetch the job details - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.GET - ) + + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method + ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py new file mode 100644 index 0000000000000..4a371156bf561 --- /dev/null +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py @@ -0,0 +1,18 @@ +from contextlib import contextmanager +from typing import Optional, Type + +import pytest + + +@contextmanager +def optional_pytest_raise( + error_expected: bool, exception_cls: Type[Exception], exception_message: Optional[str] = None +): + if error_expected: + kwargs = {} + if exception_message: + kwargs["match"] = exception_message + with pytest.raises(exception_cls, **kwargs): + yield + else: + yield