[14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace (#26559)

## Summary & Motivation

This PR implements `AirbyteCloudWorkspace.sync_and_poll`, the
materialization method for Airbyte Cloud assets. This method:
- calls `AirbyteCloudClient.sync_and_poll`
- takes the `AirbyteOutput` returned by `AirbyteCloudClient.sync_and_poll`
  and generates the asset materializations
- yields a `MaterializeResult` for each expected asset and an
  `AssetMaterialization` for each unexpected asset
  - a table that was not part of the connection at definitions loading time
    can still appear in the `AirbyteOutput`, e.g. if it was added to the
    connection after definitions were loaded and before the sync ran
- logs a warning for each expected table that was not materialized
  - a table that was part of the connection at definitions loading time can
    be missing from the `AirbyteOutput`, e.g. if it was removed from the
    connection after definitions were loaded and before the sync ran
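
For reference, the expected/unexpected dispatch described above boils down to
roughly the standalone sketch below. The helper name `dispatch_materializations`
is illustrative only; the real logic lives inline in
`AirbyteCloudWorkspace.sync_and_poll` (see the `resources.py` diff below).

```python
from typing import Iterator, Union

from dagster import AssetExecutionContext, AssetMaterialization, MaterializeResult


def dispatch_materializations(
    context: AssetExecutionContext,
    materializations: Iterator[AssetMaterialization],
) -> Iterator[Union[MaterializeResult, AssetMaterialization]]:
    """Yield a MaterializeResult for each expected asset and forward the raw
    AssetMaterialization for tables that were not known when definitions were loaded.
    """
    materialized_asset_keys = set()
    for materialization in materializations:
        if materialization.asset_key in context.selected_asset_keys:
            # Expected table: report it as the materialization of its asset.
            yield MaterializeResult(
                asset_key=materialization.asset_key, metadata=materialization.metadata
            )
            materialized_asset_keys.add(materialization.asset_key)
        else:
            # Table present in the sync output but not in the asset selection,
            # e.g. added to the connection after definitions were loaded.
            context.log.warning(
                f"An unexpected asset was materialized: {materialization.asset_key}."
            )
            yield materialization

    # Tables selected for this run but missing from the sync output,
    # e.g. removed from the connection after definitions were loaded.
    unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
    if unmaterialized_asset_keys:
        context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
```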

It can be used like this:

```python
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    name="airbyte_connection_name",
    group_name="airbyte_connection_name",
    workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```

## How I Tested These Changes

Additional tests, run with BK (Buildkite).

## Changelog

[dagster-airbyte] Airbyte Cloud assets can now be materialized using the
`AirbyteCloudWorkspace.sync_and_poll(…)` method, called inside the body of an
asset definition decorated with `@airbyte_assets`.
maximearmstrong authored Dec 26, 2024
1 parent 8825029 commit 5ee51b7
Showing 10 changed files with 357 additions and 110 deletions.
@@ -97,14 +97,16 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
resources={"airbyte": airbyte_workspace},
)
"""
dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()

return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator()
dagster_airbyte_translator=dagster_airbyte_translator
)
if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id
],
@@ -1,7 +1,6 @@
import hashlib
import inspect
import os
import re
from abc import abstractmethod
from functools import partial
from itertools import chain
@@ -57,6 +56,7 @@
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
from dagster_airbyte.types import AirbyteTableMetadata
from dagster_airbyte.utils import (
clean_name,
generate_materializations,
generate_table_schema,
is_basic_normalization_operation,
@@ -471,11 +471,6 @@ def _get_normalization_tables_for_schema(
return out


def _clean_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^a-z0-9]+", "_", name.lower())


class AirbyteConnectionMetadata(
NamedTuple(
"_AirbyteConnectionMetadata",
@@ -917,7 +912,7 @@ def load_assets_from_airbyte_instance(
workspace_id: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
create_assets_for_normalization_tables: bool = True,
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name,
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name,
connection_meta_to_group_fn: Optional[
Callable[[AirbyteConnectionMetadata], Optional[str]]
] = None,
@@ -1022,7 +1017,7 @@ def load_assets_from_airbyte_instance(
check.invariant(
not connection_meta_to_group_fn
or not connection_to_group_fn
or connection_to_group_fn == _clean_name,
or connection_to_group_fn == clean_name,
"Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
)

@@ -1143,8 +1138,8 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
@airbyte_assets(
connection_id=connection_id,
workspace=workspace,
name=_clean_name(connection_name),
group_name=_clean_name(connection_name),
name=clean_name(connection_name),
group_name=clean_name(connection_name),
dagster_airbyte_translator=dagster_airbyte_translator,
)
def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
@@ -36,7 +36,6 @@
from dagster_airbyte.asset_defs import (
AirbyteConnectionMetadata,
AirbyteInstanceCacheableAssetsDefinition,
_clean_name,
)
from dagster_airbyte.managed.types import (
MANAGED_ELEMENTS_DEPRECATION_MSG,
@@ -50,7 +49,7 @@
InitializedAirbyteSource,
)
from dagster_airbyte.resources import AirbyteResource
from dagster_airbyte.utils import is_basic_normalization_operation
from dagster_airbyte.utils import clean_name, is_basic_normalization_operation


def gen_configured_stream_json(
@@ -746,7 +745,7 @@ def load_assets_from_connections(
connections: Iterable[AirbyteConnection],
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
create_assets_for_normalization_tables: bool = True,
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name,
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name,
connection_meta_to_group_fn: Optional[
Callable[[AirbyteConnectionMetadata], Optional[str]]
] = None,
@@ -821,7 +820,7 @@ def load_assets_from_connections(
check.invariant(
not connection_meta_to_group_fn
or not connection_to_group_fn
or connection_to_group_fn == _clean_name,
or connection_to_group_fn == clean_name,
"Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
)

116 changes: 107 additions & 9 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py
@@ -11,10 +11,12 @@
import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
ConfigurableResource,
Definitions,
Failure,
InitResourceContext,
MaterializeResult,
_check as check,
get_dagster_logger,
resource,
@@ -33,13 +35,20 @@

from dagster_airbyte.translator import (
AirbyteConnection,
AirbyteConnectionTableProps,
AirbyteDestination,
AirbyteJob,
AirbyteJobStatusType,
AirbyteMetadataSet,
AirbyteWorkspaceData,
DagsterAirbyteTranslator,
)
from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import (
DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY,
get_airbyte_connection_table_name,
get_translator_from_airbyte_assets,
)

AIRBYTE_REST_API_BASE = "https://api.airbyte.com"
AIRBYTE_REST_API_VERSION = "v1"
@@ -1211,8 +1220,90 @@ def load_asset_specs(
workspace=self, dagster_airbyte_translator=dagster_airbyte_translator
)

def _generate_materialization(
self,
airbyte_output: AirbyteOutput,
dagster_airbyte_translator: DagsterAirbyteTranslator,
):
connection = AirbyteConnection.from_connection_details(
connection_details=airbyte_output.connection_details
)

for stream in connection.streams.values():
if stream.selected:
connection_table_name = get_airbyte_connection_table_name(
stream_prefix=connection.stream_prefix,
stream_name=stream.name,
)
stream_asset_spec = dagster_airbyte_translator.get_asset_spec(
props=AirbyteConnectionTableProps(
table_name=connection_table_name,
stream_prefix=connection.stream_prefix,
stream_name=stream.name,
json_schema=stream.json_schema,
connection_id=connection.id,
connection_name=connection.name,
destination_type=None,
database=None,
schema=None,
)
)

yield AssetMaterialization(
asset_key=stream_asset_spec.key,
description=(
f"Table generated via Airbyte Cloud sync "
f"for connection {connection.name}: {connection_table_name}"
),
metadata=stream_asset_spec.metadata,
)

@experimental
def sync_and_poll(self, context: AssetExecutionContext):
raise NotImplementedError()
"""Executes a sync and poll process to materialize Airbyte Cloud assets.
This method can only be used in the context of an asset execution.
Args:
context (AssetExecutionContext): The execution context
from within `@airbyte_assets`.
Returns:
Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
or AssetMaterialization.
"""
assets_def = context.assets_def
dagster_airbyte_translator = get_translator_from_airbyte_assets(assets_def)
connection_id = next(
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id)
for spec in assets_def.specs
)

client = self.get_client()
airbyte_output = client.sync_and_poll(
connection_id=connection_id,
)

materialized_asset_keys = set()
for materialization in self._generate_materialization(
airbyte_output=airbyte_output, dagster_airbyte_translator=dagster_airbyte_translator
):
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
# Otherwise, emit a runtime AssetMaterialization.
if materialization.asset_key in context.selected_asset_keys:
yield MaterializeResult(
asset_key=materialization.asset_key, metadata=materialization.metadata
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}. "
f"Yielding a materialization event."
)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")


@experimental
@@ -1250,16 +1341,23 @@ def load_airbyte_cloud_asset_specs(
airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
"""
dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()

with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
AirbyteCloudWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_airbyte_translator or DagsterAirbyteTranslator(),
return [
spec.merge_attributes(
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
)
.build_defs()
.assets,
AssetSpec,
)
for spec in check.is_list(
AirbyteCloudWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_airbyte_translator,
)
.build_defs()
.assets,
AssetSpec,
)
]


@record
@@ -41,7 +41,7 @@ class AirbyteConnectionTableProps:
json_schema: Mapping[str, Any]
connection_id: str
connection_name: str
destination_type: str
destination_type: Optional[str]
database: Optional[str]
schema: Optional[str]

@@ -231,5 +231,5 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec:
return AssetSpec(
key=AssetKey(props.table_name),
metadata=metadata,
kinds={"airbyte", props.destination_type},
kinds={"airbyte", *({props.destination_type} if props.destination_type else set())},
)
35 changes: 33 additions & 2 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py
@@ -1,10 +1,26 @@
from typing import Any, Iterator, Mapping, Optional, Sequence
import re
from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence

from dagster import AssetMaterialization, MetadataValue
from dagster import (
AssetMaterialization,
AssetsDefinition,
DagsterInvariantViolationError,
MetadataValue,
)
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

from dagster_airbyte.types import AirbyteOutput

if TYPE_CHECKING:
from dagster_airbyte import DagsterAirbyteTranslator

DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY = "dagster-airbyte/dagster_airbyte_translator"


def clean_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^a-z0-9]+", "_", name.lower())


def get_airbyte_connection_table_name(stream_prefix: Optional[str], stream_name: str) -> str:
return f"{stream_prefix if stream_prefix else ''}{stream_name}"
@@ -78,3 +94,18 @@ def generate_materializations(
all_stream_stats.get(stream_name, {}),
asset_key_prefix=asset_key_prefix,
)


def get_translator_from_airbyte_assets(
airbyte_assets: AssetsDefinition,
) -> "DagsterAirbyteTranslator":
metadata_by_key = airbyte_assets.metadata_by_key or {}
first_asset_key = next(iter(airbyte_assets.metadata_by_key.keys()))
first_metadata = metadata_by_key.get(first_asset_key, {})
dagster_airbyte_translator = first_metadata.get(DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY)
if dagster_airbyte_translator is None:
raise DagsterInvariantViolationError(
f"Expected to find airbyte translator metadata on asset {first_asset_key.to_user_string()},"
" but did not. Did you pass in assets that weren't generated by @airbyte_assets?"
)
return dagster_airbyte_translator
