Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DA] Add doc block to AutomationCondition, mark static constructors public #23649

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Union

from dagster._annotations import experimental
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice, TemporalContext
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
Expand Down Expand Up @@ -49,6 +49,41 @@


class AutomationCondition(ABC):
"""An AutomationCondition represents a condition of an asset that impacts whether it should be
automatically executed. For example, you can have a condition which becomes true whenever the
code version of the asset is changed, or whenever an upstream dependency is updated.

.. code-block:: python

from dagster import AutomationCondition, asset

@asset(automation_condition=AutomationCondition.on_cron("0 0 * * *"))
def my_asset(): ...

AutomationConditions may be combined together into expressions using a variety of operators.

.. code-block:: python

from dagster import AssetSelection, AutomationCondition, asset

# any dependencies from the "important" group are missing
any_important_deps_missing = AutomationCondition.any_deps_match(
AutomationCondition.missing(),
).allow(AssetSelection.groups("important"))

# there is a new code version for this asset since the last time it was requested
new_code_version = AutomationCondition.code_version_changed().since(
AutomationCondition.newly_requested()
)

# there is a new code version and no important dependencies are missing
my_condition = new_code_version & ~any_important_deps_missing

@asset(automation_condition=my_condition)
def my_asset(): ...

"""

@property
def requires_cursor(self) -> bool:
return False
Expand Down Expand Up @@ -119,10 +154,6 @@ def is_rule_condition(self):
def evaluate(self, context: "AutomationContext") -> "AutomationResult":
raise NotImplementedError()

def with_label(self, label: Optional[str]) -> "AutomationCondition":
"""Returns a copy of this AutomationCondition with a human-readable label."""
return copy(self, label=label)

def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
from .operators import AndAutomationCondition

Expand All @@ -144,6 +175,11 @@ def __invert__(self) -> "NotAutomationCondition":

return NotAutomationCondition(operand=self)

@public
def with_label(self, label: Optional[str]) -> "AutomationCondition":
"""Returns a copy of this AutomationCondition with a human-readable label."""
return copy(self, label=label)

def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
"""Returns a AutomationCondition that is true if this condition has become true since the
last time the reference condition became true.
Expand All @@ -160,6 +196,7 @@ def newly_true(self) -> "NewlyTrueCondition":

return NewlyTrueCondition(operand=self)

@public
@experimental
@staticmethod
def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
Expand All @@ -174,6 +211,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":

return AnyDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
Expand All @@ -188,6 +226,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":

return AllDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def missing() -> "MissingAutomationCondition":
Expand All @@ -198,6 +237,7 @@ def missing() -> "MissingAutomationCondition":

return MissingAutomationCondition()

@public
@experimental
@staticmethod
def in_progress() -> "InProgressAutomationCondition":
Expand All @@ -206,6 +246,7 @@ def in_progress() -> "InProgressAutomationCondition":

return InProgressAutomationCondition()

@public
@experimental
@staticmethod
def failed() -> "FailedAutomationCondition":
Expand All @@ -214,6 +255,7 @@ def failed() -> "FailedAutomationCondition":

return FailedAutomationCondition()

@public
@experimental
@staticmethod
def in_latest_time_window(
Expand All @@ -232,6 +274,7 @@ def in_latest_time_window(

return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

@public
@experimental
@staticmethod
def will_be_requested() -> "WillBeRequestedCondition":
Expand All @@ -240,6 +283,7 @@ def will_be_requested() -> "WillBeRequestedCondition":

return WillBeRequestedCondition()

@public
@experimental
@staticmethod
def newly_updated() -> "NewlyUpdatedCondition":
Expand All @@ -248,6 +292,7 @@ def newly_updated() -> "NewlyUpdatedCondition":

return NewlyUpdatedCondition()

@public
@experimental
@staticmethod
def newly_requested() -> "NewlyRequestedCondition":
Expand All @@ -256,6 +301,7 @@ def newly_requested() -> "NewlyRequestedCondition":

return NewlyRequestedCondition()

@public
@experimental
@staticmethod
def code_version_changed() -> "CodeVersionChangedCondition":
Expand All @@ -266,6 +312,7 @@ def code_version_changed() -> "CodeVersionChangedCondition":

return CodeVersionChangedCondition()

@public
@experimental
@staticmethod
def cron_tick_passed(
Expand All @@ -276,6 +323,7 @@ def cron_tick_passed(

return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

@public
@experimental
@staticmethod
def eager() -> "AutomationCondition":
Expand Down Expand Up @@ -316,12 +364,13 @@ def eager() -> "AutomationCondition":
& ~AutomationCondition.in_progress()
).with_label("eager")

@public
@experimental
@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
"""Returns a condition which will materialize asset partitions within the latest time window
on a given cron schedule, after their parents have been updated. For example, if the
cron_schedule is set to "0 0 * * *" (every day at midnight), then this rule will not become
cron_schedule is set to "`0 0 * * *`" (every day at midnight), then this rule will not become
true on a given day until all of its parents have been updated during that same day.

Specifically, this is a composite AutomationCondition which is true for an asset partition
Expand All @@ -347,6 +396,7 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondit
& all_deps_updated_since_cron
).with_label(f"on cron {cron_label}")

@public
@experimental
@staticmethod
def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
Expand Down
Loading