diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index d03ef85395124..1baa7bf3c6035 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -45,8 +45,6 @@ def airbyte_assets( @airbyte_assets( connection_id="airbyte_connection_id", - name="airbyte_connection_id", - group_name="airbyte_connection_id", workspace=airbyte_workspace, ) def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace): @@ -74,8 +72,8 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: default_spec = super().get_asset_spec(props) - return default_spec.replace_attributes( - key=asset_spec.key.with_prefix("my_prefix"), + return default_spec.merge_attributes( + metadata={"custom": "metadata"}, ) airbyte_workspace = AirbyteCloudWorkspace( @@ -87,8 +85,6 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: @airbyte_assets( connection_id="airbyte_connection_id", - name="airbyte_connection_id", - group_name="airbyte_connection_id", workspace=airbyte_workspace, dagster_airbyte_translator=CustomDagsterAirbyteTranslator() ) @@ -104,7 +100,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt return multi_asset( name=name, group_name=group_name, - can_subset=False, + can_subset=True, specs=[ spec for spec in workspace.load_asset_specs( diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index fe55c2c4f3193..6d04b782ac063 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -1102,8 +1102,8 @@ def build_airbyte_assets_definitions( class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator): def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: default_spec = super().get_asset_spec(props) - return default_spec.replace_attributes( - key=asset_spec.key.with_prefix("my_prefix"), + return default_spec.merge_attributes( + metadata={"custom": "metadata"}, ) airbyte_workspace = AirbyteCloudWorkspace( @@ -1129,19 +1129,22 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: dagster_airbyte_translator=dagster_airbyte_translator ) - connection_ids = { - check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id) + connections = { + ( + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id), + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_name), + ) for spec in all_asset_specs } _asset_fns = [] - for connection_id in connection_ids: + for connection_id, connection_name in connections: @airbyte_assets( connection_id=connection_id, workspace=workspace, - name=_clean_name(connection_id), - group_name=_clean_name(connection_id), + name=_clean_name(connection_name), + group_name=_clean_name(connection_name), dagster_airbyte_translator=dagster_airbyte_translator, ) def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):