From e7c4c96879390f6162246e3b3df5c899923c6ce6 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 13 Sep 2024 10:46:00 -0700 Subject: [PATCH] Ignore legacy AutoMaterializePolicies in AnyDownstreamConditions (#24411) ## Summary & Motivation Due to how we gate access to the legacy context object, and the fact that the "fully resolved" automation condition is not available until we execute the AnyDownstreamConditions condition (because we need access to the full asset graph), this would previously cause errors. These errors are weird, as the method says "any downstream *conditions*", whereas the downstream causing that failure has an AutoMaterializePolicy, not a condition. This change just ignores AMPs that are found downstream, avoiding the error ## How I Tested These Changes ## Changelog Fixed issue which could cause errors when using `AutomationCondition.any_downstream_condition()` with downstream `AutoMaterializePolicy` objects. - [ ] `NEW` _(added new feature or capability)_ - [x] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_ --- .../any_downstream_conditions_operator.py | 15 ++++++++++++--- .../builtins/test_any_downstream_conditions.py | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py index de3f5a00a4d52..3534620b4e00d 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py @@ -1,4 +1,4 @@ -from typing import AbstractSet, Optional, Sequence +from typing import AbstractSet, Mapping, Optional, Sequence from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.declarative_automation.automation_condition import ( @@ -73,10 +73,19 @@ def _get_ignored_conditions( ignored_conditions.add(context.condition) return ignored_conditions + def _get_validated_downstream_conditions( + self, downstream_conditions: Mapping[AutomationCondition, AbstractSet[AssetKey]] + ) -> Mapping[AutomationCondition, AbstractSet[AssetKey]]: + return { + condition: keys + for condition, keys in downstream_conditions.items() + if not condition.has_rule_condition + } + def evaluate(self, context: AutomationContext) -> AutomationResult: ignored_conditions = self._get_ignored_conditions(context) - downstream_conditions = context.asset_graph.get_downstream_automation_conditions( - asset_key=context.asset_key + downstream_conditions = self._get_validated_downstream_conditions( + context.asset_graph.get_downstream_automation_conditions(asset_key=context.asset_key) ) true_slice = context.get_empty_slice() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py index 1979c2f631ad4..439ca331475b2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_any_downstream_conditions.py @@ -8,6 +8,7 @@ evaluate_automation_conditions, ) from dagster._core.definitions.asset_key import CoercibleToAssetKey +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult from dagster._core.definitions.declarative_automation.operators.boolean_operators import ( AndAutomationCondition, @@ -39,6 +40,23 @@ def b(): ... assert a_result.child_results[0].child_results[0].condition == cond2 +def test_basic_with_amp() -> None: + cond1 = AutomationCondition.any_downstream_conditions() + cond2 = AutoMaterializePolicy.eager() + + @asset(automation_condition=cond1) + def a(): ... + + @asset(auto_materialize_policy=cond2, deps=[a]) + def b(): ... + + result = evaluate_automation_conditions([a, b], instance=DagsterInstance.ephemeral()) + + a_result = _get_result(a.key, result.results) + # do not pick up child result + assert len(a_result.child_results) == 0 + + def test_multiple_downstreams() -> None: cond1 = AutomationCondition.any_downstream_conditions() cond2 = AutomationCondition.in_progress()