Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 18, 2024
1 parent 7394328 commit 7d58e97
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1048,20 +1048,24 @@ def sync_and_poll(
Details of the sync job.
"""
connection_details = self.get_connection_details(connection_id)
start_job_details = self.start_sync(connection_id)
start_job_details = self.start_sync_job(connection_id)
job = AirbyteJob.from_job_details(job_details=start_job_details)

self._log.info(f"Job {job.id} initialized for connection_id={connection_id}.")
start = time.monotonic()
poll_start = datetime.now()
poll_interval = (
poll_interval if poll_interval is not None else DEFAULT_POLL_INTERVAL_SECONDS
)
try:
while True:
if poll_timeout and start + poll_timeout < time.monotonic():
if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
raise Failure(
f"Timeout: Airbyte job {job.id} is not ready after the timeout"
f" {poll_timeout} seconds"
)
time.sleep(poll_interval or self.poll_interval)
poll_job_details = self.get_job_status(connection_id, job.id)

time.sleep(poll_interval)
poll_job_details = self.get_job_details(job.id)
job = AirbyteJob.from_job_details(job_details=poll_job_details)
if job.status in (
AirbyteJobStatusType.RUNNING,
Expand All @@ -1071,7 +1075,7 @@ def sync_and_poll(
continue
elif job.status == AirbyteJobStatusType.SUCCEEDED:
break
elif job.status == AirbyteJobStatusType.ERROR:
elif job.status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]:
raise Failure(f"Job failed: {job.id}")
elif job.status == AirbyteJobStatusType.CANCELLED:
raise Failure(f"Job was cancelled: {job.id}")
Expand All @@ -1086,6 +1090,7 @@ def sync_and_poll(
AirbyteJobStatusType.SUCCEEDED,
AirbyteJobStatusType.ERROR,
AirbyteJobStatusType.CANCELLED,
AirbyteJobStatusType.FAILED,
):
self.cancel_job(job.id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ def from_stream_details(
class AirbyteJob:
"""Represents an Airbyte job, based on data as returned from the API."""

id: str
id: int
status: str

@classmethod
def from_job_details(
cls,
job_details: Mapping[str, Any],
) -> "AirbyteStream":
) -> "AirbyteJob":
return cls(
id=job_details["id"],
id=job_details["jobId"],
status=job_details["status"],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator
from typing import Any, Iterator, Mapping

import pytest
import responses
Expand All @@ -8,7 +8,7 @@
AIRBYTE_REST_API_BASE,
AIRBYTE_REST_API_VERSION,
)
from dagster_airbyte.translator import AirbyteConnectionTableProps
from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType

TEST_WORKSPACE_ID = "some_workspace_id"
TEST_CLIENT_ID = "some_client_id"
Expand All @@ -27,7 +27,9 @@
TEST_STREAM_NAME = "test_stream"
TEST_SELECTED = True
TEST_JSON_SCHEMA = {}
TEST_JOB_ID = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
TEST_JOB_ID = 12345

TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE = "unrecognized"

TEST_AIRBYTE_CONNECTION_TABLE_PROPS = AirbyteConnectionTableProps(
table_name=f"{TEST_STREAM_PREFIX}{TEST_STREAM_NAME}",
Expand Down Expand Up @@ -165,13 +167,17 @@

# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getjob
SAMPLE_JOB_RESPONSE = {
"jobId": TEST_JOB_ID,
"status": "running",
"jobType": "sync",
"startTime": "2023-03-25T01:30:50Z",
"connectionId": TEST_CONNECTION_ID,
}
def get_job_details_sample(status: str) -> Mapping[str, Any]:
return {
"jobId": TEST_JOB_ID,
"status": status,
"jobType": "sync",
"startTime": "2023-03-25T01:30:50Z",
"connectionId": TEST_CONNECTION_ID,
}


SAMPLE_JOB_RESPONSE_RUNNING = get_job_details_sample(status=AirbyteJobStatusType.RUNNING)


@pytest.fixture(
Expand Down Expand Up @@ -224,19 +230,19 @@ def all_api_mocks_fixture(
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.DELETE,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs/{TEST_JOB_ID}",
json=SAMPLE_JOB_RESPONSE,
json=SAMPLE_JOB_RESPONSE_RUNNING,
status=200,
)
yield fetch_workspace_data_api_mocks
Loading

0 comments on commit 7d58e97

Please sign in to comment.