Skip to content

Commit

Permalink
[9/n][dagster-airbyte] Implement base sync method in AirbyteCloudClie…
Browse files Browse the repository at this point in the history
…nt (#26428)

## Summary & Motivation

Implement endpoint to start a sync job in Airbyte Cloud. To be used in
subsequent PRs in sync and poll process.

## How I Tested These Changes

Additional tests with BK
  • Loading branch information
maximearmstrong authored Dec 26, 2024
1 parent 8c6746a commit 9468df4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,17 @@ def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
base_url=self.rest_api_base_url,
)

def start_sync_job(self, connection_id: str) -> Mapping[str, Any]:
return self._make_request(
method="POST",
endpoint="jobs",
base_url=self.rest_api_base_url,
data={
"connectionId": connection_id,
"jobType": "sync",
},
)


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
}


# Taken from Airbyte API documentation
# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getdestination
SAMPLE_DESTINATION_DETAILS = {
"destinationId": TEST_DESTINATION_ID,
Expand All @@ -162,6 +162,17 @@
}


# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getjob
SAMPLE_JOB_RESPONSE = {
"jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"status": "running",
"jobType": "sync",
"startTime": "2023-03-25T01:30:50Z",
"connectionId": TEST_CONNECTION_ID,
}


@pytest.fixture(
name="base_api_mocks",
)
Expand Down Expand Up @@ -201,3 +212,18 @@ def fetch_workspace_data_api_mocks_fixture(
status=200,
)
yield base_api_mocks


@pytest.fixture(
name="all_api_mocks",
)
def all_api_mocks_fixture(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> Iterator[responses.RequestsMock]:
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs",
json=SAMPLE_JOB_RESPONSE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def assert_token_call_and_split_calls(calls: responses.CallList):
return calls[1:]


def assert_rest_api_call(call: responses.Call, endpoint: str):
def assert_rest_api_call(call: responses.Call, endpoint: str, object_id: Optional[str] = None):
rest_api_url = call.request.url.split("?")[0]
assert rest_api_url == f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{endpoint}"
if object_id:
assert object_id in call.request.body.decode()
assert call.request.headers["Authorization"] == f"Bearer {TEST_ACCESS_TOKEN}"


Expand Down Expand Up @@ -112,7 +114,7 @@ def test_refresh_access_token(base_api_mocks: responses.RequestsMock) -> None:


def test_basic_resource_request(
fetch_workspace_data_api_mocks: responses.RequestsMock,
all_api_mocks: responses.RequestsMock,
) -> None:
resource = AirbyteCloudWorkspace(
workspace_id=TEST_WORKSPACE_ID,
Expand All @@ -125,13 +127,15 @@ def test_basic_resource_request(
client.get_connections()
client.get_connection_details(connection_id=TEST_CONNECTION_ID)
client.get_destination_details(destination_id=TEST_DESTINATION_ID)
client.start_sync_job(connection_id=TEST_CONNECTION_ID)

assert len(fetch_workspace_data_api_mocks.calls) == 4
assert len(all_api_mocks.calls) == 5
# The first call is to create the access token
api_calls = assert_token_call_and_split_calls(calls=fetch_workspace_data_api_mocks.calls)
api_calls = assert_token_call_and_split_calls(calls=all_api_mocks.calls)
# The next calls are actual API calls
assert_rest_api_call(call=api_calls[0], endpoint="connections")
assert_configuration_api_call(
call=api_calls[1], endpoint="connections/get", object_id=TEST_CONNECTION_ID
)
assert_rest_api_call(call=api_calls[2], endpoint=f"destinations/{TEST_DESTINATION_ID}")
assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID)

0 comments on commit 9468df4

Please sign in to comment.