From 2384e5b27dfe360c592681777eaa02b1f2d75cc7 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 26 Dec 2024 11:30:58 -0500 Subject: [PATCH] [12/n][dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient (#26431) ## Summary & Motivation Implement full sync and poll process in `AirbyteCloudClient`. This will be used in a subsequent PR in `AirbyteCloudWorkspace.sync_and_poll` to materialize Airbyte assets. ## How I Tested These Changes Additional tests with BK --- .../dagster_airbyte/__init__.py | 7 +- .../dagster_airbyte/resources.py | 124 ++++++++-- .../dagster_airbyte/translator.py | 43 +++- .../experimental/conftest.py | 30 ++- .../experimental/test_resources.py | 223 +++++++++++++++++- .../experimental/utils.py | 18 ++ .../test_cloud_resources.py | 13 +- .../dagster_airbyte_tests/test_resources.py | 15 +- .../dagster_airbyte_tests/utils.py | 8 +- 9 files changed, 432 insertions(+), 49 deletions(-) create mode 100644 python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py index 5bc6e79fe628e..8c8a8c72495d6 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py @@ -23,12 +23,15 @@ AirbyteCloudResource as AirbyteCloudResource, AirbyteCloudWorkspace as AirbyteCloudWorkspace, AirbyteResource as AirbyteResource, - AirbyteState as AirbyteState, airbyte_cloud_resource as airbyte_cloud_resource, airbyte_resource as airbyte_resource, load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs, ) -from dagster_airbyte.translator import DagsterAirbyteTranslator as DagsterAirbyteTranslator +from dagster_airbyte.translator import ( + AirbyteJobStatusType as AirbyteJobStatusType, + AirbyteState as AirbyteState, + DagsterAirbyteTranslator as DagsterAirbyteTranslator, +) from dagster_airbyte.types import AirbyteOutput as AirbyteOutput from dagster_airbyte.version import __version__ as __version__ diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index b00fc7c9f625f..33c30737640ed 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -33,6 +33,8 @@ from dagster_airbyte.translator import ( AirbyteConnection, AirbyteDestination, + AirbyteJob, + AirbyteJobStatusType, AirbyteWorkspaceData, DagsterAirbyteTranslator, ) @@ -53,16 +55,6 @@ AIRBYTE_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-airbyte/reconstruction_metadata" -class AirbyteState: - RUNNING = "running" - SUCCEEDED = "succeeded" - CANCELLED = "cancelled" - PENDING = "pending" - FAILED = "failed" - ERROR = "error" - INCOMPLETE = "incomplete" - - class AirbyteResourceState: def __init__(self) -> None: self.request_cache: Dict[str, Optional[Mapping[str, object]]] = {} @@ -252,13 +244,17 @@ def sync_and_poll( job_info = cast(Dict[str, object], job_details.get("job", {})) state = job_info.get("status") - if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + if state in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): continue - elif state == AirbyteState.SUCCEEDED: + elif state == AirbyteJobStatusType.SUCCEEDED: break - elif state == AirbyteState.ERROR: + elif state == AirbyteJobStatusType.ERROR: raise Failure(f"Job failed: {job_id}") - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: raise Failure(f"Job was cancelled: {job_id}") else: raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") @@ -266,7 +262,12 @@ def sync_and_poll( # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive # the python process if ( - state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + state + not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, + ) and self.cancel_sync_on_run_termination ): self.cancel_job(job_id) @@ -742,13 +743,17 @@ def sync_and_poll( job_info = cast(Dict[str, object], job_details.get("job", {})) state = job_info.get("status") - if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + if state in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): continue - elif state == AirbyteState.SUCCEEDED: + elif state == AirbyteJobStatusType.SUCCEEDED: break - elif state == AirbyteState.ERROR: + elif state == AirbyteJobStatusType.ERROR: raise Failure(f"Job failed: {job_id}") - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: raise Failure(f"Job was cancelled: {job_id}") else: raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") @@ -756,7 +761,12 @@ def sync_and_poll( # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive # the python process if ( - state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + state + not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, + ) and self.cancel_sync_on_run_termination ): self.cancel_job(job_id) @@ -1013,6 +1023,80 @@ def cancel_job(self, job_id: int) -> Mapping[str, Any]: method="DELETE", endpoint=f"jobs/{job_id}", base_url=self.rest_api_base_url ) + def sync_and_poll( + self, + connection_id: str, + poll_interval: Optional[float] = None, + poll_timeout: Optional[float] = None, + cancel_on_termination: bool = True, + ) -> AirbyteOutput: + """Initializes a sync operation for the given connection, and polls until it completes. + + Args: + connection_id (str): The Airbyte Connection ID. You can retrieve this value from the + "Connection" tab of a given connection in the Airbyte UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + cancel_on_termination (bool): Whether to cancel a sync in Airbyte if the Dagster runner is terminated. + This may be useful to disable if using Airbyte sources that cannot be cancelled and + resumed easily, or if your Dagster deployment may experience runner interruptions + that do not impact your Airbyte deployment. + + Returns: + :py:class:`~AirbyteOutput`: + Details of the sync job. + """ + connection_details = self.get_connection_details(connection_id) + start_job_details = self.start_sync_job(connection_id) + job = AirbyteJob.from_job_details(job_details=start_job_details) + + self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.") + poll_start = datetime.now() + poll_interval = ( + poll_interval if poll_interval is not None else DEFAULT_POLL_INTERVAL_SECONDS + ) + try: + while True: + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): + raise Failure( + f"Timeout: Airbyte job {job.id} is not ready after the timeout" + f" {poll_timeout} seconds" + ) + + time.sleep(poll_interval) + # We return these job details in the AirbyteOutput when the job succeeds + poll_job_details = self.get_job_details(job.id) + job = AirbyteJob.from_job_details(job_details=poll_job_details) + if job.status in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): + continue + elif job.status == AirbyteJobStatusType.SUCCEEDED: + break + elif job.status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]: + raise Failure(f"Job failed: {job.id}") + elif job.status == AirbyteJobStatusType.CANCELLED: + raise Failure(f"Job was cancelled: {job.id}") + else: + raise Failure( + f"Encountered unexpected state `{job.status}` for job_id {job.id}" + ) + finally: + # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive + # the python process + if cancel_on_termination and job.status not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.FAILED, + ): + self.cancel_job(job.id) + + return AirbyteOutput(job_details=poll_job_details, connection_details=connection_details) + @experimental class AirbyteCloudWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py index df1e0e46945d2..1347b1d24af4e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py @@ -1,6 +1,7 @@ +from enum import Enum from typing import Any, List, Mapping, Optional, Sequence -from dagster._annotations import experimental +from dagster._annotations import deprecated, experimental from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet @@ -11,6 +12,27 @@ from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name +class AirbyteJobStatusType(str, Enum): + RUNNING = "running" + SUCCEEDED = "succeeded" + CANCELLED = "cancelled" + PENDING = "pending" + FAILED = "failed" + ERROR = "error" + INCOMPLETE = "incomplete" + + +@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.") +class AirbyteState: + RUNNING = AirbyteJobStatusType.RUNNING + SUCCEEDED = AirbyteJobStatusType.SUCCEEDED + CANCELLED = AirbyteJobStatusType.CANCELLED + PENDING = AirbyteJobStatusType.PENDING + FAILED = AirbyteJobStatusType.FAILED + ERROR = AirbyteJobStatusType.ERROR + INCOMPLETE = AirbyteJobStatusType.INCOMPLETE + + @record class AirbyteConnectionTableProps: table_name: str @@ -108,6 +130,25 @@ def from_stream_details( ) +@whitelist_for_serdes +@record +class AirbyteJob: + """Represents an Airbyte job, based on data as returned from the API.""" + + id: int + status: str + + @classmethod + def from_job_details( + cls, + job_details: Mapping[str, Any], + ) -> "AirbyteJob": + return cls( + id=job_details["jobId"], + status=job_details["status"], + ) + + @whitelist_for_serdes @record class AirbyteWorkspaceData: diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 3b67382ab3706..8d3169402d399 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -1,4 +1,4 @@ -from typing import Iterator +from typing import Any, Iterator, Mapping import pytest import responses @@ -8,7 +8,7 @@ AIRBYTE_REST_API_BASE, AIRBYTE_REST_API_VERSION, ) -from dagster_airbyte.translator import AirbyteConnectionTableProps +from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType TEST_WORKSPACE_ID = "some_workspace_id" TEST_CLIENT_ID = "some_client_id" @@ -29,6 +29,8 @@ TEST_JSON_SCHEMA = {} TEST_JOB_ID = 12345 +TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE = "unrecognized" + TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps( table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}", stream_prefix=TEST_STREAM_PREFIX, @@ -165,13 +167,17 @@ # Taken from Airbyte REST API documentation # https://reference.airbyte.com/reference/getjob -SAMPLE_JOB_RESPONSE = { - "jobId": TEST_JOB_ID, - "status": "running", - "jobType": "sync", - "startTime": "2023-03-25T01:30:50Z", - "connectionId": TEST_CONNECTION_ID, -} +def get_job_details_sample(status: str) -> Mapping[str, Any]: + return { + "jobId": TEST_JOB_ID, + "status": status, + "jobType": "sync", + "startTime": "2023-03-25T01:30:50Z", + "connectionId": TEST_CONNECTION_ID, + } + + +SAMPLE_JOB_RESPONSE_RUNNING = get_job_details_sample(status=AirbyteJobStatusType.RUNNING) @pytest.fixture( @@ -224,19 +230,19 @@ def all_api_mocks_fixture( fetch_workspace_data_api_mocks.add( method=responses.POST, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.GET, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.DELETE, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index a249a5902f32b..30e88ec95be59 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -3,7 +3,9 @@ from typing import Optional from unittest import mock +import pytest import responses +from dagster import Failure from dagster_airbyte import AirbyteCloudWorkspace from dagster_airbyte.resources import ( AIRBYTE_CONFIGURATION_API_BASE, @@ -11,16 +13,22 @@ AIRBYTE_REST_API_BASE, AIRBYTE_REST_API_VERSION, ) +from dagster_airbyte.translator import AirbyteJobStatusType +from dagster_airbyte.types import AirbyteOutput from dagster_airbyte_tests.experimental.conftest import ( + SAMPLE_CONNECTION_DETAILS, TEST_ACCESS_TOKEN, TEST_CLIENT_ID, TEST_CLIENT_SECRET, TEST_CONNECTION_ID, TEST_DESTINATION_ID, TEST_JOB_ID, + TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, TEST_WORKSPACE_ID, + get_job_details_sample, ) +from dagster_airbyte_tests.experimental.utils import optional_pytest_raise def assert_token_call_and_split_calls(calls: responses.CallList): @@ -36,11 +44,18 @@ def assert_token_call_and_split_calls(calls: responses.CallList): return calls[1:] -def assert_rest_api_call(call: responses.Call, endpoint: str, object_id: Optional[str] = None): +def assert_rest_api_call( + call: responses.Call, + endpoint: str, + object_id: Optional[str] = None, + method: Optional[str] = None, +): rest_api_url = call.request.url.split("?")[0] assert rest_api_url == f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{endpoint}" if object_id: assert object_id in call.request.body.decode() + if method: + assert method == call.request.method assert call.request.headers["Authorization"] == f"Bearer {TEST_ACCESS_TOKEN}" @@ -144,3 +159,209 @@ def test_basic_resource_request( assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID) assert_rest_api_call(call=api_calls[4], endpoint=f"jobs/{TEST_JOB_ID}") assert_rest_api_call(call=api_calls[5], endpoint=f"jobs/{TEST_JOB_ID}") + + +@pytest.mark.parametrize( + "status, error_expected, exception_message", + [ + (AirbyteJobStatusType.SUCCEEDED, False, None), + (AirbyteJobStatusType.CANCELLED, True, "Job was cancelled"), + (AirbyteJobStatusType.ERROR, True, "Job failed"), + (AirbyteJobStatusType.FAILED, True, "Job failed"), + (TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, True, "unexpected state"), + ], + ids=[ + "job_status_succeeded", + "job_status_cancelled", + "job_status_error", + "job_status_failed", + "job_status_unrecognized", + ], +) +def test_airbyte_sync_and_poll_client_job_status( + status: str, + error_expected: bool, + exception_message: str, + base_api_mocks: responses.RequestsMock, +) -> None: + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + test_job_endpoint = f"jobs/{TEST_JOB_ID}" + test_job_api_url = f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{test_job_endpoint}" + + # Create mock responses to mock full sync and poll behavior to test statuses, used only in this test + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.GET, + url=test_job_api_url, + json=get_job_details_sample(status=status), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message=exception_message + ): + result = client.sync_and_poll( + connection_id=TEST_CONNECTION_ID, poll_interval=0, cancel_on_termination=False + ) + + if not error_expected: + assert result == AirbyteOutput( + job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), + connection_details=SAMPLE_CONNECTION_DETAILS, + ) + + +@pytest.mark.parametrize( + "n_polls, error_expected", + [ + (0, False), + (0, True), + (4, False), + (4, True), + (30, False), + ], + ids=[ + "sync_short_success", + "sync_short_failure", + "sync_medium_success", + "sync_medium_failure", + "sync_long_success", + ], +) +def test_airbyte_sync_and_poll_client_poll_process( + n_polls: int, error_expected: bool, base_api_mocks: responses.RequestsMock +): + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + # Create mock responses to mock full sync and poll behavior, used only in this test + def _mock_interaction(): + # initial state + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + # n polls before updating + for _ in range(n_polls): + base_api_mocks.add( + method=responses.GET, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", + json=get_job_details_sample(status=AirbyteJobStatusType.RUNNING), + status=200, + ) + # final state will be updated + base_api_mocks.add( + method=responses.GET, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", + json=get_job_details_sample( + status=AirbyteJobStatusType.SUCCEEDED + if not error_expected + else AirbyteJobStatusType.FAILED + ), + status=200, + ) + return client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0.1) + + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message="Job failed" + ): + result = _mock_interaction() + + if not error_expected: + assert result == AirbyteOutput( + job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), + connection_details=SAMPLE_CONNECTION_DETAILS, + ) + + +@pytest.mark.parametrize( + "cancel_on_termination, last_call_method", + [ + (True, responses.DELETE), + (False, responses.GET), + ], + ids=[ + "cancel_on_termination_true", + "cancel_on_termination_false", + ], +) +def test_airbyte_sync_and_poll_client_cancel_on_termination( + cancel_on_termination: bool, last_call_method: str, base_api_mocks: responses.RequestsMock +) -> None: + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + test_job_endpoint = f"jobs/{TEST_JOB_ID}" + test_job_api_url = f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{test_job_endpoint}" + + # Create mock responses to mock full sync and poll behavior to test statuses, used only in this test + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.GET, + url=test_job_api_url, + json=get_job_details_sample(status=TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + + if cancel_on_termination: + base_api_mocks.add( + method=responses.DELETE, + url=test_job_api_url, + status=200, + json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED), + ) + + with pytest.raises(Failure, match="unexpected state"): + client.sync_and_poll( + connection_id=TEST_CONNECTION_ID, + poll_interval=0, + cancel_on_termination=cancel_on_termination, + ) + + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method + ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py new file mode 100644 index 0000000000000..4a371156bf561 --- /dev/null +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py @@ -0,0 +1,18 @@ +from contextlib import contextmanager +from typing import Optional, Type + +import pytest + + +@contextmanager +def optional_pytest_raise( + error_expected: bool, exception_cls: Type[Exception], exception_message: Optional[str] = None +): + if error_expected: + kwargs = {} + if exception_message: + kwargs["match"] = exception_message + with pytest.raises(exception_cls, **kwargs): + yield + else: + yield diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py index 8d48369cdb124..e46e4daebcf38 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py @@ -6,7 +6,7 @@ import pytest import responses from dagster import Failure -from dagster_airbyte import AirbyteCloudResource, AirbyteOutput, AirbyteState +from dagster_airbyte import AirbyteCloudResource, AirbyteJobStatusType, AirbyteOutput @responses.activate @@ -50,7 +50,12 @@ def test_trigger_connection_fail() -> None: @responses.activate @pytest.mark.parametrize( "state", - [AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"], + [ + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.ERROR, + "unrecognized", + ], ) def test_sync_and_poll(state) -> None: ab_resource = AirbyteCloudResource( @@ -83,11 +88,11 @@ def test_sync_and_poll(state) -> None: json={"jobId": 1, "status": "cancelled", "jobType": "sync"}, ) - if state == AirbyteState.ERROR: + if state == AirbyteJobStatusType.ERROR: with pytest.raises(Failure, match="Job failed"): ab_resource.sync_and_poll("some_connection", 0) - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: with pytest.raises(Failure, match="Job was cancelled"): ab_resource.sync_and_poll("some_connection", 0) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py index 16ee2105fc324..a2be7ca3e49e3 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py @@ -10,7 +10,7 @@ build_init_resource_context, ) from dagster._core.definitions.metadata import MetadataValue -from dagster_airbyte import AirbyteOutput, AirbyteResource, AirbyteState, airbyte_resource +from dagster_airbyte import AirbyteJobStatusType, AirbyteOutput, AirbyteResource, airbyte_resource from dagster_airbyte.utils import generate_materializations from dagster_airbyte_tests.utils import ( @@ -65,7 +65,12 @@ def test_trigger_connection_fail( @responses.activate @pytest.mark.parametrize( "state", - [AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"], + [ + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.ERROR, + "unrecognized", + ], ) @pytest.mark.parametrize( "forward_logs", @@ -113,11 +118,11 @@ def test_sync_and_poll( if state == "unrecognized": responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204) - if state == AirbyteState.ERROR: + if state == AirbyteJobStatusType.ERROR: with pytest.raises(Failure, match="Job failed"): ab_resource.sync_and_poll("some_connection", 0) - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: with pytest.raises(Failure, match="Job was cancelled"): ab_resource.sync_and_poll("some_connection", 0) @@ -295,7 +300,7 @@ def _get_attempt(ls): method=responses.POST, url=ab_resource.api_base_url + "/jobs/get", json={ - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ _get_attempt(ls) for ls in [["log1a", "log1b", "log1c"], ["log2a", "log2b"]] ], diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py index 629c6a53eb0f6..af6f732d36944 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py @@ -1,5 +1,5 @@ from dagster._utils.merger import deep_merge_dicts -from dagster_airbyte import AirbyteState +from dagster_airbyte import AirbyteJobStatusType def get_sample_connection_json(stream_prefix="", **kwargs): @@ -59,7 +59,7 @@ def get_sample_connection_json(stream_prefix="", **kwargs): def get_sample_job_json(schema_prefix=""): return { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "attempt": { @@ -89,7 +89,7 @@ def get_sample_job_list_json(schema_prefix=""): return { "jobs": [ { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "streamStats": [ @@ -371,7 +371,7 @@ def get_project_connection_json(**kwargs): def get_project_job_json(): return { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "attempt": {