diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py index de3f5a00a4d52..3534620b4e00d 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py @@ -1,4 +1,4 @@ -from typing import AbstractSet, Optional, Sequence +from typing import AbstractSet, Mapping, Optional, Sequence from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.declarative_automation.automation_condition import ( @@ -73,10 +73,19 @@ def _get_ignored_conditions( ignored_conditions.add(context.condition) return ignored_conditions + def _get_validated_downstream_conditions( + self, downstream_conditions: Mapping[AutomationCondition, AbstractSet[AssetKey]] + ) -> Mapping[AutomationCondition, AbstractSet[AssetKey]]: + return { + condition: keys + for condition, keys in downstream_conditions.items() + if not condition.has_rule_condition + } + def evaluate(self, context: AutomationContext) -> AutomationResult: ignored_conditions = self._get_ignored_conditions(context) - downstream_conditions = context.asset_graph.get_downstream_automation_conditions( - asset_key=context.asset_key + downstream_conditions = self._get_validated_downstream_conditions( + context.asset_graph.get_downstream_automation_conditions(asset_key=context.asset_key) ) true_slice = context.get_empty_slice() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py index 1979c2f631ad4..439ca331475b2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py @@ -8,6 +8,7 @@ evaluate_automation_conditions, ) from dagster._core.definitions.asset_key import CoercibleToAssetKey +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult from dagster._core.definitions.declarative_automation.operators.boolean_operators import ( AndAutomationCondition, @@ -39,6 +40,23 @@ def b(): ... assert a_result.child_results[0].child_results[0].condition == cond2 +def test_basic_with_amp() -> None: + cond1 = AutomationCondition.any_downstream_conditions() + cond2 = AutoMaterializePolicy.eager() + + @asset(automation_condition=cond1) + def a(): ... + + @asset(auto_materialize_policy=cond2, deps=[a]) + def b(): ... + + result = evaluate_automation_conditions([a, b], instance=DagsterInstance.ephemeral()) + + a_result = _get_result(a.key, result.results) + # do not pick up child result + assert len(a_result.child_results) == 0 + + def test_multiple_downstreams() -> None: cond1 = AutomationCondition.any_downstream_conditions() cond2 = AutomationCondition.in_progress()