Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[13/n][dagster-airbyte] Implement airbyte_assets and build_airbyte_assets_definitions #26432

Merged
merged 5 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
except ImportError:
pass

from dagster_airbyte.asset_decorator import airbyte_assets as airbyte_assets
from dagster_airbyte.asset_defs import (
build_airbyte_assets as build_airbyte_assets,
build_airbyte_assets_definitions as build_airbyte_assets_definitions,
load_assets_from_airbyte_instance as load_assets_from_airbyte_instance,
)
from dagster_airbyte.ops import airbyte_sync_op as airbyte_sync_op
Expand All @@ -28,6 +30,7 @@
load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs,
)
from dagster_airbyte.translator import (
AirbyteConnectionTableProps as AirbyteConnectionTableProps,
AirbyteJobStatusType as AirbyteJobStatusType,
AirbyteState as AirbyteState,
DagsterAirbyteTranslator as DagsterAirbyteTranslator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_airbyte.resources import AirbyteCloudWorkspace
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator


@experimental
def airbyte_assets(
    *,
    connection_id: str,
    workspace: AirbyteCloudWorkspace,
    name: Optional[str] = None,
    group_name: Optional[str] = None,
    dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
    """Create a definition for how to sync the tables of a given Airbyte connection.

    The workspace's asset specs are loaded through ``workspace.load_asset_specs`` and
    narrowed down to the ones whose Airbyte metadata carries ``connection_id``. Those
    specs are handed to :py:func:`dagster.multi_asset` with ``can_subset=True``.

    Args:
        connection_id (str): The Airbyte Connection ID.
        workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from.
        name (Optional[str], optional): The name of the op.
        group_name (Optional[str], optional): The name of the asset group.
        dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator
            to use to convert Airbyte content into :py:class:`dagster.AssetSpec`.
            Defaults to :py:class:`DagsterAirbyteTranslator`.

    Returns:
        Callable[[Callable[..., Any]], AssetsDefinition]: The decorator produced by
        ``multi_asset`` for the selected specs.

    Examples:
        Sync the tables of an Airbyte connection:

        .. code-block:: python

            from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

            import dagster as dg

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            @airbyte_assets(
                connection_id="airbyte_connection_id",
                workspace=airbyte_workspace,
            )
            def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
                yield from airbyte.sync_and_poll(context=context)


            defs = dg.Definitions(
                assets=[airbyte_connection_assets],
                resources={"airbyte": airbyte_workspace},
            )

        Sync the tables of an Airbyte connection with a custom translator:

        .. code-block:: python

            from dagster_airbyte import (
                DagsterAirbyteTranslator,
                AirbyteConnectionTableProps,
                AirbyteCloudWorkspace,
                airbyte_assets
            )

            import dagster as dg

            class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
                def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
                    default_spec = super().get_asset_spec(props)
                    return default_spec.merge_attributes(
                        metadata={"custom": "metadata"},
                    )

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            @airbyte_assets(
                connection_id="airbyte_connection_id",
                workspace=airbyte_workspace,
                dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
            )
            def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
                yield from airbyte.sync_and_poll(context=context)


            defs = dg.Definitions(
                assets=[airbyte_connection_assets],
                resources={"airbyte": airbyte_workspace},
            )
    """
    # Fall back to the stock translator when the caller did not supply one.
    translator = dagster_airbyte_translator or DagsterAirbyteTranslator()
    workspace_specs = workspace.load_asset_specs(dagster_airbyte_translator=translator)

    # Keep only the specs that belong to the requested connection.
    connection_specs = []
    for candidate in workspace_specs:
        if AirbyteMetadataSet.extract(candidate.metadata).connection_id == connection_id:
            connection_specs.append(candidate)

    return multi_asset(
        name=name,
        group_name=group_name,
        can_subset=True,
        specs=connection_specs,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import yaml
from dagster import (
AssetExecutionContext,
AssetKey,
AssetOut,
AutoMaterializePolicy,
Expand All @@ -33,6 +34,7 @@
SourceAsset,
_check as check,
)
from dagster._annotations import experimental
from dagster._core.definitions import AssetsDefinition, multi_asset
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
Expand All @@ -45,7 +47,14 @@
from dagster._core.execution.context.init import build_init_resource_context
from dagster._utils.merger import merge_dicts

from dagster_airbyte.resources import AirbyteCloudResource, AirbyteResource, BaseAirbyteResource
from dagster_airbyte.asset_decorator import airbyte_assets
from dagster_airbyte.resources import (
AirbyteCloudResource,
AirbyteCloudWorkspace,
AirbyteResource,
BaseAirbyteResource,
)
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
from dagster_airbyte.types import AirbyteTableMetadata
from dagster_airbyte.utils import (
generate_materializations,
Expand Down Expand Up @@ -1032,3 +1041,115 @@ def load_assets_from_airbyte_instance(
connection_to_freshness_policy_fn=connection_to_freshness_policy_fn,
connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn,
)


# -----------------------
# Reworked assets factory
# -----------------------


@experimental
def build_airbyte_assets_definitions(
    *,
    workspace: AirbyteCloudWorkspace,
    dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None,
) -> Sequence[AssetsDefinition]:
    """The list of AssetsDefinition for all connections in the Airbyte workspace.

    One definition is built per distinct ``(connection_id, connection_name)`` pair found in
    the metadata of the workspace's asset specs. Definitions are returned ordered by that
    pair so the result is stable from one process to the next.

    Args:
        workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from.
        dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator
            to use to convert Airbyte content into :py:class:`dagster.AssetSpec`.
            Defaults to :py:class:`DagsterAirbyteTranslator`.

    Returns:
        List[AssetsDefinition]: The list of AssetsDefinition for all connections in the
        Airbyte workspace.

    Examples:
        Sync the tables of an Airbyte connection:

        .. code-block:: python

            from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

            import dagster as dg

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

            defs = dg.Definitions(
                assets=airbyte_assets,
                resources={"airbyte": airbyte_workspace},
            )

        Sync the tables of an Airbyte connection with a custom translator:

        .. code-block:: python

            from dagster_airbyte import (
                DagsterAirbyteTranslator,
                AirbyteConnectionTableProps,
                AirbyteCloudWorkspace,
                build_airbyte_assets_definitions
            )

            import dagster as dg

            class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
                def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
                    default_spec = super().get_asset_spec(props)
                    return default_spec.merge_attributes(
                        metadata={"custom": "metadata"},
                    )

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            airbyte_assets = build_airbyte_assets_definitions(
                workspace=airbyte_workspace,
                dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
            )

            defs = dg.Definitions(
                assets=airbyte_assets,
                resources={"airbyte": airbyte_workspace},
            )
    """
    dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()

    all_asset_specs = workspace.load_asset_specs(
        dagster_airbyte_translator=dagster_airbyte_translator
    )

    # Collect the distinct connections referenced by the specs. The metadata is
    # extracted once per spec rather than once per field.
    connections = set()
    for spec in all_asset_specs:
        airbyte_metadata = AirbyteMetadataSet.extract(spec.metadata)
        connections.add(
            (
                check.not_none(airbyte_metadata.connection_id),
                check.not_none(airbyte_metadata.connection_name),
            )
        )

    _asset_fns = []
    # Sorting gives a deterministic output order; iterating the raw set of string
    # tuples would vary between processes under hash randomization.
    for connection_id, connection_name in sorted(connections):
        # NOTE(review): the op name and group name are derived from the connection name
        # only, so two connections sharing a name would get the same op name — confirm
        # whether that can occur in a workspace.
        cleaned_name = _clean_name(connection_name)

        @airbyte_assets(
            connection_id=connection_id,
            workspace=workspace,
            name=cleaned_name,
            group_name=cleaned_name,
            dagster_airbyte_translator=dagster_airbyte_translator,
        )
        def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
            yield from airbyte.sync_and_poll(context=context)

        _asset_fns.append(_asset_fn)

    return _asset_fns
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import requests
from dagster import (
AssetExecutionContext,
ConfigurableResource,
Definitions,
Failure,
Expand Down Expand Up @@ -1172,6 +1173,47 @@ def fetch_airbyte_workspace_data(
destinations_by_id=destinations_by_id,
)

@cached_method
def load_asset_specs(
    self,
    dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Airbyte content in the workspace.

    Delegates to ``load_airbyte_cloud_asset_specs`` with this workspace and the
    resolved translator.

    Args:
        dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator
            to use to convert Airbyte content into :py:class:`dagster.AssetSpec`.
            Defaults to :py:class:`DagsterAirbyteTranslator`.

    Returns:
        List[AssetSpec]: The set of assets representing the Airbyte content in the workspace.

    Examples:
        Loading the asset specs for a given Airbyte workspace:

        .. code-block:: python

            from dagster_airbyte import AirbyteCloudWorkspace

            import dagster as dg

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )

            airbyte_specs = airbyte_workspace.load_asset_specs()
            defs = dg.Definitions(assets=airbyte_specs, resources={"airbyte": airbyte_workspace})
    """
    # Use the caller's translator when one is given, otherwise the default one.
    translator = (
        dagster_airbyte_translator if dagster_airbyte_translator else DagsterAirbyteTranslator()
    )
    return load_airbyte_cloud_asset_specs(workspace=self, dagster_airbyte_translator=translator)

def sync_and_poll(self, context: AssetExecutionContext):
    """Placeholder for syncing a connection; not implemented yet.

    Args:
        context (AssetExecutionContext): The execution context passed by the calling
            asset function. It is not used by this stub.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError


@experimental
def load_airbyte_cloud_asset_specs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# Dummy client credentials; the literal values are placeholders, not real secrets.
TEST_CLIENT_ID = "some_client_id"
TEST_CLIENT_SECRET = "some_client_secret"

# Presumably the id of a second workspace for multi-workspace tests — confirm against usages.
TEST_ANOTHER_WORKSPACE_ID = "some_other_workspace_id"

# Dummy access token — presumably what mocked auth responses return; confirm against usages.
TEST_ACCESS_TOKEN = "some_access_token"

# Taken from the examples in the Airbyte REST API documentation
Expand Down
Loading