Skip to content

Commit

Permalink
[dagster-airbyte] Scaffold DagsterAirbyteTranslator for rework
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 2, 2024
1 parent 93a7844 commit 3dcc50c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
from dagster._annotations import experimental
from dagster._config.pythonic_config import infer_schema_from_config_class
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import deep_merge_dicts
from pydantic import Field, PrivateAttr
from requests.exceptions import RequestException

from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.translator import AirbyteWorkspaceData

DEFAULT_POLL_INTERVAL_SECONDS = 10

Expand Down Expand Up @@ -801,42 +800,6 @@ def airbyte_cloud_resource(context) -> AirbyteCloudResource:
# ------------------


@whitelist_for_serdes
@record
class AirbyteConnection:
"""Represents an Airbyte connection, based on data as returned from the API."""

@classmethod
def from_connection_details(
cls,
connection_details: Mapping[str, Any],
) -> "AirbyteConnection":
raise NotImplementedError()


@whitelist_for_serdes
@record
class AirbyteDestination:
"""Represents an Airbyte destination, based on data as returned from the API."""

@classmethod
def from_destination_details(
cls,
destination_details: Mapping[str, Any],
) -> "AirbyteDestination":
raise NotImplementedError()


@record
class AirbyteWorkspaceData:
"""A record representing all content in an Airbyte workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connections_by_id: Mapping[str, AirbyteConnection]
destinations_by_id: Mapping[str, AirbyteDestination]


@experimental
class AirbyteClient:
"""This class exposes methods on top of the Airbyte REST API."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from typing import Any, Mapping, NamedTuple

from dagster._record import record
from dagster._annotations import experimental
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._serdes.serdes import whitelist_for_serdes


class AirbyteConnectionTableProps(NamedTuple):
...


@whitelist_for_serdes
@record
class AirbyteConnection:
"""Represents an Airbyte connection, based on data as returned from the API."""

@classmethod
def from_connection_details(
cls,
connection_details: Mapping[str, Any],
) -> "AirbyteConnection":
raise NotImplementedError()


@whitelist_for_serdes
@record
class AirbyteDestination:
"""Represents an Airbyte destination, based on data as returned from the API."""

@classmethod
def from_destination_details(
cls,
destination_details: Mapping[str, Any],
) -> "AirbyteDestination":
raise NotImplementedError()


@record
class AirbyteWorkspaceData:
"""A record representing all content in an Airbyte workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connections_by_id: Mapping[str, AirbyteConnection]
destinations_by_id: Mapping[str, AirbyteDestination]


@experimental
class DagsterAirbyteTranslator:
"""Translator class which converts a `AirbyteConnectionTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for Airbyte content.
"""

def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by an Airbyte connection."""
raise NotImplementedError()

0 comments on commit 3dcc50c

Please sign in to comment.