From 4df9fdd81cd4cc8c6c88d5a4f9f372703724cac8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 22:27:35 -0500 Subject: [PATCH] Move translator as metadata to specs loader --- .../dagster_airbyte/asset_decorator.py | 5 +--- .../dagster_airbyte/resources.py | 24 ++++++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index 871a116a7f82e..9efe75ebed956 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -5,7 +5,6 @@ from dagster_airbyte.resources import AirbyteCloudWorkspace from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator -from dagster_airbyte.utils import DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY @experimental @@ -105,9 +104,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt group_name=group_name, can_subset=True, specs=[ - spec.merge_attributes( - metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator} - ) + spec for spec in workspace.load_asset_specs( dagster_airbyte_translator=dagster_airbyte_translator ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 9ddb8e386f8c2..5ae8903fa84c0 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -45,6 +45,7 @@ ) from dagster_airbyte.types import AirbyteOutput from dagster_airbyte.utils import ( + DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY, get_airbyte_connection_table_name, get_translator_from_airbyte_assets, ) @@ -1339,16 +1340,23 @@ def load_airbyte_cloud_asset_specs( airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace) defs = dg.Definitions(assets=airbyte_cloud_specs) """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + with workspace.process_config_and_initialize_cm() as initialized_workspace: - return check.is_list( - AirbyteCloudWorkspaceDefsLoader( - workspace=initialized_workspace, - translator=dagster_airbyte_translator or DagsterAirbyteTranslator(), + return [ + spec.merge_attributes( + metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator} ) - .build_defs() - .assets, - AssetSpec, - ) + for spec in check.is_list( + AirbyteCloudWorkspaceDefsLoader( + workspace=initialized_workspace, + translator=dagster_airbyte_translator, + ) + .build_defs() + .assets, + AssetSpec, + ) + ] @record