From abf0d5478c9df1fddf6d1eb448b0fd6b8dbe7154 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Tue, 15 Oct 2024 10:38:59 -0700 Subject: [PATCH] [bug] Fix issue with partition mapping handling in the EntityMatches condition (#25278) ## Summary & Motivation As title. For self-dependent assets, the inference logic that determines the direction we should map partitions in does not work for self-dependent assets, as these are always upstream of themselves, meaning from_key will always be a parent of to_key, and we will always take the first branch. This solves the issue by adding an optional direction parameter to the EntityMatches condition, and threads that through to the underlying mapping function, allowing us to determine what direction we should map our partitions in. ## How I Tested These Changes ## Changelog Fixed an issue which could cause incorrect evaluation results when using self-dependent partition mappings with `AutomationConditions` that operate over dependencies. --- .../asset_graph_view/asset_graph_view.py | 10 ++-- .../_core/asset_graph_view/entity_subset.py | 18 +++++- .../automation_condition_evaluator.py | 4 +- .../operators/dep_operators.py | 18 +++++- .../builtins/test_eager_condition.py | 55 +++++++++++++++++++ 5 files changed, 94 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index f921bea80a7f8..8b4528e8b5b91 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import TYPE_CHECKING, AbstractSet, Dict, NamedTuple, Optional, Type, TypeVar +from typing import TYPE_CHECKING, AbstractSet, Dict, Literal, NamedTuple, Optional, Type, TypeVar from dagster import _check as check from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue @@ -219,7 +219,7 @@ def compute_parent_subset( check.invariant( parent_key in self.asset_graph.get(subset.key).parent_entity_keys, ) - return self.compute_mapped_subset(parent_key, subset) + return self.compute_mapped_subset(parent_key, subset, direction="up") def compute_child_subset( self, child_key: T_EntityKey, subset: EntitySubset[U_EntityKey] @@ -227,10 +227,10 @@ def compute_child_subset( check.invariant( child_key in self.asset_graph.get(subset.key).child_entity_keys, ) - return self.compute_mapped_subset(child_key, subset) + return self.compute_mapped_subset(child_key, subset, direction="down") def compute_mapped_subset( - self, to_key: T_EntityKey, from_subset: EntitySubset + self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"] ) -> EntitySubset[T_EntityKey]: from_key = from_subset.key from_partitions_def = self.asset_graph.get(from_key).partitions_def @@ -238,7 +238,7 @@ def compute_mapped_subset( partition_mapping = self.asset_graph.get_partition_mapping(from_key, to_key) - if from_key in self.asset_graph.get(to_key).parent_entity_keys: + if direction == "down": if from_partitions_def is None or to_partitions_def is None: return ( self.get_empty_subset(key=to_key) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py b/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py index 4beed6f4998fa..e243c0bbd49d7 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py @@ -1,5 +1,15 @@ import operator -from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Generic, NamedTuple, TypeVar, Union +from typing import ( + TYPE_CHECKING, + AbstractSet, + Any, + Callable, + Generic, + Literal, + NamedTuple, + TypeVar, + Union, +) from typing_extensions import Self @@ -105,8 +115,10 @@ def compute_child_subset(self, child_key: U_EntityKey) -> "EntitySubset[U_Entity return self._asset_graph_view.compute_child_subset(child_key, self) @cached_method - def compute_mapped_subset(self, to_key: U_EntityKey) -> "EntitySubset[U_EntityKey]": - return self._asset_graph_view.compute_mapped_subset(to_key, self) + def compute_mapped_subset( + self, to_key: U_EntityKey, direction: Literal["up", "down"] + ) -> "EntitySubset[U_EntityKey]": + return self._asset_graph_view.compute_mapped_subset(to_key, self, direction=direction) @property def size(self) -> int: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py index 3affaa3c1082a..418698794ddb7 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py @@ -177,7 +177,9 @@ def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None: # evaluated it may have had a different requested subset. however, because # all these neighbors must be executed as a unit, we need to union together # the subset of all required neighbors - neighbor_true_subset = result.true_subset.compute_mapped_subset(neighbor_key) + neighbor_true_subset = result.true_subset.compute_mapped_subset( + neighbor_key, direction="up" + ) if neighbor_key in self.current_results_by_key: self.current_results_by_key[ neighbor_key diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index 5aa5f9dabbb71..24a8c9f4d8e58 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -31,14 +31,28 @@ def name(self) -> str: return self.key.to_user_string() def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: - to_candidate_subset = context.candidate_subset.compute_mapped_subset(self.key) + # if the key we're mapping to is a child of the key we're mapping from and is not + # self-dependent, use the downstream mapping function, otherwise use upstream + if ( + self.key in context.asset_graph.get(context.key).child_entity_keys + and self.key != context.key + ): + directions = ("down", "up") + else: + directions = ("up", "down") + + to_candidate_subset = context.candidate_subset.compute_mapped_subset( + self.key, direction=directions[0] + ) to_context = context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=to_candidate_subset ) to_result = self.operand.evaluate(to_context) - true_subset = to_result.true_subset.compute_mapped_subset(context.key) + true_subset = to_result.true_subset.compute_mapped_subset( + context.key, direction=directions[1] + ) return AutomationResult(context=context, true_subset=true_subset, child_results=[to_result]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 6f65a9baf17ca..50a1d5699d2a2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -1,8 +1,15 @@ from dagster import ( + AssetDep, AssetKey, + AssetMaterialization, AutomationCondition, + DailyPartitionsDefinition, Definitions, + DimensionPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, StaticPartitionsDefinition, + TimeWindowPartitionMapping, asset, evaluate_automation_conditions, ) @@ -149,3 +156,51 @@ def B() -> None: ... defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor ) assert result.total_requested == 0 + + +def test_eager_multi_partitioned_self_dependency() -> None: + pd = MultiPartitionsDefinition( + { + "time": DailyPartitionsDefinition(start_date="2024-08-01"), + "static": StaticPartitionsDefinition(["a", "b", "c"]), + } + ) + + @asset(partitions_def=pd) + def parent() -> None: ... + + @asset( + deps=[ + parent, + AssetDep( + "child", + partition_mapping=MultiPartitionMapping( + { + "time": DimensionPartitionMapping( + "time", TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) + ), + } + ), + ), + ], + partitions_def=pd, + automation_condition=AutomationCondition.eager().without( + AutomationCondition.in_latest_time_window() + ), + ) + def child() -> None: ... + + defs = Definitions(assets=[parent, child]) + + with instance_for_test() as instance: + # nothing happening + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # materialize upstream + instance.report_runless_asset_event( + AssetMaterialization("parent", partition="a|2024-08-16") + ) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + # can't materialize downstream yet because previous partition of child is still missing + assert result.total_requested == 0