diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
index cfbbe344a099c..b31f6f1121540 100644
--- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
+++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
@@ -26,7 +26,7 @@
 from pydantic import Field, PrivateAttr
 from requests.exceptions import RequestException
 
-from dagster_airbyte.translator import AirbyteWorkspaceData
+from dagster_airbyte.translator import AirbyteConnection, AirbyteDestination, AirbyteWorkspaceData
 from dagster_airbyte.types import AirbyteOutput
 
 AIRBYTE_REST_API_BASE = "https://api.airbyte.com"
@@ -1028,4 +1028,34 @@ def fetch_airbyte_workspace_data(
         Returns:
             AirbyteWorkspaceData: A snapshot of the Airbyte workspace's content.
         """
-        raise NotImplementedError()
+        connections_by_id = {}
+        destinations_by_id = {}
+
+        client = self.get_client()
+        connections = client.get_connections()["data"]
+
+        for partial_connection_details in connections:
+            full_connection_details = client.get_connection_details(
+                connection_id=partial_connection_details["connectionId"]
+            )
+            connection = AirbyteConnection.from_connection_details(
+                connection_details=full_connection_details
+            )
+            connections_by_id[connection.id] = connection
+
+            # Connections can share a destination; fetch each destination only once.
+            if connection.destination_id in destinations_by_id:
+                continue
+
+            destination_details = client.get_destination_details(
+                destination_id=connection.destination_id
+            )
+            destination = AirbyteDestination.from_destination_details(
+                destination_details=destination_details
+            )
+            destinations_by_id[destination.id] = destination
+
+        return AirbyteWorkspaceData(
+            connections_by_id=connections_by_id,
+            destinations_by_id=destinations_by_id,
+        )
diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py
index 09259fcd71ca8..ce4e745bb3fd2 100644
--- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py
+++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py
@@ -1,4 +1,4 @@
-from typing import Any, Mapping, NamedTuple
+from typing import Any, Mapping, NamedTuple, Optional
 
 from dagster._annotations import experimental
 from dagster._core.definitions.asset_spec import AssetSpec
@@ -14,12 +14,29 @@ class AirbyteConnectionTableProps(NamedTuple): ...
 class AirbyteConnection:
     """Represents an Airbyte connection, based on data as returned from the API."""
 
+    id: str
+    name: str
+    stream_prefix: Optional[str]
+    streams: Mapping[str, "AirbyteStream"]
+    destination_id: str
+
     @classmethod
     def from_connection_details(
         cls,
         connection_details: Mapping[str, Any],
     ) -> "AirbyteConnection":
-        raise NotImplementedError()
+        return cls(
+            id=connection_details["connectionId"],
+            name=connection_details["name"],
+            stream_prefix=connection_details.get("prefix"),
+            streams={
+                stream_details["stream"]["name"]: AirbyteStream.from_stream_details(
+                    stream_details=stream_details
+                )
+                for stream_details in connection_details.get("syncCatalog", {}).get("streams", [])
+            },
+            destination_id=connection_details["destinationId"],
+        )
 
 
 @whitelist_for_serdes
@@ -27,12 +44,43 @@ def from_connection_details(
 class AirbyteDestination:
     """Represents an Airbyte destination, based on data as returned from the API."""
 
+    id: str
+    database: Optional[str]
+    schema: Optional[str]
+
     @classmethod
     def from_destination_details(
         cls,
         destination_details: Mapping[str, Any],
     ) -> "AirbyteDestination":
-        raise NotImplementedError()
+        return cls(
+            id=destination_details["destinationId"],
+            database=destination_details["configuration"].get("database"),
+            schema=destination_details["configuration"].get("schema"),
+        )
+
+
+@whitelist_for_serdes
+@record
+class AirbyteStream:
+    """Represents an Airbyte stream, based on data as returned from the API.
+    A stream in Airbyte corresponds to a table.
+    """
+
+    name: str
+    selected: bool
+    json_schema: Mapping[str, Any]
+
+    @classmethod
+    def from_stream_details(
+        cls,
+        stream_details: Mapping[str, Any],
+    ) -> "AirbyteStream":
+        return cls(
+            name=stream_details["stream"]["name"],
+            selected=stream_details["config"].get("selected", False),
+            json_schema=stream_details["stream"].get("jsonSchema", {}),
+        )
 
 
 @record
diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py
new file mode 100644
index 0000000000000..fc54e7d3f2a30
--- /dev/null
+++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py
@@ -0,0 +1,22 @@
+import responses
+from dagster_airbyte import AirbyteCloudWorkspace
+
+from dagster_airbyte_tests.experimental.conftest import (
+    TEST_CLIENT_ID,
+    TEST_CLIENT_SECRET,
+    TEST_WORKSPACE_ID,
+)
+
+
+def test_fetch_airbyte_workspace_data(
+    fetch_workspace_data_api_mocks: responses.RequestsMock,
+) -> None:
+    resource = AirbyteCloudWorkspace(
+        workspace_id=TEST_WORKSPACE_ID,
+        client_id=TEST_CLIENT_ID,
+        client_secret=TEST_CLIENT_SECRET,
+    )
+
+    actual_workspace_data = resource.fetch_airbyte_workspace_data()
+    assert len(actual_workspace_data.connections_by_id) == 1
+    assert len(actual_workspace_data.destinations_by_id) == 1