From 33bb5204181388a7c53e70dd15de9ff3c05cc8a9 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 00:40:42 -0500 Subject: [PATCH 1/5] [dagster-airbyte] Implement airbyte_assets and build_airbyte_assets_definitions --- .../dagster_airbyte/asset_decorator.py | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py new file mode 100644 index 0000000000000..59b99e807d8f4 --- /dev/null +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -0,0 +1,115 @@ +from typing import Any, Callable, Optional + +from dagster import AssetsDefinition, multi_asset +from dagster._annotations import experimental + +from dagster_airbyte.resources import AirbyteCloudResource +from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator + + +@experimental +def airbyte_assets( + *, + connection_id: str, + workspace: AirbyteCloudResource, + name: Optional[str] = None, + group_name: Optional[str] = None, + dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None, +) -> Callable[[Callable[..., Any]], AssetsDefinition]: + """Create a definition for how to sync the tables of a given Airbyte connection. + + Args: + connection_id (str): The Airbyte Connection ID. + workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from. + name (Optional[str], optional): The name of the op. + group_name (Optional[str], optional): The name of the asset group. + dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator to use + to convert Airbyte content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterAirbyteTranslator`. + + Examples: + Sync the tables of an Airbyte connection: + + .. code-block:: python + + from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets + + import dagster as dg + + airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), + ) + + + @airbyte_assets( + connection_id="airbyte_connection_id", + name="airbyte_connection_id", + group_name="airbyte_connection_id", + workspace=airbyte_workspace, + ) + def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace): + yield from airbyte.sync_and_poll(context=context) + + + defs = dg.Definitions( + assets=[airbyte_connection_assets], + resources={"airbyte": airbyte_workspace}, + ) + + Sync the tables of an Airbyte connection with a custom translator: + + .. code-block:: python + + from dagster_airbyte import ( + DagsterAirbyteTranslator, + AirbyteConnectionTableProps, + AirbyteCloudWorkspace, + airbyte_assets + ) + + import dagster as dg + + class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): + def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: + default_spec = super().get_asset_spec(props) + return default_spec.replace_attributes( + key=asset_spec.key.with_prefix("my_prefix"), + ) + + airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), + ) + + + @airbyte_assets( + connection_id="airbyte_connection_id", + name="airbyte_connection_id", + group_name="airbyte_connection_id", + workspace=airbyte_workspace, + dagster_airbyte_translator=CustomDagsterAirbyteTranslator() + ) + def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace): + yield from airbyte.sync_and_poll(context=context) + + + defs = dg.Definitions( + assets=[airbyte_connection_assets], + resources={"airbyte": airbyte_workspace}, + ) + """ + return multi_asset( + name=name, + group_name=group_name, + can_subset=False, + specs=[ + spec + for spec in workspace.load_asset_specs( + dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator() + ) + if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id + ], + ) From b8d1e564fd8b496b09e339e759e6e421b89ecf70 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 18:09:52 -0500 Subject: [PATCH 2/5] Add factory, cached method and tests --- .../dagster_airbyte/__init__.py | 3 + .../dagster_airbyte/asset_decorator.py | 4 +- .../dagster_airbyte/asset_defs.py | 120 +++++++++++++++++- .../dagster_airbyte/resources.py | 47 ++++++- .../experimental/conftest.py | 2 + .../experimental/test_asset_specs.py | 118 ++++++++++++++++- .../experimental/test_translator.py | 3 +- 7 files changed, 290 insertions(+), 7 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py index 8c8a8c72495d6..f1d2b295944f8 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py @@ -14,8 +14,10 @@ except ImportError: pass +from dagster_airbyte.asset_decorator import airbyte_assets as airbyte_assets from dagster_airbyte.asset_defs import ( build_airbyte_assets as build_airbyte_assets, + build_airbyte_assets_definitions as build_airbyte_assets_definitions, load_assets_from_airbyte_instance as load_assets_from_airbyte_instance, ) from dagster_airbyte.ops import airbyte_sync_op as airbyte_sync_op @@ -28,6 +30,7 @@ load_airbyte_cloud_asset_specs as load_airbyte_cloud_asset_specs, ) from dagster_airbyte.translator import ( + AirbyteConnectionTableProps as AirbyteConnectionTableProps, AirbyteJobStatusType as AirbyteJobStatusType, AirbyteState as AirbyteState, DagsterAirbyteTranslator as DagsterAirbyteTranslator, diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index 59b99e807d8f4..d03ef85395124 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -3,7 +3,7 @@ from dagster import AssetsDefinition, multi_asset from dagster._annotations import experimental -from dagster_airbyte.resources import AirbyteCloudResource +from dagster_airbyte.resources import AirbyteCloudWorkspace from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator @@ -11,7 +11,7 @@ def airbyte_assets( *, connection_id: str, - workspace: AirbyteCloudResource, + workspace: AirbyteCloudWorkspace, name: Optional[str] = None, group_name: Optional[str] = None, dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None, diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index f982d71748a10..fe55c2c4f3193 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -23,6 +23,7 @@ import yaml from dagster import ( + AssetExecutionContext, AssetKey, AssetOut, AutoMaterializePolicy, @@ -33,6 +34,7 @@ SourceAsset, _check as check, ) +from dagster._annotations import experimental from dagster._core.definitions import AssetsDefinition, multi_asset from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, @@ -45,7 +47,14 @@ from dagster._core.execution.context.init import build_init_resource_context from dagster._utils.merger import merge_dicts -from dagster_airbyte.resources import AirbyteCloudResource, AirbyteResource, BaseAirbyteResource +from dagster_airbyte.asset_decorator import airbyte_assets +from dagster_airbyte.resources import ( + AirbyteCloudResource, + AirbyteCloudWorkspace, + AirbyteResource, + BaseAirbyteResource, +) +from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator from dagster_airbyte.types import AirbyteTableMetadata from dagster_airbyte.utils import ( generate_materializations, @@ -1032,3 +1041,112 @@ def load_assets_from_airbyte_instance( connection_to_freshness_policy_fn=connection_to_freshness_policy_fn, connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn, ) + + +# ----------------------- +# Reworked assets factory +# ----------------------- + + +@experimental +def build_airbyte_assets_definitions( + *, + workspace: AirbyteCloudWorkspace, + dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None, +) -> Sequence[AssetsDefinition]: + """The list of AssetsDefinition for all connections in the Airbyte workspace. + + Args: + workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from. + dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator to use + to convert Airbyte content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterAirbyteTranslator`. + + Returns: + List[AssetsDefinition]: The list of AssetsDefinition for all connections in the Airbyte workspace. + + Examples: + Sync the tables of a Airbyte connection: + .. code-block:: python + + from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions + + import dagster as dg + + airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), + ) + + + airbyte_assets = build_airbyte_assets_definitions(workspace=workspace) + + defs = dg.Definitions( + assets=airbyte_assets, + resources={"airbyte": airbyte_workspace}, + ) + + Sync the tables of a Airbyte connection with a custom translator: + .. code-block:: python + + from dagster_airbyte import ( + DagsterAirbyteTranslator, + AirbyteConnectionTableProps, + AirbyteCloudWorkspace, + build_airbyte_assets_definitions + ) + + import dagster as dg + + class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): + def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: + default_spec = super().get_asset_spec(props) + return default_spec.replace_attributes( + key=asset_spec.key.with_prefix("my_prefix"), + ) + + airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), + ) + + + airbyte_assets = build_airbyte_assets_definitions( + workspace=workspace, + dagster_airbyte_translator=CustomDagsterAirbyteTranslator() + ) + + defs = dg.Definitions( + assets=airbyte_assets, + resources={"airbyte": airbyte_workspace}, + ) + """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + + all_asset_specs = workspace.load_asset_specs( + dagster_airbyte_translator=dagster_airbyte_translator + ) + + connection_ids = { + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id) + for spec in all_asset_specs + } + + _asset_fns = [] + for connection_id in connection_ids: + + @airbyte_assets( + connection_id=connection_id, + workspace=workspace, + name=_clean_name(connection_id), + group_name=_clean_name(connection_id), + dagster_airbyte_translator=dagster_airbyte_translator, + ) + def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): + yield from airbyte.sync_and_poll(context=context) + + _asset_fns.append(_asset_fn) + + return _asset_fns diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 33c30737640ed..ea6730cbe046e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -6,14 +6,16 @@ from abc import abstractmethod from contextlib import contextmanager from datetime import datetime, timedelta -from typing import Any, Dict, List, Mapping, Optional, Sequence, cast +from typing import Any, Dict, List, Mapping, Optional, Sequence, Union, cast import requests from dagster import ( + AssetExecutionContext, ConfigurableResource, Definitions, Failure, InitResourceContext, + OpExecutionContext, _check as check, get_dagster_logger, resource, @@ -1172,6 +1174,49 @@ def fetch_airbyte_workspace_data( destinations_by_id=destinations_by_id, ) + @cached_method + def load_asset_specs( + self, + dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None, + ) -> Sequence[AssetSpec]: + """Returns a list of AssetSpecs representing the Airbyte content in the workspace. + + Args: + dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator to use + to convert Airbyte content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterAirbyteTranslator`. + + Returns: + List[AssetSpec]: The set of assets representing the Airbyte content in the workspace. + + Examples: + Loading the asset specs for a given Airbyte workspace: + .. code-block:: python + + from dagster_airbyte import AirbyteCloudWorkspace + + import dagster as dg + + airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), + ) + + airbyte_specs = airbyte_workspace.load_asset_specs() + defs = dg.Definitions(assets=airbyte_specs, resources={"airbyte": airbyte_workspace} + """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + + return load_airbyte_cloud_asset_specs( + workspace=self, dagster_airbyte_translator=dagster_airbyte_translator + ) + + def sync_and_poll( + self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + ): + raise NotImplementedError() + @experimental def load_airbyte_cloud_asset_specs( diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 8d3169402d399..3efd9ac841041 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -14,6 +14,8 @@ TEST_CLIENT_ID = "some_client_id" TEST_CLIENT_SECRET = "some_client_secret" +TEST_ANOTHER_WORKSPACE_ID = "some_other_workspace_id" + TEST_ACCESS_TOKEN = "some_access_token" # Taken from the examples in the Airbyte REST API documentation diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py index 5c812fb348aaf..dc29adc0eb2ac 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py @@ -1,11 +1,25 @@ import responses from dagster._config.field_utils import EnvVar +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.tags import has_kind from dagster._core.test_utils import environ -from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs +from dagster_airbyte import ( + AirbyteCloudWorkspace, + build_airbyte_assets_definitions, + load_airbyte_cloud_asset_specs, +) +from dagster_airbyte.translator import ( + AirbyteConnectionTableProps, + AirbyteMetadataSet, + DagsterAirbyteTranslator, +) from dagster_airbyte_tests.experimental.conftest import ( + TEST_ANOTHER_WORKSPACE_ID, TEST_CLIENT_ID, TEST_CLIENT_SECRET, + TEST_CONNECTION_ID, + TEST_DESTINATION_TYPE, TEST_WORKSPACE_ID, ) @@ -46,3 +60,105 @@ def test_translator_spec( # Test the asset key for the connection table the_asset_key = next(iter(all_assets_keys)) assert the_asset_key.path == ["test_prefix_test_stream"] + + first_asset_metadata = next(asset.metadata for asset in all_assets) + assert AirbyteMetadataSet.extract(first_asset_metadata).connection_id == TEST_CONNECTION_ID + + +def test_cached_load_spec_single_resource( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} + ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + # load asset specs a first time + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs a first time, no additional calls are made + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + +def test_cached_load_spec_multiple_resources( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} + ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + another_workspace = AirbyteCloudWorkspace( + workspace_id=TEST_ANOTHER_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + # load asset specs with a resource + workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs with another resource, + # additional calls are made to load its specs + another_workspace.load_asset_specs() + assert len(fetch_workspace_data_api_mocks.calls) == 4 + 4 + + +def test_cached_load_spec_with_asset_factory( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} + ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + # build_airbyte_assets_definitions calls workspace.load_asset_specs to get the connection IDs, + # then workspace.load_asset_specs is called once per connection ID in airbyte_assets, + # but the four calls to the API are only made once. + build_airbyte_assets_definitions(workspace=workspace) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + +class MyCustomTranslator(DagsterAirbyteTranslator): + def get_asset_spec(self, data: AirbyteConnectionTableProps) -> AssetSpec: + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("test_connection"), + ).merge_attributes(metadata={"custom": "metadata"}) + + +def test_translator_custom_metadata( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} + ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + all_asset_specs = workspace.load_asset_specs( + dagster_airbyte_translator=MyCustomTranslator() + ) + asset_spec = next(spec for spec in all_asset_specs) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == ["test_connection", "test_prefix_test_stream"] + assert has_kind(asset_spec.tags, "airbyte") + assert has_kind(asset_spec.tags, TEST_DESTINATION_TYPE) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py index 2b3c4ccbea6a5..70996ec48eb42 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py @@ -78,8 +78,7 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec: default_spec = super().get_asset_spec(props) return default_spec.replace_attributes( key=default_spec.key.with_prefix("test_connection"), - metadata={**default_spec.metadata, "custom": "metadata"}, - ) + ).merge_attributes(metadata={"custom": "metadata"}) def test_custom_translator( From 6a274c9a64c9ff03a5f50482a0eaba82275ae641 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 20:10:09 -0500 Subject: [PATCH 3/5] Update post review --- .../dagster_airbyte/asset_decorator.py | 10 +++------- .../dagster_airbyte/asset_defs.py | 17 ++++++++++------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index d03ef85395124..1baa7bf3c6035 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -45,8 +45,6 @@ def airbyte_assets( @airbyte_assets( connection_id="airbyte_connection_id", - name="airbyte_connection_id", - group_name="airbyte_connection_id", workspace=airbyte_workspace, ) def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace): @@ -74,8 +72,8 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: default_spec = super().get_asset_spec(props) - return default_spec.replace_attributes( - key=asset_spec.key.with_prefix("my_prefix"), + return default_spec.merge_attributes( + metadata={"custom": "metadata"}, ) airbyte_workspace = AirbyteCloudWorkspace( @@ -87,8 +85,6 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: @airbyte_assets( connection_id="airbyte_connection_id", - name="airbyte_connection_id", - group_name="airbyte_connection_id", workspace=airbyte_workspace, dagster_airbyte_translator=CustomDagsterAirbyteTranslator() ) @@ -104,7 +100,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt return multi_asset( name=name, group_name=group_name, - can_subset=False, + can_subset=True, specs=[ spec for spec in workspace.load_asset_specs( diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index fe55c2c4f3193..6d04b782ac063 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -1102,8 +1102,8 @@ def build_airbyte_assets_definitions( class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: default_spec = super().get_asset_spec(props) - return default_spec.replace_attributes( - key=asset_spec.key.with_prefix("my_prefix"), + return default_spec.merge_attributes( + metadata={"custom": "metadata"}, ) airbyte_workspace = AirbyteCloudWorkspace( @@ -1129,19 +1129,22 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: dagster_airbyte_translator=dagster_airbyte_translator ) - connection_ids = { - check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id) + connections = { + ( + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id), + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_name), + ) for spec in all_asset_specs } _asset_fns = [] - for connection_id in connection_ids: + for connection_id, connection_name in connections: @airbyte_assets( connection_id=connection_id, workspace=workspace, - name=_clean_name(connection_id), - group_name=_clean_name(connection_id), + name=_clean_name(connection_name), + group_name=_clean_name(connection_name), dagster_airbyte_translator=dagster_airbyte_translator, ) def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): From e390c64e83e63da338362d3c442fa22867abaf76 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 21:36:13 -0500 Subject: [PATCH 4/5] Remove OpExecutionContext --- .../libraries/dagster-airbyte/dagster_airbyte/resources.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index ea6730cbe046e..d6ba0d0ea0308 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -6,7 +6,7 @@ from abc import abstractmethod from contextlib import contextmanager from datetime import datetime, timedelta -from typing import Any, Dict, List, Mapping, Optional, Sequence, Union, cast +from typing import Any, Dict, List, Mapping, Optional, Sequence, cast import requests from dagster import ( @@ -15,7 +15,6 @@ Definitions, Failure, InitResourceContext, - OpExecutionContext, _check as check, get_dagster_logger, resource, @@ -1212,9 +1211,7 @@ def load_asset_specs( workspace=self, dagster_airbyte_translator=dagster_airbyte_translator ) - def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None - ): + def sync_and_poll(self, context: AssetExecutionContext): raise NotImplementedError() From e13a65e2516d66b024335b4f304534b304e09fd6 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 26 Dec 2024 11:42:19 -0500 Subject: [PATCH 5/5] Update test comments --- .../dagster_airbyte_tests/experimental/test_asset_specs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py index dc29adc0eb2ac..f0fc78a6b71cb 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py @@ -77,11 +77,11 @@ def test_cached_load_spec_single_resource( client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), ) - # load asset specs a first time + # load asset specs, calls are made workspace.load_asset_specs() assert len(fetch_workspace_data_api_mocks.calls) == 4 - # load asset specs a first time, no additional calls are made + # load asset specs another time, no additional calls are made workspace.load_asset_specs() assert len(fetch_workspace_data_api_mocks.calls) == 4