# Source file: python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py
# Reconstructed from patch e1ec6de68e405b6cf2fc8b97ec731036e420e9e2
# (Maxime Armstrong, Thu, 12 Dec 2024 00:40:42 -0500):
# "[dagster-airbyte] Implement airbyte_assets and build_airbyte_assets_definitions"
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_airbyte.resources import AirbyteCloudResource
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator


@experimental
def airbyte_assets(
    *,
    connection_id: str,
    workspace: AirbyteCloudResource,
    name: Optional[str] = None,
    group_name: Optional[str] = None,
    dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
    """Create a definition for how to sync the tables of a given Airbyte connection.

    Args:
        connection_id (str): The Airbyte Connection ID.
        workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from.
        name (Optional[str], optional): The name of the op.
        group_name (Optional[str], optional): The name of the asset group.
        dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator to use
            to convert Airbyte content into :py:class:`dagster.AssetSpec`.
            Defaults to :py:class:`DagsterAirbyteTranslator`.

    Returns:
        Callable[[Callable[..., Any]], AssetsDefinition]: A decorator that turns the decorated
        function into a non-subsettable multi-asset whose specs are the workspace's asset
        specs belonging to ``connection_id``.

    Examples:
        Sync the tables of an Airbyte connection:

        .. code-block:: python

            from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

            import dagster as dg

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            @airbyte_assets(
                connection_id="airbyte_connection_id",
                name="airbyte_connection_id",
                group_name="airbyte_connection_id",
                workspace=airbyte_workspace,
            )
            def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
                yield from airbyte.sync_and_poll(context=context)


            defs = dg.Definitions(
                assets=[airbyte_connection_assets],
                resources={"airbyte": airbyte_workspace},
            )

        Sync the tables of an Airbyte connection with a custom translator:

        .. code-block:: python

            from dagster_airbyte import (
                DagsterAirbyteTranslator,
                AirbyteConnectionTableProps,
                AirbyteCloudWorkspace,
                airbyte_assets
            )

            import dagster as dg

            class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
                def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
                    default_spec = super().get_asset_spec(props)
                    return default_spec.replace_attributes(
                        key=default_spec.key.with_prefix("my_prefix"),
                    )

            airbyte_workspace = AirbyteCloudWorkspace(
                workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
                client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
                client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
            )


            @airbyte_assets(
                connection_id="airbyte_connection_id",
                name="airbyte_connection_id",
                group_name="airbyte_connection_id",
                workspace=airbyte_workspace,
                dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
            )
            def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
                yield from airbyte.sync_and_poll(context=context)


            defs = dg.Definitions(
                assets=[airbyte_connection_assets],
                resources={"airbyte": airbyte_workspace},
            )
    """
    # NOTE(review): `workspace` is annotated as AirbyteCloudResource, but the docstring and
    # examples refer to AirbyteCloudWorkspace -- confirm which class actually provides
    # `load_asset_specs` and align the annotation/import or the docs accordingly.

    # Fall back to the default translator when the caller did not supply one.
    translator = dagster_airbyte_translator or DagsterAirbyteTranslator()

    # The workspace returns specs for all of its content; keep only the specs whose
    # Airbyte metadata points at the requested connection.
    connection_specs = [
        spec
        for spec in workspace.load_asset_specs(dagster_airbyte_translator=translator)
        if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id
    ]

    return multi_asset(
        name=name,
        group_name=group_name,
        can_subset=False,
        specs=connection_specs,
    )