-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[12/n][dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient #26431
Changes from all commits
16a4810
2e9e045
3c76dca
875d62f
3c04d0b
fa56484
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
from enum import Enum | ||
from typing import Any, List, Mapping, Optional, Sequence | ||
|
||
from dagster._annotations import experimental | ||
from dagster._annotations import deprecated, experimental | ||
from dagster._core.definitions.asset_key import AssetKey | ||
from dagster._core.definitions.asset_spec import AssetSpec | ||
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet | ||
|
@@ -11,6 +12,27 @@ | |
from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name | ||
|
||
|
||
class AirbyteJobStatusType(str, Enum): | ||
RUNNING = "running" | ||
SUCCEEDED = "succeeded" | ||
CANCELLED = "cancelled" | ||
PENDING = "pending" | ||
FAILED = "failed" | ||
ERROR = "error" | ||
INCOMPLETE = "incomplete" | ||
|
||
|
||
@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.") | ||
class AirbyteState: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This enum is renamed to be Kept for backcompat and to be removed in 1.10 because it was exposed in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is 1.10 the right thing to say here; since dagster-airbyte is on a pre 1.0 release ? |
||
RUNNING = AirbyteJobStatusType.RUNNING | ||
SUCCEEDED = AirbyteJobStatusType.SUCCEEDED | ||
CANCELLED = AirbyteJobStatusType.CANCELLED | ||
PENDING = AirbyteJobStatusType.PENDING | ||
FAILED = AirbyteJobStatusType.FAILED | ||
ERROR = AirbyteJobStatusType.ERROR | ||
INCOMPLETE = AirbyteJobStatusType.INCOMPLETE | ||
|
||
|
||
@record | ||
class AirbyteConnectionTableProps: | ||
table_name: str | ||
|
@@ -108,6 +130,25 @@ def from_stream_details( | |
) | ||
|
||
|
||
@whitelist_for_serdes | ||
@record | ||
class AirbyteJob: | ||
"""Represents an Airbyte job, based on data as returned from the API.""" | ||
|
||
id: int | ||
status: str | ||
|
||
@classmethod | ||
def from_job_details( | ||
cls, | ||
job_details: Mapping[str, Any], | ||
) -> "AirbyteJob": | ||
return cls( | ||
id=job_details["jobId"], | ||
status=job_details["status"], | ||
) | ||
|
||
|
||
@whitelist_for_serdes | ||
@record | ||
class AirbyteWorkspaceData: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
from typing import Iterator | ||
from typing import Any, Iterator, Mapping | ||
|
||
import pytest | ||
import responses | ||
|
@@ -8,7 +8,7 @@ | |
AIRBYTE_REST_API_BASE, | ||
AIRBYTE_REST_API_VERSION, | ||
) | ||
from dagster_airbyte.translator import AirbyteConnectionTableProps | ||
from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType | ||
|
||
TEST_WORKSPACE_ID = "some_workspace_id" | ||
TEST_CLIENT_ID = "some_client_id" | ||
|
@@ -29,6 +29,8 @@ | |
TEST_JSON_SCHEMA = {} | ||
TEST_JOB_ID = 12345 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixes an error in Airbyte's example in the API documentation here. The json example uses a UUID as the Job ID, but the documentation mentions that the ID is an int. |
||
|
||
TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE = "unrecognized" | ||
|
||
TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps( | ||
table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}", | ||
stream_prefix=TEST_STREAM_PREFIX, | ||
|
@@ -165,13 +167,17 @@ | |
|
||
# Taken from Airbyte REST API documentation | ||
# https://reference.airbyte.com/reference/getjob | ||
SAMPLE_JOB_RESPONSE = { | ||
"jobId": TEST_JOB_ID, | ||
"status": "running", | ||
"jobType": "sync", | ||
"startTime": "2023-03-25T01:30:50Z", | ||
"connectionId": TEST_CONNECTION_ID, | ||
} | ||
def get_job_details_sample(status: str) -> Mapping[str, Any]: | ||
return { | ||
"jobId": TEST_JOB_ID, | ||
"status": status, | ||
"jobType": "sync", | ||
"startTime": "2023-03-25T01:30:50Z", | ||
"connectionId": TEST_CONNECTION_ID, | ||
} | ||
|
||
|
||
SAMPLE_JOB_RESPONSE_RUNNING = get_job_details_sample(status=AirbyteJobStatusType.RUNNING) | ||
|
||
|
||
@pytest.fixture( | ||
|
@@ -224,19 +230,19 @@ def all_api_mocks_fixture( | |
fetch_workspace_data_api_mocks.add( | ||
method=responses.POST, | ||
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", | ||
json=SAMPLE_JOB_RESPONSE, | ||
json=SAMPLE_JOB_RESPONSE_RUNNING, | ||
status=200, | ||
) | ||
fetch_workspace_data_api_mocks.add( | ||
method=responses.GET, | ||
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", | ||
json=SAMPLE_JOB_RESPONSE, | ||
json=SAMPLE_JOB_RESPONSE_RUNNING, | ||
status=200, | ||
) | ||
fetch_workspace_data_api_mocks.add( | ||
method=responses.DELETE, | ||
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}", | ||
json=SAMPLE_JOB_RESPONSE, | ||
json=SAMPLE_JOB_RESPONSE_RUNNING, | ||
status=200, | ||
) | ||
yield fetch_workspace_data_api_mocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit; why make the caller initialize the job from the details. Feels like we should just return it directly