Skip to content

Commit

Permalink
[1.9] Remove experimental warnings from AutomationCondition static co…
Browse files Browse the repository at this point in the history
…nstructors
  • Loading branch information
OwenKephart committed Oct 22, 2024
1 parent 4692bda commit 32592e7
Showing 1 changed file with 61 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ def since_last_handled(self: "AutomationCondition") -> "BuiltinAutomationConditi
)

@public
@experimental
@staticmethod
def asset_matches(
key: "CoercibleToAssetKey", condition: "AutomationCondition[AssetKey]"
Expand All @@ -245,7 +244,6 @@ def asset_matches(
return EntityMatchesCondition(key=asset_key, operand=condition)

@public
@experimental
@staticmethod
def any_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for a if at least one partition
Expand All @@ -260,7 +258,6 @@ def any_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition
return AnyDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def all_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for a if at least one partition
Expand All @@ -275,7 +272,6 @@ def all_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition
return AllDepsCondition(operand=condition)

@public
@experimental
@staticmethod
def any_checks_match(
condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False
Expand All @@ -294,7 +290,6 @@ def any_checks_match(
return AnyChecksCondition(operand=condition, blocking_only=blocking_only)

@public
@experimental
@staticmethod
def all_checks_match(
condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False
Expand All @@ -313,7 +308,6 @@ def all_checks_match(
return AllChecksCondition(operand=condition, blocking_only=blocking_only)

@public
@experimental
@staticmethod
def missing() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target has not been executed."""
Expand All @@ -324,7 +318,6 @@ def missing() -> "BuiltinAutomationCondition":
return MissingAutomationCondition()

@public
@experimental
@staticmethod
def run_in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target is part of an in-progress run."""
Expand All @@ -335,7 +328,6 @@ def run_in_progress() -> "BuiltinAutomationCondition":
return RunInProgressAutomationCondition()

@public
@experimental
@staticmethod
def backfill_in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target is part of an in-progress backfill."""
Expand All @@ -346,7 +338,6 @@ def backfill_in_progress() -> "BuiltinAutomationCondition":
return BackfillInProgressAutomationCondition()

@public
@experimental
@staticmethod
def execution_failed() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the latest execution of the target failed."""
Expand All @@ -357,7 +348,6 @@ def execution_failed() -> "BuiltinAutomationCondition":
return ExecutionFailedAutomationCondition()

@public
@experimental
@staticmethod
def in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it is part of an
Expand All @@ -368,7 +358,6 @@ def in_progress() -> "BuiltinAutomationCondition":
).with_label("in_progress")

@public
@experimental
@staticmethod
def check_passed() -> "BuiltinAutomationCondition[AssetCheckKey]":
"""Returns an AutomationCondition that is true for an asset check if it has evaluated against
Expand All @@ -379,7 +368,6 @@ def check_passed() -> "BuiltinAutomationCondition[AssetCheckKey]":
return CheckResultCondition(passed=True)

@public
@experimental
@staticmethod
def check_failed() -> "BuiltinAutomationCondition[AssetCheckKey]":
"""Returns an AutomationCondition that is true for an asset check if it has evaluated against
Expand All @@ -390,7 +378,6 @@ def check_failed() -> "BuiltinAutomationCondition[AssetCheckKey]":
return CheckResultCondition(passed=False)

@public
@experimental
@staticmethod
def initial_evaluation() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true on the first evaluation of the expression."""
Expand All @@ -401,7 +388,6 @@ def initial_evaluation() -> "BuiltinAutomationCondition":
return InitialEvaluationCondition()

@public
@experimental
@staticmethod
def in_latest_time_window(
lookback_delta: Optional[datetime.timedelta] = None,
Expand All @@ -422,7 +408,6 @@ def in_latest_time_window(
return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

@public
@experimental
@staticmethod
def will_be_requested() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target will be requested this tick."""
Expand All @@ -433,15 +418,13 @@ def will_be_requested() -> "BuiltinAutomationCondition":
return WillBeRequestedCondition()

@public
@experimental
@staticmethod
def newly_updated() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target has been updated since the previous tick."""
from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition

return NewlyUpdatedCondition()

@experimental
@staticmethod
def executed_with_root_target() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the latest run that updated the target also executed
Expand All @@ -454,7 +437,6 @@ def executed_with_root_target() -> "BuiltinAutomationCondition":
return LatestRunExecutedWithRootTargetCondition()

@public
@experimental
@staticmethod
def newly_requested() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the target was requested on the previous tick."""
Expand All @@ -465,7 +447,6 @@ def newly_requested() -> "BuiltinAutomationCondition":
return NewlyRequestedCondition()

@public
@experimental
@staticmethod
def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true if the target's code version has been changed
Expand All @@ -478,7 +459,6 @@ def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]":
return CodeVersionChangedCondition()

@public
@experimental
@staticmethod
def cron_tick_passed(
cron_schedule: str, cron_timezone: str = "UTC"
Expand All @@ -490,14 +470,13 @@ def cron_tick_passed(

return CronTickPassedCondition(cron_schedule=cron_schedule, cron_timezone=cron_timezone)

@experimental
@public
@staticmethod
def newly_missing() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true on the tick that the target becomes missing."""
with disable_dagster_warnings():
return AutomationCondition.missing().newly_true().with_label("newly_missing")
return AutomationCondition.missing().newly_true().with_label("newly_missing")

@experimental
@public
@staticmethod
def any_deps_updated() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true if the target has at least one dependency
Expand All @@ -506,41 +485,38 @@ def any_deps_updated() -> "DepsAutomationCondition":
Will ignore parent updates if the run that updated the parent also plans to update
the asset or check that this condition is applied to.
"""
with disable_dagster_warnings():
return AutomationCondition.any_deps_match(
(
AutomationCondition.newly_updated()
# executed_with_root_target is fairly expensive on a per-partition basis,
# but newly_updated is bounded in the number of partitions that might be
# updated on a single tick
& ~AutomationCondition.executed_with_root_target()
).with_label("newly_updated_without_root")
| AutomationCondition.will_be_requested()
).with_label("any_deps_updated")
return AutomationCondition.any_deps_match(
(
AutomationCondition.newly_updated()
# executed_with_root_target is fairly expensive on a per-partition basis,
# but newly_updated is bounded in the number of partitions that might be
# updated on a single tick
& ~AutomationCondition.executed_with_root_target()
).with_label("newly_updated_without_root")
| AutomationCondition.will_be_requested()
).with_label("any_deps_updated")

@experimental
@public
@staticmethod
def any_deps_missing() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true if the target has at least one dependency
that is missing, and will not be requested on this tick.
"""
with disable_dagster_warnings():
return AutomationCondition.any_deps_match(
AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
).with_label("any_deps_missing")
return AutomationCondition.any_deps_match(
AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
).with_label("any_deps_missing")

@experimental
@public
@staticmethod
def any_deps_in_progress() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true if the target has at least one dependency
that is in progress.
"""
with disable_dagster_warnings():
return AutomationCondition.any_deps_match(AutomationCondition.in_progress()).with_label(
"any_deps_in_progress"
)
return AutomationCondition.any_deps_match(AutomationCondition.in_progress()).with_label(
"any_deps_in_progress"
)

@experimental
@public
@staticmethod
def all_deps_blocking_checks_passed() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for any partition where all upstream
Expand All @@ -551,32 +527,29 @@ def all_deps_blocking_checks_passed() -> "DepsAutomationCondition":
they fail within the run, the run machinery will prevent the child from being
materialized.
"""
with disable_dagster_warnings():
return AutomationCondition.all_deps_match(
AutomationCondition.all_checks_match(
AutomationCondition.check_passed() | AutomationCondition.will_be_requested(),
blocking_only=True,
).with_label("all_blocking_checks_passed")
).with_label("all_deps_blocking_checks_passed")
return AutomationCondition.all_deps_match(
AutomationCondition.all_checks_match(
AutomationCondition.check_passed() | AutomationCondition.will_be_requested(),
blocking_only=True,
).with_label("all_blocking_checks_passed")
).with_label("all_deps_blocking_checks_passed")

@experimental
@public
@staticmethod
def all_deps_updated_since_cron(
cron_schedule: str, cron_timezone: str = "UTC"
) -> "DepsAutomationCondition":
"""Returns an AutomatonCondition that is true if all of the target's dependencies have
updated since the latest tick of the provided cron schedule.
"""
with disable_dagster_warnings():
return AutomationCondition.all_deps_match(
AutomationCondition.newly_updated().since(
AutomationCondition.cron_tick_passed(cron_schedule, cron_timezone)
)
| AutomationCondition.will_be_requested()
).with_label(f"all_deps_updated_since_cron({cron_schedule}, {cron_timezone})")
return AutomationCondition.all_deps_match(
AutomationCondition.newly_updated().since(
AutomationCondition.cron_tick_passed(cron_schedule, cron_timezone)
)
| AutomationCondition.will_be_requested()
).with_label(f"all_deps_updated_since_cron({cron_schedule}, {cron_timezone})")

@public
@experimental
@staticmethod
def eager() -> "AndAutomationCondition":
"""Returns an AutomationCondition which will cause a target to be executed if any of
Expand All @@ -588,19 +561,17 @@ def eager() -> "AndAutomationCondition":
For time partitioned assets, only the latest time partition will be considered.
"""
with disable_dagster_warnings():
return (
AutomationCondition.in_latest_time_window()
& (
AutomationCondition.newly_missing() | AutomationCondition.any_deps_updated()
).since_last_handled()
& ~AutomationCondition.any_deps_missing()
& ~AutomationCondition.any_deps_in_progress()
& ~AutomationCondition.in_progress()
).with_label("eager")
return (
AutomationCondition.in_latest_time_window()
& (
AutomationCondition.newly_missing() | AutomationCondition.any_deps_updated()
).since_last_handled()
& ~AutomationCondition.any_deps_missing()
& ~AutomationCondition.any_deps_in_progress()
& ~AutomationCondition.in_progress()
).with_label("eager")

@public
@experimental
@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AndAutomationCondition":
"""Returns an AutomationCondition which will cause a target to be executed on a given
Expand All @@ -609,17 +580,15 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AndAutomationCon
For time partitioned assets, only the latest time partition will be considered.
"""
with disable_dagster_warnings():
return (
AutomationCondition.in_latest_time_window()
& AutomationCondition.cron_tick_passed(
cron_schedule, cron_timezone
).since_last_handled()
& AutomationCondition.all_deps_updated_since_cron(cron_schedule, cron_timezone)
).with_label(f"on_cron({cron_schedule}, {cron_timezone})")
return (
AutomationCondition.in_latest_time_window()
& AutomationCondition.cron_tick_passed(
cron_schedule, cron_timezone
).since_last_handled()
& AutomationCondition.all_deps_updated_since_cron(cron_schedule, cron_timezone)
).with_label(f"on_cron({cron_schedule}, {cron_timezone})")

@public
@experimental
@staticmethod
def on_missing() -> "AndAutomationCondition":
"""Returns an AutomationCondition which will execute partitions of the target that
Expand All @@ -629,17 +598,16 @@ def on_missing() -> "AndAutomationCondition":
For time partitioned assets, only the latest time partition will be considered.
"""
with disable_dagster_warnings():
return (
AutomationCondition.in_latest_time_window()
& (
AutomationCondition.missing()
.newly_true()
.since_last_handled()
.with_label("missing_since_last_handled")
)
& ~AutomationCondition.any_deps_missing()
).with_label("on_missing")
return (
AutomationCondition.in_latest_time_window()
& (
AutomationCondition.missing()
.newly_true()
.since_last_handled()
.with_label("missing_since_last_handled")
)
& ~AutomationCondition.any_deps_missing()
).with_label("on_missing")

@public
@experimental
Expand Down

0 comments on commit 32592e7

Please sign in to comment.