Skip to content

Commit

Permalink
[12/n][dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient (#26431)
Browse files Browse the repository at this point in the history

## Summary & Motivation

Implement the full sync-and-poll process in `AirbyteCloudClient`. This will
be used in a subsequent PR by `AirbyteCloudWorkspace.sync_and_poll` to
materialize Airbyte assets.

## How I Tested These Changes

Additional tests with BK
  • Loading branch information
maximearmstrong authored Dec 26, 2024
1 parent 3dd22ec commit 2384e5b
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
AirbyteCloudResource as AirbyteCloudResource,
AirbyteCloudWorkspace as AirbyteCloudWorkspace,
AirbyteResource as AirbyteResource,
AirbyteState as AirbyteState,
airbyte_cloud_resource as airbyte_cloud_resource,
airbyte_resource as airbyte_resource,
load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs,
)
from dagster_airbyte.translator import DagsterAirbyteTranslator as DagsterAirbyteTranslator
from dagster_airbyte.translator import (
AirbyteJobStatusType as AirbyteJobStatusType,
AirbyteState as AirbyteState,
DagsterAirbyteTranslator as DagsterAirbyteTranslator,
)
from dagster_airbyte.types import AirbyteOutput as AirbyteOutput
from dagster_airbyte.version import __version__ as __version__

Expand Down
124 changes: 104 additions & 20 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from dagster_airbyte.translator import (
AirbyteConnection,
AirbyteDestination,
AirbyteJob,
AirbyteJobStatusType,
AirbyteWorkspaceData,
DagsterAirbyteTranslator,
)
Expand All @@ -53,16 +55,6 @@
AIRBYTE_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-airbyte/reconstruction_metadata"


class AirbyteState:
    """Namespace of the job status strings checked while polling an Airbyte sync."""

    # Statuses that the polling loops treat as "still in progress".
    RUNNING = "running"
    PENDING = "pending"
    INCOMPLETE = "incomplete"

    # Status that ends polling successfully.
    SUCCEEDED = "succeeded"

    # Statuses that end polling without success.
    CANCELLED = "cancelled"
    FAILED = "failed"
    ERROR = "error"


class AirbyteResourceState:
def __init__(self) -> None:
self.request_cache: Dict[str, Optional[Mapping[str, object]]] = {}
Expand Down Expand Up @@ -252,21 +244,30 @@ def sync_and_poll(
job_info = cast(Dict[str, object], job_details.get("job", {}))
state = job_info.get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
if state in (
AirbyteJobStatusType.RUNNING,
AirbyteJobStatusType.PENDING,
AirbyteJobStatusType.INCOMPLETE,
):
continue
elif state == AirbyteState.SUCCEEDED:
elif state == AirbyteJobStatusType.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
elif state == AirbyteJobStatusType.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if (
state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
state
not in (
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
)
and self.cancel_sync_on_run_termination
):
self.cancel_job(job_id)
Expand Down Expand Up @@ -742,21 +743,30 @@ def sync_and_poll(
job_info = cast(Dict[str, object], job_details.get("job", {}))
state = job_info.get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
if state in (
AirbyteJobStatusType.RUNNING,
AirbyteJobStatusType.PENDING,
AirbyteJobStatusType.INCOMPLETE,
):
continue
elif state == AirbyteState.SUCCEEDED:
elif state == AirbyteJobStatusType.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
elif state == AirbyteJobStatusType.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if (
state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
state
not in (
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
)
and self.cancel_sync_on_run_termination
):
self.cancel_job(job_id)
Expand Down Expand Up @@ -1013,6 +1023,80 @@ def cancel_job(self, job_id: int) -> Mapping[str, Any]:
method="DELETE", endpoint=f"jobs/{job_id}", base_url=self.rest_api_base_url
)

def sync_and_poll(
    self,
    connection_id: str,
    poll_interval: Optional[float] = None,
    poll_timeout: Optional[float] = None,
    cancel_on_termination: bool = True,
) -> AirbyteOutput:
    """Start a sync job for a connection and block until that job reaches a final status.

    Args:
        connection_id (str): The Airbyte Connection ID. You can retrieve this value from the
            "Connection" tab of a given connection in the Airbyte UI.
        poll_interval (Optional[float]): Seconds to sleep between successive polls. Falls
            back to ``DEFAULT_POLL_INTERVAL_SECONDS`` when not provided.
        poll_timeout (Optional[float]): Maximum number of seconds to keep polling before the
            operation fails. When unset (or falsy), polling never times out.
        cancel_on_termination (bool): Whether to cancel the Airbyte job when this method
            exits while the job has not reached a final status (e.g. the Dagster runner is
            terminated). This may be useful to disable if using Airbyte sources that cannot
            be cancelled and resumed easily, or if your Dagster deployment may experience
            runner interruptions that do not impact your Airbyte deployment.

    Returns:
        AirbyteOutput: The job details from the last poll together with the connection
        details fetched before the sync was started.

    Raises:
        Failure: If the timeout elapses, or the job ends up failed, errored, cancelled,
            or in an unrecognized status.
    """
    connection_details = self.get_connection_details(connection_id)
    job = AirbyteJob.from_job_details(job_details=self.start_sync_job(connection_id))
    self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.")

    started_at = datetime.now()
    if poll_interval is None:
        poll_interval = DEFAULT_POLL_INTERVAL_SECONDS

    # Status groups used by the polling loop and by the cleanup step below.
    in_progress_statuses = (
        AirbyteJobStatusType.RUNNING,
        AirbyteJobStatusType.PENDING,
        AirbyteJobStatusType.INCOMPLETE,
    )
    failure_statuses = (AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED)
    final_statuses = (
        AirbyteJobStatusType.SUCCEEDED,
        AirbyteJobStatusType.ERROR,
        AirbyteJobStatusType.CANCELLED,
        AirbyteJobStatusType.FAILED,
    )

    try:
        while True:
            # The timeout is only enforced when a truthy poll_timeout was given.
            if poll_timeout:
                now = datetime.now()
                if now > started_at + timedelta(seconds=poll_timeout):
                    raise Failure(
                        f"Timeout: Airbyte job {job.id} is not ready after the timeout"
                        f" {poll_timeout} seconds"
                    )

            time.sleep(poll_interval)
            # We return these job details in the AirbyteOutput when the job succeeds
            poll_job_details = self.get_job_details(job.id)
            job = AirbyteJob.from_job_details(job_details=poll_job_details)

            if job.status in in_progress_statuses:
                continue
            if job.status == AirbyteJobStatusType.SUCCEEDED:
                break
            if job.status in failure_statuses:
                raise Failure(f"Job failed: {job.id}")
            if job.status == AirbyteJobStatusType.CANCELLED:
                raise Failure(f"Job was cancelled: {job.id}")
            raise Failure(f"Encountered unexpected state `{job.status}` for job_id {job.id}")
    finally:
        # If the Airbyte sync has not reached a final status (for instance because an
        # exception is propagating), cancel it so that it doesn't outlive the python process.
        if cancel_on_termination and job.status not in final_statuses:
            self.cancel_job(job.id)

    return AirbyteOutput(job_details=poll_job_details, connection_details=connection_details)


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Any, List, Mapping, Optional, Sequence

from dagster._annotations import experimental
from dagster._annotations import deprecated, experimental
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
Expand All @@ -11,6 +12,27 @@
from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name


class AirbyteJobStatusType(str, Enum):
    """Job status values used when polling Airbyte jobs.

    Members are also ``str`` instances, so they compare equal to the raw status
    strings (e.g. ``AirbyteJobStatusType.RUNNING == "running"``).
    """

    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
    PENDING = "pending"
    FAILED = "failed"
    ERROR = "error"
    INCOMPLETE = "incomplete"


@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.")
class AirbyteState:
    """Deprecated alias namespace; each attribute points at an `AirbyteJobStatusType` member."""

    # NOTE(review): these are class-level attributes that are read without instantiating
    # the class. If the decorator only warns on instantiation, attribute access will not
    # emit a deprecation warning -- confirm against the decorator's behavior.
    RUNNING = AirbyteJobStatusType.RUNNING
    SUCCEEDED = AirbyteJobStatusType.SUCCEEDED
    CANCELLED = AirbyteJobStatusType.CANCELLED
    PENDING = AirbyteJobStatusType.PENDING
    FAILED = AirbyteJobStatusType.FAILED
    ERROR = AirbyteJobStatusType.ERROR
    INCOMPLETE = AirbyteJobStatusType.INCOMPLETE


@record
class AirbyteConnectionTableProps:
table_name: str
Expand Down Expand Up @@ -108,6 +130,25 @@ def from_stream_details(
)


@whitelist_for_serdes
@record
class AirbyteJob:
    """Represents an Airbyte job, based on data as returned from the API."""

    # Job identifier, taken from the "jobId" key of the job details payload.
    id: int
    # Job status, taken from the "status" key of the job details payload.
    status: str

    @classmethod
    def from_job_details(
        cls,
        job_details: Mapping[str, Any],
    ) -> "AirbyteJob":
        """Build an `AirbyteJob` from a job details mapping.

        Args:
            job_details (Mapping[str, Any]): Mapping that must contain the "jobId" and
                "status" keys.

        Returns:
            AirbyteJob: A record holding the job id and status.

        Raises:
            KeyError: If "jobId" or "status" is missing from the mapping.
        """
        job_id = job_details["jobId"]
        job_status = job_details["status"]
        return cls(id=job_id, status=job_status)


@whitelist_for_serdes
@record
class AirbyteWorkspaceData:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator
from typing import Any, Iterator, Mapping

import pytest
import responses
Expand All @@ -8,7 +8,7 @@
AIRBYTE_REST_API_BASE,
AIRBYTE_REST_API_VERSION,
)
from dagster_airbyte.translator import AirbyteConnectionTableProps
from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType

TEST_WORKSPACE_ID = "some_workspace_id"
TEST_CLIENT_ID = "some_client_id"
Expand All @@ -29,6 +29,8 @@
TEST_JSON_SCHEMA = {}
TEST_JOB_ID = 12345

TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE = "unrecognized"

TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps(
table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}",
stream_prefix=TEST_STREAM_PREFIX,
Expand Down Expand Up @@ -165,13 +167,17 @@

# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getjob
SAMPLE_JOB_RESPONSE = {
"jobId": TEST_JOB_ID,
"status": "running",
"jobType": "sync",
"startTime": "2023-03-25T01:30:50Z",
"connectionId": TEST_CONNECTION_ID,
}
def get_job_details_sample(status: str) -> Mapping[str, Any]:
    """Return a sample job details payload for the test job with the given status.

    Args:
        status (str): Value to place under the "status" key.

    Returns:
        Mapping[str, Any]: Payload with the "jobId", "status", "jobType", "startTime"
        and "connectionId" keys.
    """
    job_details: Mapping[str, Any] = dict(
        jobId=TEST_JOB_ID,
        status=status,
        jobType="sync",
        startTime="2023-03-25T01:30:50Z",
        connectionId=TEST_CONNECTION_ID,
    )
    return job_details


SAMPLE_JOB_RESPONSE_RUNNING = get_job_details_sample(status=AirbyteJobStatusType.RUNNING)


@pytest.fixture(
Expand Down Expand Up @@ -224,19 +230,19 @@ def all_api_mocks_fixture(
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.DELETE,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
yield fetch_workspace_data_api_mocks
Loading

0 comments on commit 2384e5b

Please sign in to comment.