Skip to content

Commit

Permalink
[4/n][dagster-airbyte] Implement connection and destination methods i…
Browse files Browse the repository at this point in the history
…n AirbyteClient (dagster-io#26242)

## Summary & Motivation

This PR implements the `get_connections`, `get_connection_details` and
`get_destination_details` methods in the `AirbyteCloudClient`.

Note that we need to use the [internal Airbyte Configuration
API](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview)
to get the stream configs in the connection details. The other methods
are using the public REST API.

## How I Tested These Changes

Additional unit tests with BK
  • Loading branch information
maximearmstrong authored and pskinnerthyme committed Dec 16, 2024
1 parent 802bf96 commit f6e734d
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
from dagster_airbyte.translator import AirbyteWorkspaceData
from dagster_airbyte.types import AirbyteOutput

AIRBYTE_API_BASE = "https://api.airbyte.com"
AIRBYTE_API_VERSION = "v1"
AIRBYTE_REST_API_BASE = "https://api.airbyte.com"
AIRBYTE_REST_API_VERSION = "v1"

AIRBYTE_CONFIGURATION_API_BASE = "https://cloud.airbyte.com/api"
AIRBYTE_CONFIGURATION_API_VERSION = "v1"

DEFAULT_POLL_INTERVAL_SECONDS = 10

Expand Down Expand Up @@ -836,8 +839,12 @@ def _log(self) -> logging.Logger:
return get_dagster_logger()

@property
def api_base_url(self) -> str:
return f"{AIRBYTE_API_BASE}/{AIRBYTE_API_VERSION}"
def rest_api_base_url(self) -> str:
return f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}"

@property
def configuration_api_base_url(self) -> str:
    """Root URL of the Airbyte Configuration API, with the API version segment appended."""
    url_segments = (AIRBYTE_CONFIGURATION_API_BASE, AIRBYTE_CONFIGURATION_API_VERSION)
    return "/".join(url_segments)

@property
def all_additional_request_params(self) -> Mapping[str, Any]:
Expand All @@ -862,8 +869,8 @@ def _refresh_access_token(self) -> None:
response = check.not_none(
self._make_request(
method="POST",
endpoint="/applications/token",
base_url=self.api_base_url,
endpoint="applications/token",
base_url=self.rest_api_base_url,
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
Expand Down Expand Up @@ -902,6 +909,7 @@ def _make_request(
endpoint: str,
base_url: str,
data: Optional[Mapping[str, Any]] = None,
params: Optional[Mapping[str, Any]] = None,
include_additional_request_params: bool = True,
) -> Mapping[str, Any]:
"""Creates and sends a request to the desired Airbyte REST API endpoint.
Expand All @@ -911,13 +919,14 @@ def _make_request(
endpoint (str): The Airbyte API endpoint to send this request to.
base_url (str): The base url to the Airbyte API to use.
data (Optional[Dict[str, Any]]): JSON-formatted data string to be included in the request.
params (Optional[Dict[str, Any]]): JSON-formatted query params to be included in the request.
include_additional_request_params (bool): Whether to include authorization and user-agent headers
to the request parameters. Defaults to True.
Returns:
Dict[str, Any]: Parsed json data from the response to this request
"""
url = base_url + endpoint
url = f"{base_url}/{endpoint}"

num_retries = 0
while True:
Expand All @@ -926,7 +935,7 @@ def _make_request(
include_additional_request_params=include_additional_request_params
)
response = session.request(
method=method, url=url, json=data, timeout=self.request_timeout
method=method, url=url, json=data, params=params, timeout=self.request_timeout
)
response.raise_for_status()
return response.json()
Expand All @@ -942,12 +951,35 @@ def _make_request(
raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.")

def get_connections(self) -> Mapping[str, Any]:
"""Fetches all connections of an Airbyte workspace from the Airbyte API."""
raise NotImplementedError()
"""Fetches all connections of an Airbyte workspace from the Airbyte REST API."""
return self._make_request(
method="GET",
endpoint="connections",
base_url=self.rest_api_base_url,
params={"workspaceIds": self.workspace_id},
)

def get_connection_details(self, connection_id: str) -> Mapping[str, Any]:
    """Fetches details about a given connection from the Airbyte Configuration API.

    The Airbyte Configuration API is an internal API and may change in the future.
    It is used here because its response includes the connection's streams and
    their configs.

    Args:
        connection_id (str): The ID of the Airbyte connection to fetch.

    Returns:
        Mapping[str, Any]: Parsed json data from the response to this request.
    """
    # Using the Airbyte Configuration API to get the connection details, including streams and their configs.
    # https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/connections/get
    # https://github.com/airbytehq/airbyte-platform/blob/v1.0.0/airbyte-api/server-api/src/main/openapi/config.yaml
    return self._make_request(
        method="POST",
        endpoint="connections/get",
        base_url=self.configuration_api_base_url,
        data={"connectionId": connection_id},
    )

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Airbyte API."""
raise NotImplementedError()
"""Fetches details about a given destination from the Airbyte REST API."""
return self._make_request(
method="GET",
endpoint=f"destinations/{destination_id}",
base_url=self.rest_api_base_url,
)


@experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,179 @@

import pytest
import responses
from dagster_airbyte.resources import AIRBYTE_API_BASE, AIRBYTE_API_VERSION
from dagster_airbyte.resources import (
AIRBYTE_CONFIGURATION_API_BASE,
AIRBYTE_CONFIGURATION_API_VERSION,
AIRBYTE_REST_API_BASE,
AIRBYTE_REST_API_VERSION,
)

TEST_WORKSPACE_ID = "some_workspace_id"
TEST_CLIENT_ID = "some_client_id"
TEST_CLIENT_SECRET = "some_client_secret"

TEST_ACCESS_TOKEN = "some_access_token"

# Taken from Airbyte API documentation
# Taken from the examples in the Airbyte REST API documentation
TEST_DESTINATION_ID = "18dccc91-0ab1-4f72-9ed7-0b8fc27c5826"
TEST_CONNECTION_ID = "9924bcd0-99be-453d-ba47-c2c9766f7da5"


# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/createaccesstoken
SAMPLE_ACCESS_TOKEN = {"access_token": TEST_ACCESS_TOKEN}


# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/listconnections
# Sample connections listing: carries "next"/"previous" pagination links and a single
# connection entry in "data" that points at TEST_CONNECTION_ID and TEST_DESTINATION_ID.
SAMPLE_CONNECTIONS = {
"next": "https://api.airbyte.com/v1/connections?limit=5&offset=10",
"previous": "https://api.airbyte.com/v1/connections?limit=5&offset=0",
"data": [
{
"connectionId": TEST_CONNECTION_ID,
"workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035",
"name": "Postgres To Snowflake",
"sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35",
"destinationId": TEST_DESTINATION_ID,
"status": "active",
"schedule": {
"schedule_type": "cron",
},
}
],
}


# Taken from Airbyte Configuration API documentation
# https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/connections/get
# https://github.com/airbytehq/airbyte-platform/blob/v1.0.0/airbyte-api/server-api/src/main/openapi/config.yaml
# Sample "connections/get" payload: "syncCatalog" holds a single stream entry with its
# "stream" definition and "config". Values such as "string" and 0 are placeholders.
SAMPLE_CONNECTION_DETAILS = {
"connectionId": TEST_CONNECTION_ID,
"name": "string",
"namespaceDefinition": "source",
"namespaceFormat": "${SOURCE_NAMESPACE}",
"prefix": "string",
"sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35",
"destinationId": TEST_DESTINATION_ID,
"operationIds": ["1938d12e-b540-4000-8c46-1be33f00ab01"],
"syncCatalog": {
"streams": [
{
"stream": {
"name": "string",
"jsonSchema": {},
"supportedSyncModes": ["full_refresh"],
"sourceDefinedCursor": False,
"defaultCursorField": ["string"],
"sourceDefinedPrimaryKey": [["string"]],
"namespace": "string",
"isResumable": False,
},
"config": {
"syncMode": "full_refresh",
"cursorField": ["string"],
"destinationSyncMode": "append",
"primaryKey": [["string"]],
"aliasName": "string",
"selected": False,
"suggested": False,
"fieldSelectionEnabled": False,
"selectedFields": [{"fieldPath": ["string"]}],
"hashedFields": [{"fieldPath": ["string"]}],
"mappers": [
{
"id": "1938d12e-b540-4000-8ff0-46231e18f301",
"type": "hashing",
"mapperConfiguration": {},
}
],
"minimumGenerationId": 0,
"generationId": 0,
"syncId": 0,
},
}
]
},
"schedule": {"units": 0, "timeUnit": "minutes"},
"scheduleType": "manual",
"scheduleData": {
"basicSchedule": {"timeUnit": "minutes", "units": 0},
"cron": {"cronExpression": "string", "cronTimeZone": "string"},
},
"status": "active",
"resourceRequirements": {
"cpu_request": "string",
"cpu_limit": "string",
"memory_request": "string",
"memory_limit": "string",
"ephemeral_storage_request": "string",
"ephemeral_storage_limit": "string",
},
"sourceCatalogId": "1938d12e-b540-4000-85a4-7ecc2445a901",
"geography": "auto",
"breakingChange": False,
"notifySchemaChanges": False,
"notifySchemaChangesByEmail": False,
"nonBreakingChangesPreference": "ignore",
"created_at": 0,
"backfillPreference": "enabled",
"workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035",
}


# Taken from Airbyte REST API documentation
# https://reference.airbyte.com/reference/getdestination
# NOTE(review): this payload describes a destination but carries a "sourceType" key;
# confirm against the getDestination response schema whether it should be
# "destinationType".
SAMPLE_DESTINATION_DETAILS = {
"destinationId": TEST_DESTINATION_ID,
"name": "My Destination",
"sourceType": "postgres",
"workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035",
"configuration": {
"conversion_window_days": 14,
"customer_id": "1234567890",
"start_date": "2023-01-01",
"end_date": "2024-01-01",
},
}


@pytest.fixture(
name="base_api_mocks",
)
def base_api_mocks_fixture() -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
response.add(
method=responses.POST,
url=f"{AIRBYTE_API_BASE}/{AIRBYTE_API_VERSION}/applications/token",
url=f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}/applications/token",
json=SAMPLE_ACCESS_TOKEN,
status=201,
)
yield response


@pytest.fixture(name="fetch_workspace_data_api_mocks")
def fetch_workspace_data_api_mocks_fixture(
    base_api_mocks: responses.RequestsMock,
) -> Iterator[responses.RequestsMock]:
    """Extend ``base_api_mocks`` with the endpoints used to fetch workspace data.

    Registers, in order: the REST API connections listing, the Configuration API
    ``connections/get`` endpoint, and the REST API lookup for ``TEST_DESTINATION_ID``.
    Each mocked endpoint answers with HTTP 200 and its matching ``SAMPLE_*`` payload.
    """
    rest_root = f"{AIRBYTE_REST_API_BASE}/{AIRBYTE_REST_API_VERSION}"
    configuration_root = (
        f"{AIRBYTE_CONFIGURATION_API_BASE}/{AIRBYTE_CONFIGURATION_API_VERSION}"
    )

    # (HTTP method, URL, JSON payload) — registration order matches the original calls.
    mocked_endpoints = (
        (responses.GET, f"{rest_root}/connections", SAMPLE_CONNECTIONS),
        (
            responses.POST,
            f"{configuration_root}/connections/get",
            SAMPLE_CONNECTION_DETAILS,
        ),
        (
            responses.GET,
            f"{rest_root}/destinations/{TEST_DESTINATION_ID}",
            SAMPLE_DESTINATION_DETAILS,
        ),
    )
    for http_method, endpoint_url, payload in mocked_endpoints:
        base_api_mocks.add(
            method=http_method,
            url=endpoint_url,
            json=payload,
            status=200,
        )

    yield base_api_mocks
Loading

0 comments on commit f6e734d

Please sign in to comment.