From 65b190b5c32d8acef79b1c4da9cb963385f802ec Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 11 Dec 2024 23:58:04 -0500 Subject: [PATCH 1/7] [dagster-airbyte] Implement base sync method in AirbyteCloudClient --- .../dagster-airbyte/dagster_airbyte/resources.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index d17f975d3586a..784b2b3971fd0 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -992,6 +992,16 @@ def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: base_url=self.rest_api_base_url, ) + def start_sync_job(self, connection_id: str) -> Mapping[str, Any]: + return self._make_request( + endpoint="jobs", + data={ + "connectionId": connection_id, + "jobType": "sync", + }, + ) + + @experimental class AirbyteCloudWorkspace(ConfigurableResource): From bc872df4c2e973777c7d3fcad234c77ada18caff Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 00:02:45 -0500 Subject: [PATCH 2/7] Fix start_sync_job --- .../dagster-airbyte/dagster_airbyte/resources.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 784b2b3971fd0..2be0677807f1b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -994,13 +994,14 @@ def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: def start_sync_job(self, connection_id: str) -> Mapping[str, Any]: return self._make_request( - endpoint="jobs", - data={ - "connectionId": connection_id, - "jobType": "sync", - }, - ) - + method="POST", + endpoint="jobs", + base_url=self.rest_api_base_url, + data={ + "connectionId": connection_id, + "jobType": "sync", + }, + ) @experimental From 6c68b4c6e960f7844a112caa99736ed2812a9c11 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 18:48:12 -0500 Subject: [PATCH 3/7] Add test --- .../experimental/conftest.py | 22 +++++++++++++++++++ .../experimental/test_resources.py | 12 ++++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index e304655478e57..44017f6d95b40 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -162,6 +162,13 @@ } +SAMPLE_JOB_RESPONSE = { + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "status": "running", + "jobType": "sync", +} + + @pytest.fixture( name="base_api_mocks", ) @@ -201,3 +208,18 @@ def fetch_workspace_data_api_mocks_fixture( status=200, ) yield base_api_mocks + + +@pytest.fixture( + name="all_api_mocks", +) +def all_api_mocks_fixture( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> Iterator[responses.RequestsMock]: + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/jobs", + json=SAMPLE_JOB_RESPONSE, + status=200, + ) + yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 42340a5cf9182..144bad731c115 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -35,9 +35,11 @@ def assert_token_call_and_split_calls(calls: responses.CallList): return calls[1:] -def assert_rest_api_call(call: responses.Call, endpoint: str): +def assert_rest_api_call(call: responses.Call, endpoint: str, object_id: Optional[str] = None): rest_api_url = call.request.url.split("?")[0] assert rest_api_url == f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/{endpoint}" + if object_id: + assert object_id in call.request.body.decode() assert call.request.headers["Authorization"] == f"Bearer {TEST_ACCESS_TOKEN}" @@ -112,7 +114,7 @@ def test_refresh_access_token(base_api_mocks: responses.RequestsMock) -> None: def test_basic_resource_request( - fetch_workspace_data_api_mocks: responses.RequestsMock, + all_api_mocks: responses.RequestsMock, ) -> None: resource = AirbyteCloudWorkspace( workspace_id=TEST_WORKSPACE_ID, @@ -125,13 +127,15 @@ def test_basic_resource_request( client.get_connections() client.get_connection_details(connection_id=TEST_CONNECTION_ID) client.get_destination_details(destination_id=TEST_DESTINATION_ID) + client.start_sync_job(connection_id=TEST_CONNECTION_ID) - assert len(fetch_workspace_data_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 5 # The first call is to create the access token - api_calls = assert_token_call_and_split_calls(calls=fetch_workspace_data_api_mocks.calls) + api_calls = assert_token_call_and_split_calls(calls=all_api_mocks.calls) # The next calls are actual API calls assert_rest_api_call(call=api_calls[0], endpoint="connections") assert_configuration_api_call( call=api_calls[1], endpoint="connections/get", object_id=TEST_CONNECTION_ID ) assert_rest_api_call(call=api_calls[2], endpoint=f"destinations/{TEST_DESTINATION_ID}") + assert_rest_api_call(call=api_calls[3], endpoint="jobs", object_id=TEST_CONNECTION_ID) From aa066f078843a10e5dfd84ee4ee1488e5fcdf0ae Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 18:56:35 -0500 Subject: [PATCH 4/7] Update job sample --- .../dagster_airbyte_tests/experimental/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 44017f6d95b40..0c48992de2668 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -166,6 +166,8 @@ "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "status": "running", "jobType": "sync", + "startTime": "2023-03-25T01:30:50Z", + "connectionId": TEST_CONNECTION_ID } From aa0d5268fa976374dd97235f0ed8e95464c7f72c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 16 Dec 2024 13:46:52 -0500 Subject: [PATCH 5/7] Lint --- .../dagster_airbyte_tests/experimental/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 0c48992de2668..ec0d169d7a28a 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -167,7 +167,7 @@ "status": "running", "jobType": "sync", "startTime": "2023-03-25T01:30:50Z", - "connectionId": TEST_CONNECTION_ID + "connectionId": TEST_CONNECTION_ID, } From f466aa68f35ac194b2c51fe71f46d877f2b473cd Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 12:37:41 -0500 Subject: [PATCH 6/7] Update job response example --- .../dagster_airbyte_tests/experimental/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index ec0d169d7a28a..49b87fc2c07cc 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -163,7 +163,7 @@ SAMPLE_JOB_RESPONSE = { - "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "status": "running", "jobType": "sync", "startTime": "2023-03-25T01:30:50Z", From 7ff659c254099ab421cd48c340f76c696d5974c7 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 14:40:17 -0500 Subject: [PATCH 7/7] Add comment to job response example --- .../dagster_airbyte_tests/experimental/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 49b87fc2c07cc..67d366c7c5717 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -144,7 +144,7 @@ } -# Taken from Airbyte API documentation +# Taken from Airbyte REST API documentation # https://reference.airbyte.com/reference/getdestination SAMPLE_DESTINATION_DETAILS = { "destinationId": TEST_DESTINATION_ID, @@ -162,6 +162,8 @@ } +# Taken from Airbyte REST API documentation +# https://reference.airbyte.com/reference/getjob SAMPLE_JOB_RESPONSE = { "jobId": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "status": "running",