Skip to content

Commit

Permalink
Update AirbyteCloudClient.sync_and_poll
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 18, 2024
1 parent 4d66e71 commit 7394328
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
AirbyteCloudResource as AirbyteCloudResource,
AirbyteCloudWorkspace as AirbyteCloudWorkspace,
AirbyteResource as AirbyteResource,
AirbyteState as AirbyteState,
airbyte_cloud_resource as airbyte_cloud_resource,
airbyte_resource as airbyte_resource,
load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs,
)
from dagster_airbyte.translator import DagsterAirbyteTranslator as DagsterAirbyteTranslator
from dagster_airbyte.translator import (
AirbyteJobStatusType as AirbyteJobStatusType,
AirbyteState as AirbyteState,
DagsterAirbyteTranslator as DagsterAirbyteTranslator,
)
from dagster_airbyte.types import AirbyteOutput as AirbyteOutput
from dagster_airbyte.version import __version__ as __version__

Expand Down
120 changes: 60 additions & 60 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from dagster_airbyte.translator import (
AirbyteConnection,
AirbyteDestination,
AirbyteJob,
AirbyteJobStatusType,
AirbyteWorkspaceData,
DagsterAirbyteTranslator,
)
Expand All @@ -53,16 +55,6 @@
AIRBYTE_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-airbyte/reconstruction_metadata"


class AirbyteState:
RUNNING = "running"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
PENDING = "pending"
FAILED = "failed"
ERROR = "error"
INCOMPLETE = "incomplete"


class AirbyteResourceState:
def __init__(self) -> None:
self.request_cache: Dict[str, Optional[Mapping[str, object]]] = {}
Expand Down Expand Up @@ -252,21 +244,30 @@ def sync_and_poll(
job_info = cast(Dict[str, object], job_details.get("job", {}))
state = job_info.get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
if state in (
AirbyteJobStatusType.RUNNING,
AirbyteJobStatusType.PENDING,
AirbyteJobStatusType.INCOMPLETE,
):
continue
elif state == AirbyteState.SUCCEEDED:
elif state == AirbyteJobStatusType.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
elif state == AirbyteJobStatusType.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if (
state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
state
not in (
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
)
and self.cancel_sync_on_run_termination
):
self.cancel_job(job_id)
Expand Down Expand Up @@ -742,21 +743,30 @@ def sync_and_poll(
job_info = cast(Dict[str, object], job_details.get("job", {}))
state = job_info.get("status")

if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
if state in (
AirbyteJobStatusType.RUNNING,
AirbyteJobStatusType.PENDING,
AirbyteJobStatusType.INCOMPLETE,
):
continue
elif state == AirbyteState.SUCCEEDED:
elif state == AirbyteJobStatusType.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
elif state == AirbyteJobStatusType.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
finally:
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if (
state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
state
not in (
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
)
and self.cancel_sync_on_run_termination
):
self.cancel_job(job_id)
Expand Down Expand Up @@ -1018,6 +1028,7 @@ def sync_and_poll(
connection_id: str,
poll_interval: Optional[float] = None,
poll_timeout: Optional[float] = None,
cancel_on_termination: bool = True,
) -> AirbyteOutput:
"""Initializes a sync operation for the given connection, and polls until it completes.
Expand All @@ -1027,69 +1038,58 @@ def sync_and_poll(
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will wait before this operation is timed
out. By default, this will never time out.
cancel_on_termination (bool): Whether to cancel a sync in Airbyte if the Dagster runner is terminated.
This may be useful to disable if using Airbyte sources that cannot be cancelled and
resumed easily, or if your Dagster deployment may experience runner interruptions
that do not impact your Airbyte deployment.
Returns:
:py:class:`~AirbyteOutput`:
Details of the sync job.
"""
connection_details = self.get_connection_details(connection_id)
job_details = self.start_sync(connection_id)
# TODO: Use AirbyteJob class
job_id = job_details["id"]
start_job_details = self.start_sync(connection_id)
job = AirbyteJob.from_job_details(job_details=start_job_details)

self._log.info(f"Job {job_id} initialized for connection_id={connection_id}.")
self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.")
start = time.monotonic()
logged_attempts = 0
logged_lines = 0
state = None

try:
while True:
if poll_timeout and start + poll_timeout < time.monotonic():
raise Failure(
f"Timeout: Airbyte job {job_id} is not ready after the timeout"
f"Timeout: Airbyte job {job.id} is not ready after the timeout"
f" {poll_timeout} seconds"
)
time.sleep(poll_interval or self.poll_interval)
job_details = self.get_job_status(connection_id, job_id)
attempts = cast(List, job_details.get("attempts", []))
cur_attempt = len(attempts)
# spit out the available Airbyte log info
if cur_attempt:
if self._should_forward_logs:
log_lines = attempts[logged_attempts].get("logs", {}).get("logLines", [])

for line in log_lines[logged_lines:]:
sys.stdout.write(line + "\n")
sys.stdout.flush()
logged_lines = len(log_lines)

# if there's a next attempt, this one will have no more log messages
if logged_attempts < cur_attempt - 1:
logged_lines = 0
logged_attempts += 1

state = job_details["status"]
if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
poll_job_details = self.get_job_status(connection_id, job.id)
job = AirbyteJob.from_job_details(job_details=poll_job_details)
if job.status in (
AirbyteJobStatusType.RUNNING,
AirbyteJobStatusType.PENDING,
AirbyteJobStatusType.INCOMPLETE,
):
continue
elif state == AirbyteState.SUCCEEDED:
elif job.status == AirbyteJobStatusType.SUCCEEDED:
break
elif state == AirbyteState.ERROR:
raise Failure(f"Job failed: {job_id}")
elif state == AirbyteState.CANCELLED:
raise Failure(f"Job was cancelled: {job_id}")
elif job.status == AirbyteJobStatusType.ERROR:
raise Failure(f"Job failed: {job.id}")
elif job.status == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job.id}")
else:
raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
raise Failure(
f"Encountered unexpected state `{job.status}` for job_id {job.id}"
)
finally:
# if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if (
state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
and self.cancel_sync_on_run_termination
if cancel_on_termination and job.status not in (
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
):
self.cancel_job(job_id)
self.cancel_job(job.id)

return AirbyteOutput(job_details=job_details, connection_details=connection_details)
return AirbyteOutput(job_details=poll_job_details, connection_details=connection_details)


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Any, List, Mapping, Optional, Sequence

from dagster._annotations import experimental
from dagster._annotations import deprecated, experimental
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
Expand All @@ -11,6 +12,27 @@
from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name


class AirbyteJobStatusType(str, Enum):
RUNNING = "running"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
PENDING = "pending"
FAILED = "failed"
ERROR = "error"
INCOMPLETE = "incomplete"


@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.")
class AirbyteState:
RUNNING = AirbyteJobStatusType.RUNNING
SUCCEEDED = AirbyteJobStatusType.SUCCEEDED
CANCELLED = AirbyteJobStatusType.CANCELLED
PENDING = AirbyteJobStatusType.PENDING
FAILED = AirbyteJobStatusType.FAILED
ERROR = AirbyteJobStatusType.ERROR
INCOMPLETE = AirbyteJobStatusType.INCOMPLETE


@record
class AirbyteConnectionTableProps:
table_name: str
Expand Down Expand Up @@ -108,6 +130,25 @@ def from_stream_details(
)


@whitelist_for_serdes
@record
class AirbyteJob:
"""Represents an Airbyte job, based on data as returned from the API."""

id: str
status: str

@classmethod
def from_job_details(
cls,
job_details: Mapping[str, Any],
) -> "AirbyteStream":
return cls(
id=job_details["id"],
status=job_details["status"],
)


@whitelist_for_serdes
@record
class AirbyteWorkspaceData:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
import responses
from dagster import Failure
from dagster_airbyte import AirbyteCloudResource, AirbyteOutput, AirbyteState
from dagster_airbyte import AirbyteCloudResource, AirbyteJobStatusType, AirbyteOutput


@responses.activate
Expand Down Expand Up @@ -50,7 +50,12 @@ def test_trigger_connection_fail() -> None:
@responses.activate
@pytest.mark.parametrize(
"state",
[AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"],
[
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.CANCELLED,
AirbyteJobStatusType.ERROR,
"unrecognized",
],
)
def test_sync_and_poll(state) -> None:
ab_resource = AirbyteCloudResource(
Expand Down Expand Up @@ -83,11 +88,11 @@ def test_sync_and_poll(state) -> None:
json={"jobId": 1, "status": "cancelled", "jobType": "sync"},
)

if state == AirbyteState.ERROR:
if state == AirbyteJobStatusType.ERROR:
with pytest.raises(Failure, match="Job failed"):
ab_resource.sync_and_poll("some_connection", 0)

elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
with pytest.raises(Failure, match="Job was cancelled"):
ab_resource.sync_and_poll("some_connection", 0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
build_init_resource_context,
)
from dagster._core.definitions.metadata import MetadataValue
from dagster_airbyte import AirbyteOutput, AirbyteResource, AirbyteState, airbyte_resource
from dagster_airbyte import AirbyteJobStatusType, AirbyteOutput, AirbyteResource, airbyte_resource
from dagster_airbyte.utils import generate_materializations

from dagster_airbyte_tests.utils import (
Expand Down Expand Up @@ -65,7 +65,12 @@ def test_trigger_connection_fail(
@responses.activate
@pytest.mark.parametrize(
"state",
[AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"],
[
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.CANCELLED,
AirbyteJobStatusType.ERROR,
"unrecognized",
],
)
@pytest.mark.parametrize(
"forward_logs",
Expand Down Expand Up @@ -113,11 +118,11 @@ def test_sync_and_poll(
if state == "unrecognized":
responses.add(responses.POST, f"{ab_resource.api_base_url}/jobs/cancel", status=204)

if state == AirbyteState.ERROR:
if state == AirbyteJobStatusType.ERROR:
with pytest.raises(Failure, match="Job failed"):
ab_resource.sync_and_poll("some_connection", 0)

elif state == AirbyteState.CANCELLED:
elif state == AirbyteJobStatusType.CANCELLED:
with pytest.raises(Failure, match="Job was cancelled"):
ab_resource.sync_and_poll("some_connection", 0)

Expand Down Expand Up @@ -295,7 +300,7 @@ def _get_attempt(ls):
method=responses.POST,
url=ab_resource.api_base_url + "/jobs/get",
json={
"job": {"id": 1, "status": AirbyteState.SUCCEEDED},
"job": {"id": 1, "status": AirbyteJobStatusType.SUCCEEDED},
"attempts": [
_get_attempt(ls) for ls in [["log1a", "log1b", "log1c"], ["log2a", "log2b"]]
],
Expand Down
Loading

0 comments on commit 7394328

Please sign in to comment.