From 16a481091f3b3b6661950aece2474c7e04604dcf Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 00:18:43 -0500 Subject: [PATCH 1/6] [dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient --- .../dagster_airbyte/resources.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index b00fc7c9f625f..70cbe1d3b9f4e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -1013,6 +1013,84 @@ def cancel_job(self, job_id: int) -> Mapping[str, Any]: method="DELETE", endpoint=f"jobs/{job_id}", base_url=self.rest_api_base_url ) + def sync_and_poll( + self, + connection_id: str, + poll_interval: Optional[float] = None, + poll_timeout: Optional[float] = None, + ) -> AirbyteOutput: + """Initializes a sync operation for the given connection, and polls until it completes. + + Args: + connection_id (str): The Airbyte Connection ID. You can retrieve this value from the + "Connection" tab of a given connection in the Airbyte UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~AirbyteOutput`: + Details of the sync job. + """ + connection_details = self.get_connection_details(connection_id) + job_details = self.start_sync(connection_id) + # TODO: Use AirbyteJob class + job_id = job_details["id"] + + self._log.info(f"Job {job_id} initialized for connection_id={connection_id}.") + start = time.monotonic() + logged_attempts = 0 + logged_lines = 0 + state = None + + try: + while True: + if poll_timeout and start + poll_timeout < time.monotonic(): + raise Failure( + f"Timeout: Airbyte job {job_id} is not ready after the timeout" + f" {poll_timeout} seconds" + ) + time.sleep(poll_interval or self.poll_interval) + job_details = self.get_job_status(connection_id, job_id) + attempts = cast(List, job_details.get("attempts", [])) + cur_attempt = len(attempts) + # spit out the available Airbyte log info + if cur_attempt: + if self._should_forward_logs: + log_lines = attempts[logged_attempts].get("logs", {}).get("logLines", []) + + for line in log_lines[logged_lines:]: + sys.stdout.write(line + "\n") + sys.stdout.flush() + logged_lines = len(log_lines) + + # if there's a next attempt, this one will have no more log messages + if logged_attempts < cur_attempt - 1: + logged_lines = 0 + logged_attempts += 1 + + state = job_details["status"] + if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + continue + elif state == AirbyteState.SUCCEEDED: + break + elif state == AirbyteState.ERROR: + raise Failure(f"Job failed: {job_id}") + elif state == AirbyteState.CANCELLED: + raise Failure(f"Job was cancelled: {job_id}") + else: + raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") + finally: + # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive + # the python process + if ( + state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + and self.cancel_sync_on_run_termination + ): + self.cancel_job(job_id) + + return AirbyteOutput(job_details=job_details, connection_details=connection_details) + @experimental class AirbyteCloudWorkspace(ConfigurableResource): From 2e9e0454289b19da53b53bb8605426582fbca5a8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 16 Dec 2024 16:58:03 -0500 Subject: [PATCH 2/6] Update AirbyteCloudClient.sync_and_poll --- .../dagster_airbyte/__init__.py | 7 +- .../dagster_airbyte/resources.py | 120 +++++++++--------- .../dagster_airbyte/translator.py | 43 ++++++- .../test_cloud_resources.py | 13 +- .../dagster_airbyte_tests/test_resources.py | 15 ++- .../dagster_airbyte_tests/utils.py | 8 +- 6 files changed, 130 insertions(+), 76 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py index 5bc6e79fe628e..8c8a8c72495d6 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py @@ -23,12 +23,15 @@ AirbyteCloudResource as AirbyteCloudResource, AirbyteCloudWorkspace as AirbyteCloudWorkspace, AirbyteResource as AirbyteResource, - AirbyteState as AirbyteState, airbyte_cloud_resource as airbyte_cloud_resource, airbyte_resource as airbyte_resource, load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs, ) -from dagster_airbyte.translator import DagsterAirbyteTranslator as DagsterAirbyteTranslator +from dagster_airbyte.translator import ( + AirbyteJobStatusType as AirbyteJobStatusType, + AirbyteState as AirbyteState, + DagsterAirbyteTranslator as DagsterAirbyteTranslator, +) from dagster_airbyte.types import AirbyteOutput as AirbyteOutput from dagster_airbyte.version import __version__ as __version__ diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 70cbe1d3b9f4e..2fb16db3d21c9 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -33,6 +33,8 @@ from dagster_airbyte.translator import ( AirbyteConnection, AirbyteDestination, + AirbyteJob, + AirbyteJobStatusType, AirbyteWorkspaceData, DagsterAirbyteTranslator, ) @@ -53,16 +55,6 @@ AIRBYTE_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-airbyte/reconstruction_metadata" -class AirbyteState: - RUNNING = "running" - SUCCEEDED = "succeeded" - CANCELLED = "cancelled" - PENDING = "pending" - FAILED = "failed" - ERROR = "error" - INCOMPLETE = "incomplete" - - class AirbyteResourceState: def __init__(self) -> None: self.request_cache: Dict[str, Optional[Mapping[str, object]]] = {} @@ -252,13 +244,17 @@ def sync_and_poll( job_info = cast(Dict[str, object], job_details.get("job", {})) state = job_info.get("status") - if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + if state in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): continue - elif state == AirbyteState.SUCCEEDED: + elif state == AirbyteJobStatusType.SUCCEEDED: break - elif state == AirbyteState.ERROR: + elif state == AirbyteJobStatusType.ERROR: raise Failure(f"Job failed: {job_id}") - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: raise Failure(f"Job was cancelled: {job_id}") else: raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") @@ -266,7 +262,12 @@ def sync_and_poll( # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive # the python process if ( - state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + state + not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, + ) and self.cancel_sync_on_run_termination ): self.cancel_job(job_id) @@ -742,13 +743,17 @@ def sync_and_poll( job_info = cast(Dict[str, object], job_details.get("job", {})) state = job_info.get("status") - if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + if state in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): continue - elif state == AirbyteState.SUCCEEDED: + elif state == AirbyteJobStatusType.SUCCEEDED: break - elif state == AirbyteState.ERROR: + elif state == AirbyteJobStatusType.ERROR: raise Failure(f"Job failed: {job_id}") - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: raise Failure(f"Job was cancelled: {job_id}") else: raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") @@ -756,7 +761,12 @@ def sync_and_poll( # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive # the python process if ( - state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + state + not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, + ) and self.cancel_sync_on_run_termination ): self.cancel_job(job_id) @@ -1018,6 +1028,7 @@ def sync_and_poll( connection_id: str, poll_interval: Optional[float] = None, poll_timeout: Optional[float] = None, + cancel_on_termination: bool = True, ) -> AirbyteOutput: """Initializes a sync operation for the given connection, and polls until it completes. @@ -1027,69 +1038,58 @@ def sync_and_poll( poll_interval (float): The time (in seconds) that will be waited between successive polls. poll_timeout (float): The maximum time that will wait before this operation is timed out. By default, this will never time out. + cancel_on_termination (bool): Whether to cancel a sync in Airbyte if the Dagster runner is terminated. + This may be useful to disable if using Airbyte sources that cannot be cancelled and + resumed easily, or if your Dagster deployment may experience runner interruptions + that do not impact your Airbyte deployment. Returns: :py:class:`~AirbyteOutput`: Details of the sync job. """ connection_details = self.get_connection_details(connection_id) - job_details = self.start_sync(connection_id) - # TODO: Use AirbyteJob class - job_id = job_details["id"] + start_job_details = self.start_sync(connection_id) + job = AirbyteJob.from_job_details(job_details=start_job_details) - self._log.info(f"Job {job_id} initialized for connection_id={connection_id}.") + self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.") start = time.monotonic() - logged_attempts = 0 - logged_lines = 0 - state = None - try: while True: if poll_timeout and start + poll_timeout < time.monotonic(): raise Failure( - f"Timeout: Airbyte job {job_id} is not ready after the timeout" + f"Timeout: Airbyte job {job.id} is not ready after the timeout" f" {poll_timeout} seconds" ) time.sleep(poll_interval or self.poll_interval) - job_details = self.get_job_status(connection_id, job_id) - attempts = cast(List, job_details.get("attempts", [])) - cur_attempt = len(attempts) - # spit out the available Airbyte log info - if cur_attempt: - if self._should_forward_logs: - log_lines = attempts[logged_attempts].get("logs", {}).get("logLines", []) - - for line in log_lines[logged_lines:]: - sys.stdout.write(line + "\n") - sys.stdout.flush() - logged_lines = len(log_lines) - - # if there's a next attempt, this one will have no more log messages - if logged_attempts < cur_attempt - 1: - logged_lines = 0 - logged_attempts += 1 - - state = job_details["status"] - if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + poll_job_details = self.get_job_status(connection_id, job.id) + job = AirbyteJob.from_job_details(job_details=poll_job_details) + if job.status in ( + AirbyteJobStatusType.RUNNING, + AirbyteJobStatusType.PENDING, + AirbyteJobStatusType.INCOMPLETE, + ): continue - elif state == AirbyteState.SUCCEEDED: + elif job.status == AirbyteJobStatusType.SUCCEEDED: break - elif state == AirbyteState.ERROR: - raise Failure(f"Job failed: {job_id}") - elif state == AirbyteState.CANCELLED: - raise Failure(f"Job was cancelled: {job_id}") + elif job.status == AirbyteJobStatusType.ERROR: + raise Failure(f"Job failed: {job.id}") + elif job.status == AirbyteJobStatusType.CANCELLED: + raise Failure(f"Job was cancelled: {job.id}") else: - raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") + raise Failure( + f"Encountered unexpected state `{job.status}` for job_id {job.id}" + ) finally: # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive # the python process - if ( - state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) - and self.cancel_sync_on_run_termination + if cancel_on_termination and job.status not in ( + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.CANCELLED, ): - self.cancel_job(job_id) + self.cancel_job(job.id) - return AirbyteOutput(job_details=job_details, connection_details=connection_details) + return AirbyteOutput(job_details=poll_job_details, connection_details=connection_details) @experimental diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py index df1e0e46945d2..f25817334549a 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py @@ -1,6 +1,7 @@ +from enum import Enum from typing import Any, List, Mapping, Optional, Sequence -from dagster._annotations import experimental +from dagster._annotations import deprecated, experimental from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet @@ -11,6 +12,27 @@ from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name +class AirbyteJobStatusType(str, Enum): + RUNNING = "running" + SUCCEEDED = "succeeded" + CANCELLED = "cancelled" + PENDING = "pending" + FAILED = "failed" + ERROR = "error" + INCOMPLETE = "incomplete" + + +@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.") +class AirbyteState: + RUNNING = AirbyteJobStatusType.RUNNING + SUCCEEDED = AirbyteJobStatusType.SUCCEEDED + CANCELLED = AirbyteJobStatusType.CANCELLED + PENDING = AirbyteJobStatusType.PENDING + FAILED = AirbyteJobStatusType.FAILED + ERROR = AirbyteJobStatusType.ERROR + INCOMPLETE = AirbyteJobStatusType.INCOMPLETE + + @record class AirbyteConnectionTableProps: table_name: str @@ -108,6 +130,25 @@ def from_stream_details( ) +@whitelist_for_serdes +@record +class AirbyteJob: + """Represents an Airbyte job, based on data as returned from the API.""" + + id: str + status: str + + @classmethod + def from_job_details( + cls, + job_details: Mapping[str, Any], + ) -> "AirbyteStream": + return cls( + id=job_details["id"], + status=job_details["status"], + ) + + @whitelist_for_serdes @record class AirbyteWorkspaceData: diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py index 8d48369cdb124..e46e4daebcf38 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_cloud_resources.py @@ -6,7 +6,7 @@ import pytest import responses from dagster import Failure -from dagster_airbyte import AirbyteCloudResource, AirbyteOutput, AirbyteState +from dagster_airbyte import AirbyteCloudResource, AirbyteJobStatusType, AirbyteOutput @responses.activate @@ -50,7 +50,12 @@ def test_trigger_connection_fail() -> None: @responses.activate @pytest.mark.parametrize( "state", - [AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"], + [ + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.ERROR, + "unrecognized", + ], ) def test_sync_and_poll(state) -> None: ab_resource = AirbyteCloudResource( @@ -83,11 +88,11 @@ def test_sync_and_poll(state) -> None: json={"jobId": 1, "status": "cancelled", "jobType": "sync"}, ) - if state == AirbyteState.ERROR: + if state == AirbyteJobStatusType.ERROR: with pytest.raises(Failure, match="Job failed"): ab_resource.sync_and_poll("some_connection", 0) - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: with pytest.raises(Failure, match="Job was cancelled"): ab_resource.sync_and_poll("some_connection", 0) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py index 16ee2105fc324..a2be7ca3e49e3 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_resources.py @@ -10,7 +10,7 @@ build_init_resource_context, ) from dagster._core.definitions.metadata import MetadataValue -from dagster_airbyte import AirbyteOutput, AirbyteResource, AirbyteState, airbyte_resource +from dagster_airbyte import AirbyteJobStatusType, AirbyteOutput, AirbyteResource, airbyte_resource from dagster_airbyte.utils import generate_materializations from dagster_airbyte_tests.utils import ( @@ -65,7 +65,12 @@ def test_trigger_connection_fail( @responses.activate @pytest.mark.parametrize( "state", - [AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"], + [ + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.ERROR, + "unrecognized", + ], ) @pytest.mark.parametrize( "forward_logs", @@ -113,11 +118,11 @@ def test_sync_and_poll( if state == "unrecognized": responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204) - if state == AirbyteState.ERROR: + if state == AirbyteJobStatusType.ERROR: with pytest.raises(Failure, match="Job failed"): ab_resource.sync_and_poll("some_connection", 0) - elif state == AirbyteState.CANCELLED: + elif state == AirbyteJobStatusType.CANCELLED: with pytest.raises(Failure, match="Job was cancelled"): ab_resource.sync_and_poll("some_connection", 0) @@ -295,7 +300,7 @@ def _get_attempt(ls): method=responses.POST, url=ab_resource.api_base_url + "/jobs/get", json={ - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ _get_attempt(ls) for ls in [["log1a", "log1b", "log1c"], ["log2a", "log2b"]] ], diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py index 629c6a53eb0f6..af6f732d36944 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/utils.py @@ -1,5 +1,5 @@ from dagster._utils.merger import deep_merge_dicts -from dagster_airbyte import AirbyteState +from dagster_airbyte import AirbyteJobStatusType def get_sample_connection_json(stream_prefix="", **kwargs): @@ -59,7 +59,7 @@ def get_sample_connection_json(stream_prefix="", **kwargs): def get_sample_job_json(schema_prefix=""): return { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "attempt": { @@ -89,7 +89,7 @@ def get_sample_job_list_json(schema_prefix=""): return { "jobs": [ { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "streamStats": [ @@ -371,7 +371,7 @@ def get_project_connection_json(**kwargs): def get_project_job_json(): return { - "job": {"id": 1, "status": AirbyteState.SUCCEEDED}, + "job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED}, "attempts": [ { "attempt": { From 3c76dca71e8f6d5d6058611fbab9798625d69a90 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 15:47:41 -0500 Subject: [PATCH 3/6] Add tests --- .../dagster_airbyte/resources.py | 17 +- .../dagster_airbyte/translator.py | 6 +- .../experimental/conftest.py | 30 ++- .../experimental/test_resources.py | 228 +++++++++++++++++- 4 files changed, 259 insertions(+), 22 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 2fb16db3d21c9..c0cf0b97dd55b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -1048,20 +1048,24 @@ def sync_and_poll( Details of the sync job. """ connection_details = self.get_connection_details(connection_id) - start_job_details = self.start_sync(connection_id) + start_job_details = self.start_sync_job(connection_id) job = AirbyteJob.from_job_details(job_details=start_job_details) self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.") - start = time.monotonic() + poll_start = datetime.now() + poll_interval = ( + poll_interval if poll_interval is not None else DEFAULT_POLL_INTERVAL_SECONDS + ) try: while True: - if poll_timeout and start + poll_timeout < time.monotonic(): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Timeout: Airbyte job {job.id} is not ready after the timeout" f" {poll_timeout} seconds" ) - time.sleep(poll_interval or self.poll_interval) - poll_job_details = self.get_job_status(connection_id, job.id) + + time.sleep(poll_interval) + poll_job_details = self.get_job_details(job.id) job = AirbyteJob.from_job_details(job_details=poll_job_details) if job.status in ( AirbyteJobStatusType.RUNNING, @@ -1071,7 +1075,7 @@ def sync_and_poll( continue elif job.status == AirbyteJobStatusType.SUCCEEDED: break - elif job.status == AirbyteJobStatusType.ERROR: + elif job.status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]: raise Failure(f"Job failed: {job.id}") elif job.status == AirbyteJobStatusType.CANCELLED: raise Failure(f"Job was cancelled: {job.id}") @@ -1086,6 +1090,7 @@ def sync_and_poll( AirbyteJobStatusType.SUCCEEDED, AirbyteJobStatusType.ERROR, AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.FAILED, ): self.cancel_job(job.id) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py index f25817334549a..1347b1d24af4e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py @@ -135,16 +135,16 @@ def from_stream_details( class AirbyteJob: """Represents an Airbyte job, based on data as returned from the API.""" - id: str + id: int status: str @classmethod def from_job_details( cls, job_details: Mapping[str, Any], - ) -> "AirbyteStream": + ) -> "AirbyteJob": return cls( - id=job_details["id"], + id=job_details["jobId"], status=job_details["status"], ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 3b67382ab3706..8d3169402d399 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -1,4 +1,4 @@ -from typing import Iterator +from typing import Any, Iterator, Mapping import pytest import responses @@ -8,7 +8,7 @@ AIRBYTE_REST_API_BASE, AIRBYTE_REST_API_VERSION, ) -from dagster_airbyte.translator import AirbyteConnectionTableProps +from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType TEST_WORKSPACE_ID = "some_workspace_id" TEST_CLIENT_ID = "some_client_id" @@ -29,6 +29,8 @@ TEST_JSON_SCHEMA = {} TEST_JOB_ID = 12345 +TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE = "unrecognized" + TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps( table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}", stream_prefix=TEST_STREAM_PREFIX, @@ -165,13 +167,17 @@ # Taken from Airbyte REST API documentation # https://reference.airbyte.com/reference/getjob -SAMPLE_JOB_RESPONSE = { - "jobId": TEST_JOB_ID, - "status": "running", - "jobType": "sync", - "startTime": "2023-03-25T01:30:50Z", - "connectionId": TEST_CONNECTION_ID, -} +def get_job_details_sample(status: str) -> Mapping[str, Any]: + return { + "jobId": TEST_JOB_ID, + "status": status, + "jobType": "sync", + "startTime": "2023-03-25T01:30:50Z", + "connectionId": TEST_CONNECTION_ID, + } + + +SAMPLE_JOB_RESPONSE_RUNNING = get_job_details_sample(status=AirbyteJobStatusType.RUNNING) @pytest.fixture( @@ -224,19 +230,19 @@ def all_api_mocks_fixture( fetch_workspace_data_api_mocks.add( method=responses.POST, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.GET, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.DELETE, url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", - json=SAMPLE_JOB_RESPONSE, + json=SAMPLE_JOB_RESPONSE_RUNNING, status=200, ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index a249a5902f32b..35f6c690ab254 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -3,7 +3,9 @@ from typing import Optional from unittest import mock +import pytest import responses +from dagster import Failure from dagster_airbyte import AirbyteCloudWorkspace from dagster_airbyte.resources import ( AIRBYTE_CONFIGURATION_API_BASE, @@ -11,15 +13,20 @@ AIRBYTE_REST_API_BASE, AIRBYTE_REST_API_VERSION, ) +from dagster_airbyte.translator import AirbyteJobStatusType +from dagster_airbyte.types import AirbyteOutput from dagster_airbyte_tests.experimental.conftest import ( + SAMPLE_CONNECTION_DETAILS, TEST_ACCESS_TOKEN, TEST_CLIENT_ID, TEST_CLIENT_SECRET, TEST_CONNECTION_ID, TEST_DESTINATION_ID, TEST_JOB_ID, + TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, TEST_WORKSPACE_ID, + get_job_details_sample, ) @@ -36,11 +43,18 @@ def assert_token_call_and_split_calls(calls: responses.CallList): return calls[1:] -def assert_rest_api_call(call: responses.Call, endpoint: str, object_id: Optional[str] = None): +def assert_rest_api_call( + call: responses.Call, + endpoint: str, + object_id: Optional[str] = None, + method: Optional[str] = None, +): rest_api_url = call.request.url.split("?")[0] assert rest_api_url == f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{endpoint}" if object_id: assert object_id in call.request.body.decode() + if method: + assert method == call.request.method assert call.request.headers["Authorization"] == f"Bearer {TEST_ACCESS_TOKEN}" @@ -144,3 +158,215 @@ def test_basic_resource_request( assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID) assert_rest_api_call(call=api_calls[4], endpoint=f"jobs/{TEST_JOB_ID}") assert_rest_api_call(call=api_calls[5], endpoint=f"jobs/{TEST_JOB_ID}") + + +@pytest.mark.parametrize( + "status", + [ + AirbyteJobStatusType.SUCCEEDED, + AirbyteJobStatusType.CANCELLED, + AirbyteJobStatusType.ERROR, + AirbyteJobStatusType.FAILED, + TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, + ], +) +def test_airbyte_sync_and_poll_client_job_status( + status: str, base_api_mocks: responses.RequestsMock +) -> None: + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + test_job_endpoint = f"jobs/{TEST_JOB_ID}" + test_job_api_url = f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{test_job_endpoint}" + + # Create mock responses to mock full sync and poll behavior to test statuses, used only in this test + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.GET, + url=test_job_api_url, + json=get_job_details_sample(status=status), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + + if status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: + base_api_mocks.add( + method=responses.DELETE, + url=test_job_api_url, + status=200, + json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED), + ) + + if status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]: + with pytest.raises(Failure, match="Job failed"): + client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + + elif status == AirbyteJobStatusType.CANCELLED: + with pytest.raises(Failure, match="Job was cancelled"): + client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + + elif status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: + with pytest.raises(Failure, match="unexpected state"): + client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE + ) + + else: + result = client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + assert result == AirbyteOutput( + job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), + connection_details=SAMPLE_CONNECTION_DETAILS, + ) + + +@pytest.mark.parametrize( + "n_polls, succeed_at_end", + [ + (0, True), + (0, False), + (4, True), + (4, False), + (30, True), + ], + ids=[ + "sync_short_success", + "sync_short_failure", + "sync_medium_success", + "sync_medium_failure", + "sync_long_success", + ], +) +def test_airbyte_sync_and_poll_client_poll_process( + n_polls, succeed_at_end, base_api_mocks: responses.RequestsMock +): + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + # Create mock responses to mock full sync and poll behavior, used only in this test + def _mock_interaction(): + # initial state + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + # n polls before updating + for _ in range(n_polls): + base_api_mocks.add( + method=responses.GET, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", + json=get_job_details_sample(status=AirbyteJobStatusType.RUNNING), + status=200, + ) + # final state will be updated + base_api_mocks.add( + method=responses.GET, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", + json=get_job_details_sample( + status=AirbyteJobStatusType.SUCCEEDED + if succeed_at_end + else AirbyteJobStatusType.FAILED + ), + status=200, + ) + return client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0.1) + + if succeed_at_end: + assert _mock_interaction() == AirbyteOutput( + job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), + connection_details=SAMPLE_CONNECTION_DETAILS, + ) + else: + with pytest.raises(Failure, match="Job failed"): + _mock_interaction() + + +@pytest.mark.parametrize( + "cancel_on_termination", + [ + True, + False, + ], +) +def test_airbyte_sync_and_poll_client_cancel_on_termination( + cancel_on_termination: bool, base_api_mocks: responses.RequestsMock +) -> None: + resource = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=TEST_CLIENT_ID, + client_secret=TEST_CLIENT_SECRET, + ) + client = resource.get_client() + + test_job_endpoint = f"jobs/{TEST_JOB_ID}" + test_job_api_url = f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{test_job_endpoint}" + + # Create mock responses to mock full sync and poll behavior to test statuses, used only in this test + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=get_job_details_sample(status=AirbyteJobStatusType.PENDING), + status=200, + ) + base_api_mocks.add( + method=responses.GET, + url=test_job_api_url, + json=get_job_details_sample(status=TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE), + status=200, + ) + base_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}/connections/get", + json=SAMPLE_CONNECTION_DETAILS, + status=200, + ) + + if cancel_on_termination: + base_api_mocks.add( + method=responses.DELETE, + url=test_job_api_url, + status=200, + json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED), + ) + + with pytest.raises(Failure, match="unexpected state"): + client.sync_and_poll( + connection_id=TEST_CONNECTION_ID, + poll_interval=0, + cancel_on_termination=cancel_on_termination, + ) + if cancel_on_termination: + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE + ) + else: + # If we don't cancel on termination, the last call will be a call to fetch the job details + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.GET + ) From 875d62f30e08f5915db6367b0ffa950ed3d86935 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 20:04:21 -0500 Subject: [PATCH 4/6] Update tests --- .../experimental/test_resources.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 35f6c690ab254..37a618041eb66 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -169,6 +169,13 @@ def test_basic_resource_request( AirbyteJobStatusType.FAILED, TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, ], + ids=[ + "job_status_succeeded", + "job_status_cancelled", + "job_status_error", + "job_status_failed", + "job_status_unrecognized", + ], ) def test_airbyte_sync_and_poll_client_job_status( status: str, base_api_mocks: responses.RequestsMock @@ -313,6 +320,10 @@ def _mock_interaction(): True, False, ], + ids=[ + "cancel_on_termination_true", + "cancel_on_termination_false", + ], ) def test_airbyte_sync_and_poll_client_cancel_on_termination( cancel_on_termination: bool, base_api_mocks: responses.RequestsMock From 3c04d0b652a2aedfcb57c27c8c2bcf0d10f7fa29 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 17:34:18 -0500 Subject: [PATCH 5/6] Update tests --- .../experimental/test_resources.py | 92 ++++++++----------- .../experimental/utils.py | 18 ++++ 2 files changed, 56 insertions(+), 54 deletions(-) create mode 100644 python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 37a618041eb66..30e88ec95be59 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -28,6 +28,7 @@ TEST_WORKSPACE_ID, get_job_details_sample, ) +from dagster_airbyte_tests.experimental.utils import optional_pytest_raise def assert_token_call_and_split_calls(calls: responses.CallList): @@ -161,13 +162,13 @@ def test_basic_resource_request( @pytest.mark.parametrize( - "status", + "status, error_expected, exception_message", [ - AirbyteJobStatusType.SUCCEEDED, - AirbyteJobStatusType.CANCELLED, - AirbyteJobStatusType.ERROR, - AirbyteJobStatusType.FAILED, - TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, + (AirbyteJobStatusType.SUCCEEDED, False, None), + (AirbyteJobStatusType.CANCELLED, True, "Job was cancelled"), + (AirbyteJobStatusType.ERROR, True, "Job failed"), + (AirbyteJobStatusType.FAILED, True, "Job failed"), + (TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, True, "unexpected state"), ], ids=[ "job_status_succeeded", @@ -178,7 +179,10 @@ def test_basic_resource_request( ], ) def test_airbyte_sync_and_poll_client_job_status( - status: str, base_api_mocks: responses.RequestsMock + status: str, + error_expected: bool, + exception_message: str, + base_api_mocks: responses.RequestsMock, ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -210,31 +214,14 @@ def test_airbyte_sync_and_poll_client_job_status( status=200, ) - if status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: - base_api_mocks.add( - method=responses.DELETE, - url=test_job_api_url, - status=200, - json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED), + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message=exception_message + ): + result = client.sync_and_poll( + connection_id=TEST_CONNECTION_ID, poll_interval=0, cancel_on_termination=False ) - if status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]: - with pytest.raises(Failure, match="Job failed"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - - elif status == AirbyteJobStatusType.CANCELLED: - with pytest.raises(Failure, match="Job was cancelled"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - - elif status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE: - with pytest.raises(Failure, match="unexpected state"): - client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE - ) - - else: - result = client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0) + if not error_expected: assert result == AirbyteOutput( job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), connection_details=SAMPLE_CONNECTION_DETAILS, @@ -242,13 +229,13 @@ def test_airbyte_sync_and_poll_client_job_status( @pytest.mark.parametrize( - "n_polls, succeed_at_end", + "n_polls, error_expected", [ - (0, True), (0, False), - (4, True), + (0, True), (4, False), - (30, True), + (4, True), + (30, False), ], ids=[ "sync_short_success", @@ -259,7 +246,7 @@ def test_airbyte_sync_and_poll_client_job_status( ], ) def test_airbyte_sync_and_poll_client_poll_process( - n_polls, succeed_at_end, base_api_mocks: responses.RequestsMock + n_polls: int, error_expected: bool, base_api_mocks: responses.RequestsMock ): resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -297,28 +284,30 @@ def _mock_interaction(): url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", json=get_job_details_sample( status=AirbyteJobStatusType.SUCCEEDED - if succeed_at_end + if not error_expected else AirbyteJobStatusType.FAILED ), status=200, ) return client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0.1) - if succeed_at_end: - assert _mock_interaction() == AirbyteOutput( + with optional_pytest_raise( + error_expected=error_expected, exception_cls=Failure, exception_message="Job failed" + ): + result = _mock_interaction() + + if not error_expected: + assert result == AirbyteOutput( job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED), connection_details=SAMPLE_CONNECTION_DETAILS, ) - else: - with pytest.raises(Failure, match="Job failed"): - _mock_interaction() @pytest.mark.parametrize( - "cancel_on_termination", + "cancel_on_termination, last_call_method", [ - True, - False, + (True, responses.DELETE), + (False, responses.GET), ], ids=[ "cancel_on_termination_true", @@ -326,7 +315,7 @@ def _mock_interaction(): ], ) def test_airbyte_sync_and_poll_client_cancel_on_termination( - cancel_on_termination: bool, base_api_mocks: responses.RequestsMock + cancel_on_termination: bool, last_call_method: str, base_api_mocks: responses.RequestsMock ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -372,12 +361,7 @@ def test_airbyte_sync_and_poll_client_cancel_on_termination( poll_interval=0, cancel_on_termination=cancel_on_termination, ) - if cancel_on_termination: - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE - ) - else: - # If we don't cancel on termination, the last call will be a call to fetch the job details - assert_rest_api_call( - call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.GET - ) + + assert_rest_api_call( + call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method + ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py new file mode 100644 index 0000000000000..4a371156bf561 --- /dev/null +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/utils.py @@ -0,0 +1,18 @@ +from contextlib import contextmanager +from typing import Optional, Type + +import pytest + + +@contextmanager +def optional_pytest_raise( + error_expected: bool, exception_cls: Type[Exception], exception_message: Optional[str] = None +): + if error_expected: + kwargs = {} + if exception_message: + kwargs["match"] = exception_message + with pytest.raises(exception_cls, **kwargs): + yield + else: + yield From fa564847cff9e1ee39e341a71eb9017be48552aa Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 17:38:33 -0500 Subject: [PATCH 6/6] Add comment in sync_and_poll --- .../libraries/dagster-airbyte/dagster_airbyte/resources.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index c0cf0b97dd55b..33c30737640ed 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -1065,6 +1065,7 @@ def sync_and_poll( ) time.sleep(poll_interval) + # We return these job details in the AirbyteOutput when the job succeeds poll_job_details = self.get_job_details(job.id) job = AirbyteJob.from_job_details(job_details=poll_job_details) if job.status in (