diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index f921bea80a7f8..8b4528e8b5b91 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import TYPE_CHECKING, AbstractSet, Dict, NamedTuple, Optional, Type, TypeVar +from typing import TYPE_CHECKING, AbstractSet, Dict, Literal, NamedTuple, Optional, Type, TypeVar from dagster import _check as check from dagster._core.asset_graph_view.entity_subset import EntitySubset, _ValidatedEntitySubsetValue @@ -219,7 +219,7 @@ def compute_parent_subset( check.invariant( parent_key in self.asset_graph.get(subset.key).parent_entity_keys, ) - return self.compute_mapped_subset(parent_key, subset) + return self.compute_mapped_subset(parent_key, subset, direction="up") def compute_child_subset( self, child_key: T_EntityKey, subset: EntitySubset[U_EntityKey] @@ -227,10 +227,10 @@ def compute_child_subset( check.invariant( child_key in self.asset_graph.get(subset.key).child_entity_keys, ) - return self.compute_mapped_subset(child_key, subset) + return self.compute_mapped_subset(child_key, subset, direction="down") def compute_mapped_subset( - self, to_key: T_EntityKey, from_subset: EntitySubset + self, to_key: T_EntityKey, from_subset: EntitySubset, direction: Literal["up", "down"] ) -> EntitySubset[T_EntityKey]: from_key = from_subset.key from_partitions_def = self.asset_graph.get(from_key).partitions_def @@ -238,7 +238,7 @@ def compute_mapped_subset( partition_mapping = self.asset_graph.get_partition_mapping(from_key, to_key) - if from_key in self.asset_graph.get(to_key).parent_entity_keys: + if direction == "down": if from_partitions_def is None or to_partitions_def is None: return ( self.get_empty_subset(key=to_key) diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py b/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py index 4beed6f4998fa..e243c0bbd49d7 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/entity_subset.py @@ -1,5 +1,15 @@ import operator -from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Generic, NamedTuple, TypeVar, Union +from typing import ( + TYPE_CHECKING, + AbstractSet, + Any, + Callable, + Generic, + Literal, + NamedTuple, + TypeVar, + Union, +) from typing_extensions import Self @@ -105,8 +115,10 @@ def compute_child_subset(self, child_key: U_EntityKey) -> "EntitySubset[U_Entity return self._asset_graph_view.compute_child_subset(child_key, self) @cached_method - def compute_mapped_subset(self, to_key: U_EntityKey) -> "EntitySubset[U_EntityKey]": - return self._asset_graph_view.compute_mapped_subset(to_key, self) + def compute_mapped_subset( + self, to_key: U_EntityKey, direction: Literal["up", "down"] + ) -> "EntitySubset[U_EntityKey]": + return self._asset_graph_view.compute_mapped_subset(to_key, self, direction=direction) @property def size(self) -> int: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py index 3affaa3c1082a..418698794ddb7 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py @@ -177,7 +177,9 @@ def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None: # evaluated it may have had a different requested subset. however, because # all these neighbors must be executed as a unit, we need to union together # the subset of all required neighbors - neighbor_true_subset = result.true_subset.compute_mapped_subset(neighbor_key) + neighbor_true_subset = result.true_subset.compute_mapped_subset( + neighbor_key, direction="up" + ) if neighbor_key in self.current_results_by_key: self.current_results_by_key[ neighbor_key diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index 5aa5f9dabbb71..24a8c9f4d8e58 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -31,14 +31,28 @@ def name(self) -> str: return self.key.to_user_string() def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: - to_candidate_subset = context.candidate_subset.compute_mapped_subset(self.key) + # if the key we're mapping to is a child of the key we're mapping from and is not + # self-dependent, use the downstream mapping function, otherwise use upstream + if ( + self.key in context.asset_graph.get(context.key).child_entity_keys + and self.key != context.key + ): + directions = ("down", "up") + else: + directions = ("up", "down") + + to_candidate_subset = context.candidate_subset.compute_mapped_subset( + self.key, direction=directions[0] + ) to_context = context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=to_candidate_subset ) to_result = self.operand.evaluate(to_context) - true_subset = to_result.true_subset.compute_mapped_subset(context.key) + true_subset = to_result.true_subset.compute_mapped_subset( + context.key, direction=directions[1] + ) return AutomationResult(context=context, true_subset=true_subset, child_results=[to_result]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 6f65a9baf17ca..50a1d5699d2a2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -1,8 +1,15 @@ from dagster import ( + AssetDep, AssetKey, + AssetMaterialization, AutomationCondition, + DailyPartitionsDefinition, Definitions, + DimensionPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, StaticPartitionsDefinition, + TimeWindowPartitionMapping, asset, evaluate_automation_conditions, ) @@ -149,3 +156,51 @@ def B() -> None: ... defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor ) assert result.total_requested == 0 + + +def test_eager_multi_partitioned_self_dependency() -> None: + pd = MultiPartitionsDefinition( + { + "time": DailyPartitionsDefinition(start_date="2024-08-01"), + "static": StaticPartitionsDefinition(["a", "b", "c"]), + } + ) + + @asset(partitions_def=pd) + def parent() -> None: ... + + @asset( + deps=[ + parent, + AssetDep( + "child", + partition_mapping=MultiPartitionMapping( + { + "time": DimensionPartitionMapping( + "time", TimeWindowPartitionMapping(start_offset=-1, end_offset=-1) + ), + } + ), + ), + ], + partitions_def=pd, + automation_condition=AutomationCondition.eager().without( + AutomationCondition.in_latest_time_window() + ), + ) + def child() -> None: ... + + defs = Definitions(assets=[parent, child]) + + with instance_for_test() as instance: + # nothing happening + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # materialize upstream + instance.report_runless_asset_event( + AssetMaterialization("parent", partition="a|2024-08-16") + ) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + # can't materialize downstream yet because previous partition of child is still missing + assert result.total_requested == 0