From 9468df40cb23dbedfb59870e22dc55bbce1a4fd4 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 26 Dec 2024 10:25:42 -0500 Subject: [PATCH] [9/n][dagster-airbyte] Implement base sync method in AirbyteCloudClient (#26428) ## Summary & Motivation Implement endpoint to start a sync job in Airbyte Cloud. To be used in subsequent PRs in sync and poll process. ## How I Tested These Changes Additional tests with BK --- .../dagster_airbyte/resources.py | 11 ++++++++ .../experimental/conftest.py | 28 ++++++++++++++++++- .../experimental/test_resources.py | 12 +++++--- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index d17f975d3586a..2be0677807f1b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -992,6 +992,17 @@ def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: base_url=self.rest_api_base_url, ) + def start_sync_job(self, connection_id: str) -> Mapping[str, Any]: + return self._make_request( + method="POST", + endpoint="jobs", + base_url=self.rest_api_base_url, + data={ + "connectionId": connection_id, + "jobType": "sync", + }, + ) + @experimental class AirbyteCloudWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index e304655478e57..67d366c7c5717 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -144,7 +144,7 @@ } -# Taken from Airbyte API documentation +# Taken from Airbyte REST API documentation # https://reference.airbyte.com/reference/getdestination SAMPLE_DESTINATION_DETAILS = { "destinationId": TEST_DESTINATION_ID, @@ -162,6 +162,17 @@ } +# Taken from Airbyte REST API documentation +# https://reference.airbyte.com/reference/getjob +SAMPLE_JOB_RESPONSE = { + "jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "status": "running", + "jobType": "sync", + "startTime": "2023-03-25T01:30:50Z", + "connectionId": TEST_CONNECTION_ID, +} + + @pytest.fixture( name="base_api_mocks", ) @@ -201,3 +212,18 @@ def fetch_workspace_data_api_mocks_fixture( status=200, ) yield base_api_mocks + + +@pytest.fixture( + name="all_api_mocks", +) +def all_api_mocks_fixture( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> Iterator[responses.RequestsMock]: + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=SAMPLE_JOB_RESPONSE, + status=200, + ) + yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 42340a5cf9182..144bad731c115 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -35,9 +35,11 @@ def assert_token_call_and_split_calls(calls: responses.CallList): return calls[1:] -def assert_rest_api_call(call: responses.Call, endpoint: str): +def assert_rest_api_call(call: responses.Call, endpoint: str, object_id: Optional[str] = None): rest_api_url = call.request.url.split("?")[0] assert rest_api_url == f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{endpoint}" + if object_id: + assert object_id in call.request.body.decode() assert call.request.headers["Authorization"] == f"Bearer {TEST_ACCESS_TOKEN}" @@ -112,7 +114,7 @@ def test_refresh_access_token(base_api_mocks: responses.RequestsMock) -> None: def test_basic_resource_request( - fetch_workspace_data_api_mocks: responses.RequestsMock, + all_api_mocks: responses.RequestsMock, ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -125,13 +127,15 @@ def test_basic_resource_request( client.get_connections() client.get_connection_details(connection_id=TEST_CONNECTION_ID) client.get_destination_details(destination_id=TEST_DESTINATION_ID) + client.start_sync_job(connection_id=TEST_CONNECTION_ID) - assert len(fetch_workspace_data_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 5 # The first call is to create the access token - api_calls = assert_token_call_and_split_calls(calls=fetch_workspace_data_api_mocks.calls) + api_calls = assert_token_call_and_split_calls(calls=all_api_mocks.calls) # The next calls are actual API calls assert_rest_api_call(call=api_calls[0], endpoint="connections") assert_configuration_api_call( call=api_calls[1], endpoint="connections/get", object_id=TEST_CONNECTION_ID ) assert_rest_api_call(call=api_calls[2], endpoint=f"destinations/{TEST_DESTINATION_ID}") + assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID)