Skip to content

Commit

Permalink
Simply return types of static constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Oct 10, 2024
1 parent 4deb33e commit 5459582
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,9 @@
from dagster._core.definitions.declarative_automation.automation_context import (
AutomationContext,
)
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition,
CronTickPassedCondition,
ExecutionFailedAutomationCondition,
InitialEvaluationCondition,
InLatestTimeWindowCondition,
InProgressAutomationCondition,
MissingAutomationCondition,
NewlyRequestedCondition,
NewlyUpdatedCondition,
WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operators import (
AllChecksCondition,
AllDepsCondition,
AndAutomationCondition,
AnyChecksCondition,
AnyDepsCondition,
AnyDownstreamConditionsCondition,
EntityMatchesCondition,
NewlyTrueCondition,
NotAutomationCondition,
OrAutomationCondition,
SinceCondition,
from dagster._core.definitions.declarative_automation.operators import AndAutomationCondition
from dagster._core.definitions.declarative_automation.operators.dep_operators import (
DepsAutomationCondition,
)


Expand Down Expand Up @@ -204,15 +183,15 @@ def __and__(

def __or__(
self, other: "AutomationCondition[T_EntityKey]"
) -> "OrAutomationCondition[T_EntityKey]":
) -> "BuiltinAutomationCondition[T_EntityKey]":
from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition

# group OrAutomationConditions together
if isinstance(self, OrAutomationCondition):
return OrAutomationCondition(operands=[*self.operands, other])
return OrAutomationCondition(operands=[self, other])

def __invert__(self) -> "NotAutomationCondition[T_EntityKey]":
def __invert__(self) -> "BuiltinAutomationCondition[T_EntityKey]":
from dagster._core.definitions.declarative_automation.operators import (
NotAutomationCondition,
)
Expand All @@ -221,23 +200,23 @@ def __invert__(self) -> "NotAutomationCondition[T_EntityKey]":

def since(
self, reset_condition: "AutomationCondition[T_EntityKey]"
) -> "SinceCondition[T_EntityKey]":
) -> "BuiltinAutomationCondition[T_EntityKey]":
"""Returns an AutomationCondition that is true if this condition has become true since the
last time the reference condition became true.
"""
from dagster._core.definitions.declarative_automation.operators import SinceCondition

return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]":
def newly_true(self) -> "BuiltinAutomationCondition[T_EntityKey]":
"""Returns an AutomationCondition that is true only on the tick that this condition goes
from false to true for a given asset partition.
"""
from dagster._core.definitions.declarative_automation.operators import NewlyTrueCondition

return NewlyTrueCondition(operand=self)

def since_last_handled(self: "AutomationCondition") -> "SinceCondition":
def since_last_handled(self: "AutomationCondition") -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if this condition has become true since the
asset partition was last requested or updated, and since the last time this entity's
condition was modified.
Expand All @@ -256,7 +235,7 @@ def since_last_handled(self: "AutomationCondition") -> "SinceCondition":
@staticmethod
def asset_matches(
key: "CoercibleToAssetKey", condition: "AutomationCondition[AssetKey]"
) -> "EntityMatchesCondition":
) -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if this condition is true for the given entity key."""
from dagster._core.definitions.declarative_automation.operators import (
EntityMatchesCondition,
Expand All @@ -268,7 +247,7 @@ def asset_matches(
@public
@experimental
@staticmethod
def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
def any_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if at least one partition
of any of its dependencies evaluate to True for the given condition.
Expand All @@ -283,7 +262,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
@public
@experimental
@staticmethod
def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
def all_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if at least one partition
of all of its dependencies evaluate to True for the given condition.
Expand All @@ -300,7 +279,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
@staticmethod
def any_checks_match(
condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False
) -> "AnyChecksCondition":
) -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true for an asset partition if at least one of its
checks evaluate to True for the given condition.
Expand All @@ -319,7 +298,7 @@ def any_checks_match(
@staticmethod
def all_checks_match(
condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False
) -> "AllChecksCondition":
) -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true for an asset partition if all of its checks
evaluate to True for the given condition.
Expand All @@ -336,7 +315,7 @@ def all_checks_match(
@public
@experimental
@staticmethod
def missing() -> "MissingAutomationCondition":
def missing() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it has never been
materialized or observed.
"""
Expand All @@ -349,7 +328,7 @@ def missing() -> "MissingAutomationCondition":
@public
@experimental
@staticmethod
def in_progress() -> "InProgressAutomationCondition":
def in_progress() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it is part of an
in-progress run.
"""
Expand All @@ -362,7 +341,7 @@ def in_progress() -> "InProgressAutomationCondition":
@public
@experimental
@staticmethod
def execution_failed() -> "ExecutionFailedAutomationCondition":
def execution_failed() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if its latest execution failed."""
from dagster._core.definitions.declarative_automation.operands import (
ExecutionFailedAutomationCondition,
Expand All @@ -373,7 +352,7 @@ def execution_failed() -> "ExecutionFailedAutomationCondition":
@public
@experimental
@staticmethod
def check_passed() -> "AutomationCondition[AssetCheckKey]":
def check_passed() -> "BuiltinAutomationCondition[AssetCheckKey]":
"""Returns an AutomationCondition that is true for an asset check if it has evaluated against
the latest materialization of an asset and passed.
"""
Expand All @@ -384,7 +363,7 @@ def check_passed() -> "AutomationCondition[AssetCheckKey]":
@public
@experimental
@staticmethod
def check_failed() -> "AutomationCondition[AssetCheckKey]":
def check_failed() -> "BuiltinAutomationCondition[AssetCheckKey]":
"""Returns an AutomationCondition that is true for an asset check if it has evaluated against
the latest materialization of an asset and failed.
"""
Expand All @@ -395,7 +374,7 @@ def check_failed() -> "AutomationCondition[AssetCheckKey]":
@public
@experimental
@staticmethod
def initial_evaluation() -> "InitialEvaluationCondition":
def initial_evaluation() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true on the first evaluation of the expression."""
from dagster._core.definitions.declarative_automation.operands import (
InitialEvaluationCondition,
Expand All @@ -408,7 +387,7 @@ def initial_evaluation() -> "InitialEvaluationCondition":
@staticmethod
def in_latest_time_window(
lookback_delta: Optional[datetime.timedelta] = None,
) -> "InLatestTimeWindowCondition":
) -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition when it is within the latest
time window.
Expand All @@ -427,7 +406,7 @@ def in_latest_time_window(
@public
@experimental
@staticmethod
def will_be_requested() -> "WillBeRequestedCondition":
def will_be_requested() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it will be requested this tick."""
from dagster._core.definitions.declarative_automation.operands import (
WillBeRequestedCondition,
Expand All @@ -438,7 +417,7 @@ def will_be_requested() -> "WillBeRequestedCondition":
@public
@experimental
@staticmethod
def newly_updated() -> "NewlyUpdatedCondition":
def newly_updated() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it has been updated since the previous tick."""
from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition

Expand All @@ -447,7 +426,7 @@ def newly_updated() -> "NewlyUpdatedCondition":
@public
@experimental
@staticmethod
def newly_requested() -> "NewlyRequestedCondition":
def newly_requested() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for an asset partition if it was requested on the previous tick."""
from dagster._core.definitions.declarative_automation.operands import (
NewlyRequestedCondition,
Expand All @@ -458,7 +437,7 @@ def newly_requested() -> "NewlyRequestedCondition":
@public
@experimental
@staticmethod
def code_version_changed() -> "CodeVersionChangedCondition":
def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true for an asset partition if its asset's code
version has been changed since the previous tick.
"""
Expand All @@ -473,7 +452,7 @@ def code_version_changed() -> "CodeVersionChangedCondition":
@staticmethod
def cron_tick_passed(
cron_schedule: str, cron_timezone: str = "UTC"
) -> "CronTickPassedCondition":
) -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true for all asset partitions whenever a cron tick of the provided schedule is passed."""
from dagster._core.definitions.declarative_automation.operands import (
CronTickPassedCondition,
Expand All @@ -485,14 +464,14 @@ def cron_tick_passed(

@experimental
@staticmethod
def newly_missing() -> "AutomationCondition":
def newly_missing() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true on the tick that an asset partition becomes missing."""
with disable_dagster_warnings():
return AutomationCondition.missing().newly_true().with_label("newly_missing")

@experimental
@staticmethod
def any_deps_updated() -> "AnyDepsCondition":
def any_deps_updated() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for any asset partition with at least one
dependency that has updated since the previous tick, or will be requested on this tick.
"""
Expand All @@ -503,7 +482,7 @@ def any_deps_updated() -> "AnyDepsCondition":

@experimental
@staticmethod
def any_deps_missing() -> "AnyDepsCondition":
def any_deps_missing() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for any asset partition with at least one
dependency that is missing, and will not be requested on this tick.
"""
Expand All @@ -514,7 +493,7 @@ def any_deps_missing() -> "AnyDepsCondition":

@experimental
@staticmethod
def any_deps_in_progress() -> "AnyDepsCondition":
def any_deps_in_progress() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for any asset partition with at least one
dependency that is in progress.
"""
Expand All @@ -525,7 +504,7 @@ def any_deps_in_progress() -> "AnyDepsCondition":

@experimental
@staticmethod
def all_deps_blocking_checks_passed() -> "AllDepsCondition":
def all_deps_blocking_checks_passed() -> "DepsAutomationCondition":
"""Returns an AutomationCondition that is true for any partition where all upstream
blocking checks have passed, or will be requested.
"""
Expand All @@ -541,7 +520,7 @@ def all_deps_blocking_checks_passed() -> "AllDepsCondition":
@staticmethod
def all_deps_updated_since_cron(
cron_schedule: str, cron_timezone: str = "UTC"
) -> "AllDepsCondition":
) -> "DepsAutomationCondition":
"""Returns an AutomatonCondition that is true for any asset partition where all of its
dependencies have updated since the latest tick of the provided cron schedule.
"""
Expand Down Expand Up @@ -622,7 +601,7 @@ def on_missing() -> "AndAutomationCondition":
@public
@experimental
@staticmethod
def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
def any_downstream_conditions() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition which represents the union of all distinct downstream conditions."""
from dagster._core.definitions.declarative_automation.operators import (
AnyDownstreamConditionsCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes
@record
class ChecksCondition(BuiltinAutomationCondition[AssetKey]):
class ChecksAutomationCondition(BuiltinAutomationCondition[AssetKey]):
operand: AutomationCondition[AssetCheckKey]

blocking_only: bool = False
Expand Down Expand Up @@ -52,7 +51,7 @@ def _get_check_keys(

@whitelist_for_serdes
@record
class AnyChecksCondition(ChecksCondition):
class AnyChecksCondition(ChecksAutomationCondition):
@property
def base_description(self) -> str:
return "Any"
Expand Down Expand Up @@ -85,7 +84,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass

@whitelist_for_serdes
@record
class AllChecksCondition(ChecksCondition):
class AllChecksCondition(ChecksAutomationCondition):
@property
def base_description(self) -> str:
return "All"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[


@record
class DepCondition(BuiltinAutomationCondition[T_EntityKey]):
class DepsAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
operand: AutomationCondition

# Should be AssetSelection, but this causes circular reference issues
Expand All @@ -67,7 +67,7 @@ def description(self) -> str:
def requires_cursor(self) -> bool:
return False

def allow(self, selection: "AssetSelection") -> "DepCondition":
def allow(self, selection: "AssetSelection") -> "DepsAutomationCondition":
"""Returns a copy of this condition that will only consider dependencies within the provided
AssetSelection.
"""
Expand All @@ -79,7 +79,7 @@ def allow(self, selection: "AssetSelection") -> "DepCondition":
)
return copy(self, allow_selection=allow_selection)

def ignore(self, selection: "AssetSelection") -> "DepCondition":
def ignore(self, selection: "AssetSelection") -> "DepsAutomationCondition":
"""Returns a copy of this condition that will ignore dependencies within the provided
AssetSelection.
"""
Expand All @@ -103,7 +103,7 @@ def _get_dep_keys(


@whitelist_for_serdes
class AnyDepsCondition(DepCondition[T_EntityKey]):
class AnyDepsCondition(DepsAutomationCondition[T_EntityKey]):
@property
def base_description(self) -> str:
return "Any"
Expand Down Expand Up @@ -133,7 +133,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[


@whitelist_for_serdes
class AllDepsCondition(DepCondition[T_EntityKey]):
class AllDepsCondition(DepsAutomationCondition[T_EntityKey]):
@property
def base_description(self) -> str:
return "All"
Expand Down

0 comments on commit 5459582

Please sign in to comment.