From 0b1ba5315e206add7480dde364b88241ad103377 Mon Sep 17 00:00:00 2001 From: ARookieDS Date: Thu, 15 Aug 2024 20:53:48 +0200 Subject: [PATCH] fix(dbt): support `MultiPartitionMapping` (#23363) fix build_dbt_multi_asset_args to accept MultiParitionMapping: - deps var was defined as a set, and was throwing an error when trying to insert an AssetDep with a MultiPartitionMapping - MultiPartitionMapping is taking in a dict, making it unhashable Error I was getting ``` File "/Users/username/dev/data_orchestrations/project/internals/assets/dbt_assets.py", line 21, in dbt_asset_factory @dbt_assets( ^^^^^^^^^^^ File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster/_core/decorator_utils.py", line 223, in wrapped_with_context_manager_fn return fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^ File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster_dbt/asset_decorator.py", line 311, in dbt_assets ) = build_dbt_multi_asset_args( ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster_dbt/asset_utils.py", line 995, in build_dbt_multi_asset_args deps.add( TypeError: unhashable type: 'dict' ``` ## Summary & Motivation I have flagged this issues while working on my own pipelines ## How I Tested These Changes Co-authored-by: Rex Ledesma --- .../dagster-dbt/dagster_dbt/asset_utils.py | 20 ++++++++----------- .../core/test_asset_decorator.py | 15 ++++++++++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 11fc5c39f0705..34a2d86e8477f 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -758,7 +758,7 @@ def build_dbt_multi_asset_args( asset_resource_types=ASSET_RESOURCE_TYPES, ) - deps: Set[AssetDep] = set() + deps: Dict[AssetKey, AssetDep] = {} outs: Dict[str, AssetOut] = {} internal_asset_deps: Dict[str, Set[AssetKey]] = {} check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {} @@ -870,11 +870,9 @@ def build_dbt_multi_asset_args( # Mark this parent as an input if it has no dependencies if parent_unique_id not in dbt_unique_id_deps: - deps.add( - AssetDep( - asset=parent_asset_key, - partition_mapping=parent_partition_mapping, - ) + deps[parent_asset_key] = AssetDep( + asset=parent_asset_key, + partition_mapping=parent_partition_mapping, ) self_partition_mapping = dagster_dbt_translator.get_partition_mapping( @@ -882,11 +880,9 @@ def build_dbt_multi_asset_args( dbt_parent_resource_props=dbt_resource_props, ) if self_partition_mapping and has_self_dependency(dbt_resource_props): - deps.add( - AssetDep( - asset=asset_key, - partition_mapping=self_partition_mapping, - ) + deps[asset_key] = AssetDep( + asset=asset_key, + partition_mapping=self_partition_mapping, ) output_internal_deps.add(asset_key) @@ -923,7 +919,7 @@ def build_dbt_multi_asset_args( "\n\n".join([DUPLICATE_ASSET_KEY_ERROR_MESSAGE, *error_messages]) ) - return list(deps), outs, internal_asset_deps, list(check_specs_by_key.values()) + return list(deps.values()), outs, internal_asset_deps, list(check_specs_by_key.values()) def get_asset_deps( diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 0dffea0a042d1..6bef0a293efa2 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -12,14 +12,17 @@ DailyPartitionsDefinition, Definitions, DependencyDefinition, + DimensionPartitionMapping, FreshnessPolicy, Jitter, LastPartitionMapping, + MultiPartitionMapping, NodeInvocation, OpDefinition, PartitionMapping, PartitionsDefinition, RetryPolicy, + StaticPartitionMapping, StaticPartitionsDefinition, TimeWindowPartitionMapping, asset, @@ -467,6 +470,18 @@ def my_dbt_assets(): ... None, LastPartitionMapping(), TimeWindowPartitionMapping(start_offset=-1, end_offset=-1), + MultiPartitionMapping( + { + "abc": DimensionPartitionMapping( + dimension_name="123", + partition_mapping=StaticPartitionMapping({"a": "1", "b": "2", "c": "3"}), + ), + "weekly": DimensionPartitionMapping( + dimension_name="daily", + partition_mapping=TimeWindowPartitionMapping(), + ), + } + ), ], ) def test_with_partition_mappings(