Skip to content

Commit

Permalink
[5/n][dagster-airbyte] Implement fetch_airbyte_workspace_data (dagster-io#26253)
Browse files Browse the repository at this point in the history

## Summary & Motivation

This PR implements `AirbyteCloudWorkspace.fetch_airbyte_workspace_data`,
which fetches the connections and destinations included in a given
workspace.

## How I Tested These Changes

Additional unit tests with BK.
  • Loading branch information
maximearmstrong authored and pskinnerthyme committed Dec 16, 2024
1 parent f6e734d commit 1a1b03b
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pydantic import Field, PrivateAttr
from requests.exceptions import RequestException

from dagster_airbyte.translator import AirbyteWorkspaceData
from dagster_airbyte.translator import AirbyteConnection, AirbyteDestination, AirbyteWorkspaceData
from dagster_airbyte.types import AirbyteOutput

AIRBYTE_REST_API_BASE = "https://api.airbyte.com"
Expand Down Expand Up @@ -1028,4 +1028,30 @@ def fetch_airbyte_workspace_data(
Returns:
AirbyteWorkspaceData: A snapshot of the Airbyte workspace's content.
"""
raise NotImplementedError()
connections_by_id = {}
destinations_by_id = {}

client = self.get_client()
connections = client.get_connections()["data"]

for partial_connection_details in connections:
full_connection_details = client.get_connection_details(
connection_id=partial_connection_details["connectionId"]
)
connection = AirbyteConnection.from_connection_details(
connection_details=full_connection_details
)
connections_by_id[connection.id] = connection

destination_details = client.get_destination_details(
destination_id=connection.destination_id
)
destination = AirbyteDestination.from_destination_details(
destination_details=destination_details
)
destinations_by_id[destination.id] = destination

return AirbyteWorkspaceData(
connections_by_id=connections_by_id,
destinations_by_id=destinations_by_id,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Mapping, NamedTuple
from typing import Any, Mapping, NamedTuple, Optional

from dagster._annotations import experimental
from dagster._core.definitions.asset_spec import AssetSpec
Expand All @@ -14,25 +14,73 @@ class AirbyteConnectionTableProps(NamedTuple): ...
class AirbyteConnection:
    """Represents an Airbyte connection, based on data as returned from the API.

    Attributes:
        id: Value of the ``connectionId`` key of the connection details.
        name: Value of the ``name`` key of the connection details.
        stream_prefix: Value of the optional ``prefix`` key, or ``None`` when
            the key is absent.
        streams: The connection's streams, keyed by stream name.
        destination_id: Value of the ``destinationId`` key of the connection
            details.
    """

    id: str
    name: str
    stream_prefix: Optional[str]
    streams: Mapping[str, "AirbyteStream"]
    destination_id: str

    @classmethod
    def from_connection_details(
        cls,
        connection_details: Mapping[str, Any],
    ) -> "AirbyteConnection":
        """Build an ``AirbyteConnection`` from a connection-details mapping.

        Args:
            connection_details: Mapping that must contain the keys
                ``connectionId``, ``name`` and ``destinationId``. The keys
                ``prefix`` and ``syncCatalog`` (with a nested ``streams``
                list) are optional.

        Returns:
            AirbyteConnection: The connection built from the given details.

        Raises:
            KeyError: If a required key is missing from ``connection_details``
                or from one of its stream entries.
        """
        # NOTE(review): keyword construction relies on a constructor generated
        # by the class decorators, which sit outside this excerpt - confirm.
        return cls(
            id=connection_details["connectionId"],
            name=connection_details["name"],
            stream_prefix=connection_details.get("prefix"),
            # A missing "syncCatalog" or "streams" entry yields an empty mapping.
            streams={
                stream_details["stream"]["name"]: AirbyteStream.from_stream_details(
                    stream_details=stream_details
                )
                for stream_details in connection_details.get("syncCatalog", {}).get("streams", [])
            },
            destination_id=connection_details["destinationId"],
        )


@whitelist_for_serdes
@record
class AirbyteDestination:
    """Represents an Airbyte destination, based on data as returned from the API.

    Attributes:
        id: Value of the ``destinationId`` key of the destination details.
        database: Value of the ``database`` key of the destination's
            ``configuration``, or ``None`` when the key is absent.
        schema: Value of the ``schema`` key of the destination's
            ``configuration``, or ``None`` when the key is absent.
    """

    id: str
    database: Optional[str]
    schema: Optional[str]

    @classmethod
    def from_destination_details(
        cls,
        destination_details: Mapping[str, Any],
    ) -> "AirbyteDestination":
        """Build an ``AirbyteDestination`` from a destination-details mapping.

        Args:
            destination_details: Mapping that must contain the keys
                ``destinationId`` and ``configuration``. Inside
                ``configuration``, the keys ``database`` and ``schema`` are
                optional.

        Returns:
            AirbyteDestination: The destination built from the given details.

        Raises:
            KeyError: If ``destinationId`` or ``configuration`` is missing.
        """
        return cls(
            id=destination_details["destinationId"],
            database=destination_details["configuration"].get("database"),
            schema=destination_details["configuration"].get("schema"),
        )


@whitelist_for_serdes
@record
class AirbyteStream:
    """Represents an Airbyte stream, based on data as returned from the API.
    A stream in Airbyte corresponds to a table.

    Attributes:
        name: Value of the ``name`` key of the nested ``stream`` mapping.
        selected: Value of the ``selected`` key of the nested ``config``
            mapping; ``False`` when the key is absent.
        json_schema: Value of the ``jsonSchema`` key of the nested ``stream``
            mapping; an empty dict when the key is absent.
    """

    name: str
    selected: bool
    json_schema: Mapping[str, Any]

    @classmethod
    def from_stream_details(
        cls,
        stream_details: Mapping[str, Any],
    ) -> "AirbyteStream":
        """Build an ``AirbyteStream`` from one stream-details mapping.

        Args:
            stream_details: Mapping that must contain the nested mappings
                ``stream`` (with a ``name`` key) and ``config``.

        Returns:
            AirbyteStream: The stream built from the given details.
        """
        # Lookups keep the original order: stream name, then config, then schema.
        stream_name = stream_details["stream"]["name"]
        is_selected = stream_details["config"].get("selected", False)
        schema = stream_details["stream"].get("jsonSchema", {})
        return cls(name=stream_name, selected=is_selected, json_schema=schema)


@record
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import responses
from dagster_airbyte import AirbyteCloudWorkspace

from dagster_airbyte_tests.experimental.conftest import (
TEST_CLIENT_ID,
TEST_CLIENT_SECRET,
TEST_WORKSPACE_ID,
)


def test_fetch_airbyte_workspace_data(
    fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
    """Check that fetching workspace data yields one connection and one destination.

    The ``fetch_workspace_data_api_mocks`` fixture is defined outside this
    excerpt; presumably it registers the mocked API responses this test
    relies on - confirm against the conftest.
    """
    workspace = AirbyteCloudWorkspace(
        workspace_id=TEST_WORKSPACE_ID,
        client_id=TEST_CLIENT_ID,
        client_secret=TEST_CLIENT_SECRET,
    )

    workspace_data = workspace.fetch_airbyte_workspace_data()

    # Exactly one connection and one destination are expected in the snapshot.
    assert len(workspace_data.connections_by_id) == 1
    assert len(workspace_data.destinations_by_id) == 1

0 comments on commit 1a1b03b

Please sign in to comment.