Skip to content

Commit

Permalink
fix(dbt): support MultiPartitionMapping (#23363)
Browse files Browse the repository at this point in the history
fix build_dbt_multi_asset_args to accept MultiParitionMapping:
- deps var was defined as a set, and was throwing an error when trying
to insert an AssetDep with a MultiPartitionMapping
- MultiPartitionMapping is taking in a dict, making it unhashable
Error I was getting
```
File "/Users/username/dev/data_orchestrations/project/internals/assets/dbt_assets.py", line 21, in dbt_asset_factory
    @dbt_assets(
     ^^^^^^^^^^^
  File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster/_core/decorator_utils.py", line 223, in wrapped_with_context_manager_fn
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster_dbt/asset_decorator.py", line 311, in dbt_assets
    ) = build_dbt_multi_asset_args(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/dev/data_orchestrations/project/.venv/lib/python3.12/site-packages/dagster_dbt/asset_utils.py", line 995, in build_dbt_multi_asset_args
    deps.add(
TypeError: unhashable type: 'dict'
```

## Summary & Motivation
I have flagged this issues while working on my own pipelines 

## How I Tested These Changes

Co-authored-by: Rex Ledesma <[email protected]>
  • Loading branch information
arookieds and rexledesma authored Aug 15, 2024
1 parent 1a02c86 commit 0b1ba53
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
20 changes: 8 additions & 12 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ def build_dbt_multi_asset_args(
asset_resource_types=ASSET_RESOURCE_TYPES,
)

deps: Set[AssetDep] = set()
deps: Dict[AssetKey, AssetDep] = {}
outs: Dict[str, AssetOut] = {}
internal_asset_deps: Dict[str, Set[AssetKey]] = {}
check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {}
Expand Down Expand Up @@ -870,23 +870,19 @@ def build_dbt_multi_asset_args(

# Mark this parent as an input if it has no dependencies
if parent_unique_id not in dbt_unique_id_deps:
deps.add(
AssetDep(
asset=parent_asset_key,
partition_mapping=parent_partition_mapping,
)
deps[parent_asset_key] = AssetDep(
asset=parent_asset_key,
partition_mapping=parent_partition_mapping,
)

self_partition_mapping = dagster_dbt_translator.get_partition_mapping(
dbt_resource_props,
dbt_parent_resource_props=dbt_resource_props,
)
if self_partition_mapping and has_self_dependency(dbt_resource_props):
deps.add(
AssetDep(
asset=asset_key,
partition_mapping=self_partition_mapping,
)
deps[asset_key] = AssetDep(
asset=asset_key,
partition_mapping=self_partition_mapping,
)
output_internal_deps.add(asset_key)

Expand Down Expand Up @@ -923,7 +919,7 @@ def build_dbt_multi_asset_args(
"\n\n".join([DUPLICATE_ASSET_KEY_ERROR_MESSAGE, *error_messages])
)

return list(deps), outs, internal_asset_deps, list(check_specs_by_key.values())
return list(deps.values()), outs, internal_asset_deps, list(check_specs_by_key.values())


def get_asset_deps(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
DailyPartitionsDefinition,
Definitions,
DependencyDefinition,
DimensionPartitionMapping,
FreshnessPolicy,
Jitter,
LastPartitionMapping,
MultiPartitionMapping,
NodeInvocation,
OpDefinition,
PartitionMapping,
PartitionsDefinition,
RetryPolicy,
StaticPartitionMapping,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
Expand Down Expand Up @@ -467,6 +470,18 @@ def my_dbt_assets(): ...
None,
LastPartitionMapping(),
TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
MultiPartitionMapping(
{
"abc": DimensionPartitionMapping(
dimension_name="123",
partition_mapping=StaticPartitionMapping({"a": "1", "b": "2", "c": "3"}),
),
"weekly": DimensionPartitionMapping(
dimension_name="daily",
partition_mapping=TimeWindowPartitionMapping(),
),
}
),
],
)
def test_with_partition_mappings(
Expand Down

0 comments on commit 0b1ba53

Please sign in to comment.