Skip to content

Commit

Permalink
[DA] Add doc block to AutomationCondition, mark static constructors p…
Browse files Browse the repository at this point in the history
…ublic
  • Loading branch information
OwenKephart committed Aug 16, 2024
1 parent a3524d6 commit db0e2cf
Showing 1 changed file with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Union

from dagster._annotations import experimental
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
Expand Down Expand Up @@ -49,6 +49,41 @@


class AutomationCondition(ABC):
"""An AutomationCondition represents a condition of an asset that impacts whether it should be
automatically executed. For example, you can have a condition which becomes true whenever the
code version of the asset is changed, or whenever an upstream dependency is updated.
.. code-block:: python
from dagster import AutomationCondition, asset
@asset(automation_condition=AutomationCondition.on_cron("0 0 * * *"))
def my_asset(): ...
AutomationConditions may be combined together into expressions using a variety of operators.
.. code-block:: python
from dagster import AssetSelection, AutomationCondition, asset
# any dependencies from the "important" group are missing
any_important_deps_missing = AutomationCondition.any_deps_match(
AutomationCondition.missing(),
).allow(AssetSelection.groups("important"))
# there is a new code version for this asset since the last time it was requested
new_code_version = AutomationCondition.code_version_changed().since(
AutomationCondition.newly_requested()
)
# there is a new code version and no important dependencies are missing
my_condition = new_code_version & ~any_important_deps_missing
@asset(automation_condition=my_condition)
def my_asset(): ...
"""

@property
def requires_cursor(self) -> bool:
return False
Expand Down Expand Up @@ -119,10 +154,6 @@ def is_rule_condition(self):
def evaluate(self, context: "AutomationContext") -> "AutomationResult":
raise NotImplementedError()

def with_label(self, label: Optional[str]) -> "AutomationCondition":
"""Returns a copy of this AutomationCondition with a human-readable label."""
return copy(self, label=label)

def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
from .operators import AndAutomationCondition

Expand All @@ -144,6 +175,11 @@ def __invert__(self) -> "NotAutomationCondition":

return NotAutomationCondition(operand=self)

@public
def with_label(self, label: Optional[str]) -> "AutomationCondition":
"""Returns a copy of this AutomationCondition with a human-readable label."""
return copy(self, label=label)

def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
"""Returns a AutomationCondition that is true if this condition has become true since the
last time the reference condition became true.
Expand All @@ -160,6 +196,7 @@ def newly_true(self) -> "NewlyTrueCondition":

return NewlyTrueCondition(operand=self)

@public
@experimental
@staticmethod
def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
Expand All @@ -174,6 +211,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":

return AnyDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
Expand All @@ -188,6 +226,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":

return AllDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def missing() -> "MissingAutomationCondition":
Expand All @@ -198,6 +237,7 @@ def missing() -> "MissingAutomationCondition":

return MissingAutomationCondition()

@public
@experimental
@staticmethod
def in_progress() -> "InProgressAutomationCondition":
Expand All @@ -206,6 +246,7 @@ def in_progress() -> "InProgressAutomationCondition":

return InProgressAutomationCondition()

@public
@experimental
@staticmethod
def failed() -> "FailedAutomationCondition":
Expand All @@ -214,6 +255,7 @@ def failed() -> "FailedAutomationCondition":

return FailedAutomationCondition()

@public
@experimental
@staticmethod
def in_latest_time_window(
Expand All @@ -232,6 +274,7 @@ def in_latest_time_window(

return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

@public
@experimental
@staticmethod
def will_be_requested() -> "WillBeRequestedCondition":
Expand All @@ -240,6 +283,7 @@ def will_be_requested() -> "WillBeRequestedCondition":

return WillBeRequestedCondition()

@public
@experimental
@staticmethod
def newly_updated() -> "NewlyUpdatedCondition":
Expand All @@ -248,6 +292,7 @@ def newly_updated() -> "NewlyUpdatedCondition":

return NewlyUpdatedCondition()

@public
@experimental
@staticmethod
def newly_requested() -> "NewlyRequestedCondition":
Expand All @@ -256,6 +301,7 @@ def newly_requested() -> "NewlyRequestedCondition":

return NewlyRequestedCondition()

@public
@experimental
@staticmethod
def code_version_changed() -> "CodeVersionChangedCondition":
Expand All @@ -266,6 +312,7 @@ def code_version_changed() -> "CodeVersionChangedCondition":

return CodeVersionChangedCondition()

@public
@experimental
@staticmethod
def cron_tick_passed(
Expand All @@ -276,6 +323,7 @@ def cron_tick_passed(

return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

@public
@experimental
@staticmethod
def eager() -> "AutomationCondition":
Expand Down Expand Up @@ -316,6 +364,7 @@ def eager() -> "AutomationCondition":
& ~AutomationCondition.in_progress()
).with_label("eager")

@public
@experimental
@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
Expand Down Expand Up @@ -347,6 +396,7 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondit
& all_deps_updated_since_cron
).with_label(f"on cron {cron_label}")

@public
@experimental
@staticmethod
def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
Expand Down

0 comments on commit db0e2cf

Please sign in to comment.