From 25ca2ca53cff44b4897b29618f09571b1afce42f Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Fri, 22 Nov 2024 12:39:23 -0700 Subject: [PATCH] Do not collapse `AutomationCondition`s with labels (#26097) ## Summary & Motivation Imagine you have: ```python a = (AutomationCondition.foo() & AutomationCondition.bar()).with_label("something") b = a & AutomationCondition.baz() ``` The implementation of `__and__` automatically transforms `And(And(c1, c2), c3)` into `And(c1, c2, c3)`. This process is generally desirable, as this reduces the number of levels in the hierarchy, making it easier to parse and understand expressions. However, if an expression has a label, then this indicates that it has its own semantic meaning, which gets erased by this process. We should prevent this automatic collapsing in cases where the existing condition's label is not `None`. The same applies to `__or__`! ## How I Tested These Changes Added unit tests in python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py. ## Changelog Fixed a bug where using the `&` or `|` operators on `AutomationCondition`s with labels would cause that label to be erased. --- .../automation_condition.py | 38 ++++++-- .../fundamentals/test_automation_condition.py | 87 +++++++++++++++++++ 2 files changed, 117 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 0adb6dcea5e25..e8c5634a76012 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -177,20 +177,42 @@ def __and__( AndAutomationCondition, ) - # group AndAutomationConditions together - if isinstance(self, AndAutomationCondition): - return AndAutomationCondition(operands=[*self.operands, other]) - return AndAutomationCondition(operands=[self, other]) + # Consolidate any unlabeled `AndAutomationCondition`s together. + return AndAutomationCondition( + operands=[ + *( + self.operands + if isinstance(self, AndAutomationCondition) and self.label is None + else (self,) + ), + *( + other.operands + if isinstance(other, AndAutomationCondition) and other.label is None + else (other,) + ), + ] + ) def __or__( self, other: "AutomationCondition[T_EntityKey]" ) -> "BuiltinAutomationCondition[T_EntityKey]": from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition - # group OrAutomationConditions together - if isinstance(self, OrAutomationCondition): - return OrAutomationCondition(operands=[*self.operands, other]) - return OrAutomationCondition(operands=[self, other]) + # Consolidate any unlabeled `OrAutomationCondition`s together. + return OrAutomationCondition( + operands=[ + *( + self.operands + if isinstance(self, OrAutomationCondition) and self.label is None + else (self,) + ), + *( + other.operands + if isinstance(other, OrAutomationCondition) and other.label is None + else (other,) + ), + ] + ) def __invert__(self) -> "BuiltinAutomationCondition[T_EntityKey]": from dagster._core.definitions.declarative_automation.operators import ( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py index 1486bfa1bec55..7b3d97f0eed18 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py @@ -1,10 +1,15 @@ import datetime +import operator import pytest from dagster import AutoMaterializePolicy, AutomationCondition, Definitions, asset from dagster._check.functions import CheckError from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._core.definitions.declarative_automation.operators import ( + AndAutomationCondition, + OrAutomationCondition, +) from dagster._core.remote_representation.external_data import RepositorySnap from dagster._serdes import serialize_value from dagster._serdes.serdes import deserialize_value @@ -176,3 +181,85 @@ def test_without_automation_condition() -> None: with pytest.raises(CheckError, match="fewer than 2 operands"): orig.without(a).without(b) + + +@pytest.mark.parametrize( + ("op", "cond"), [(operator.and_, AndAutomationCondition), (operator.or_, OrAutomationCondition)] +) +def test_consolidate_automation_conditions(op, cond) -> None: + not_missing = ~AutomationCondition.missing() + not_in_progress = ~AutomationCondition.in_progress() + not_missing_not_in_progress = op(not_missing, not_in_progress) + in_latest_time_window = AutomationCondition.in_latest_time_window() + not_any_deps_in_progress = ~AutomationCondition.any_deps_in_progress() + in_latest_time_window_not_any_deps_in_progress = op( + in_latest_time_window, not_any_deps_in_progress + ) + + assert op(not_missing_not_in_progress, in_latest_time_window_not_any_deps_in_progress) == ( + cond( + operands=[ + not_missing, + not_in_progress, + in_latest_time_window, + not_any_deps_in_progress, + ] + ) + ) + + assert op(not_missing_not_in_progress, in_latest_time_window) == ( + cond( + operands=[ + not_missing, + not_in_progress, + in_latest_time_window, + ] + ) + ) + + second_labeled_automation_condition = in_latest_time_window_not_any_deps_in_progress.with_label( + "in_latest_time_window_not_any_deps_in_progress" + ) + assert op(not_missing_not_in_progress, second_labeled_automation_condition) == ( + cond( + operands=[ + not_missing, + not_in_progress, + second_labeled_automation_condition, + ] + ) + ) + + assert op(not_missing, in_latest_time_window_not_any_deps_in_progress) == ( + cond( + operands=[ + not_missing, + in_latest_time_window, + not_any_deps_in_progress, + ] + ) + ) + + first_labeled_automation_condition = not_missing_not_in_progress.with_label( + "not_missing_not_in_progress" + ) + assert op( + first_labeled_automation_condition, in_latest_time_window_not_any_deps_in_progress + ) == ( + cond( + operands=[ + first_labeled_automation_condition, + in_latest_time_window, + not_any_deps_in_progress, + ] + ) + ) + + assert op(first_labeled_automation_condition, second_labeled_automation_condition) == ( + cond( + operands=[ + first_labeled_automation_condition, + second_labeled_automation_condition, + ] + ) + )