diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index f478c57dc9986..ea092e315702e 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Union -from dagster._annotations import experimental +from dagster._annotations import experimental, public from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_subset import AssetSubset @@ -49,6 +49,41 @@ class AutomationCondition(ABC): + """An AutomationCondition represents a condition of an asset that impacts whether it should be + automatically executed. For example, you can have a condition which becomes true whenever the + code version of the asset is changed, or whenever an upstream dependency is updated. + + .. code-block:: python + + from dagster import AutomationCondition, asset + + @asset(automation_condition=AutomationCondition.on_cron("0 0 * * *")) + def my_asset(): ... + + AutomationConditions may be combined together into expressions using a variety of operators. + + .. code-block:: python + + from dagster import AssetSelection, AutomationCondition, asset + + # any dependencies from the "important" group are missing + any_important_deps_missing = AutomationCondition.any_deps_match( + AutomationCondition.missing(), + ).allow(AssetSelection.groups("important")) + + # there is a new code version for this asset since the last time it was requested + new_code_version = AutomationCondition.code_version_changed().since( + AutomationCondition.newly_requested() + ) + + # there is a new code version and no important dependencies are missing + my_condition = new_code_version & ~any_important_deps_missing + + @asset(automation_condition=my_condition) + def my_asset(): ... + + """ + @property def requires_cursor(self) -> bool: return False @@ -119,10 +154,6 @@ def is_rule_condition(self): def evaluate(self, context: "AutomationContext") -> "AutomationResult": raise NotImplementedError() - def with_label(self, label: Optional[str]) -> "AutomationCondition": - """Returns a copy of this AutomationCondition with a human-readable label.""" - return copy(self, label=label) - def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition": from .operators import AndAutomationCondition @@ -144,6 +175,11 @@ def __invert__(self) -> "NotAutomationCondition": return NotAutomationCondition(operand=self) + @public + def with_label(self, label: Optional[str]) -> "AutomationCondition": + """Returns a copy of this AutomationCondition with a human-readable label.""" + return copy(self, label=label) + def since(self, reset_condition: "AutomationCondition") -> "SinceCondition": """Returns a AutomationCondition that is true if this condition has become true since the last time the reference condition became true. @@ -160,6 +196,7 @@ def newly_true(self) -> "NewlyTrueCondition": return NewlyTrueCondition(operand=self) + @public @experimental @staticmethod def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": @@ -174,6 +211,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": return AnyDepsCondition(operand=condition) + @public @experimental @staticmethod def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": @@ -188,6 +226,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": return AllDepsCondition(operand=condition) + @public @experimental @staticmethod def missing() -> "MissingAutomationCondition": @@ -198,6 +237,7 @@ def missing() -> "MissingAutomationCondition": return MissingAutomationCondition() + @public @experimental @staticmethod def in_progress() -> "InProgressAutomationCondition": @@ -206,6 +246,7 @@ def in_progress() -> "InProgressAutomationCondition": return InProgressAutomationCondition() + @public @experimental @staticmethod def failed() -> "FailedAutomationCondition": @@ -214,6 +255,7 @@ def failed() -> "FailedAutomationCondition": return FailedAutomationCondition() + @public @experimental @staticmethod def in_latest_time_window( @@ -232,6 +274,7 @@ def in_latest_time_window( return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta) + @public @experimental @staticmethod def will_be_requested() -> "WillBeRequestedCondition": @@ -240,6 +283,7 @@ def will_be_requested() -> "WillBeRequestedCondition": return WillBeRequestedCondition() + @public @experimental @staticmethod def newly_updated() -> "NewlyUpdatedCondition": @@ -248,6 +292,7 @@ def newly_updated() -> "NewlyUpdatedCondition": return NewlyUpdatedCondition() + @public @experimental @staticmethod def newly_requested() -> "NewlyRequestedCondition": @@ -256,6 +301,7 @@ def newly_requested() -> "NewlyRequestedCondition": return NewlyRequestedCondition() + @public @experimental @staticmethod def code_version_changed() -> "CodeVersionChangedCondition": @@ -266,6 +312,7 @@ def code_version_changed() -> "CodeVersionChangedCondition": return CodeVersionChangedCondition() + @public @experimental @staticmethod def cron_tick_passed( @@ -276,6 +323,7 @@ def cron_tick_passed( return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone) + @public @experimental @staticmethod def eager() -> "AutomationCondition": @@ -316,6 +364,7 @@ def eager() -> "AutomationCondition": & ~AutomationCondition.in_progress() ).with_label("eager") + @public @experimental @staticmethod def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition": @@ -347,6 +396,7 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondit & all_deps_updated_since_cron ).with_label(f"on cron {cron_label}") + @public @experimental @staticmethod def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":