From c805a1caafcab9ff53cb7fa8f5eee44cd959fdf7 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Thu, 10 Oct 2024 09:14:47 -0700 Subject: [PATCH] Simply return types of static constructors --- .../automation_condition.py | 85 +++++++------------ .../operators/check_operators.py | 7 +- .../operators/dep_operators.py | 10 +-- 3 files changed, 40 insertions(+), 62 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 76de63350e8c2..db0983fb9d1bc 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -37,30 +37,9 @@ from dagster._core.definitions.declarative_automation.automation_context import ( AutomationContext, ) - from dagster._core.definitions.declarative_automation.operands import ( - CodeVersionChangedCondition, - CronTickPassedCondition, - ExecutionFailedAutomationCondition, - InitialEvaluationCondition, - InLatestTimeWindowCondition, - InProgressAutomationCondition, - MissingAutomationCondition, - NewlyRequestedCondition, - NewlyUpdatedCondition, - WillBeRequestedCondition, - ) - from dagster._core.definitions.declarative_automation.operators import ( - AllChecksCondition, - AllDepsCondition, - AndAutomationCondition, - AnyChecksCondition, - AnyDepsCondition, - AnyDownstreamConditionsCondition, - EntityMatchesCondition, - NewlyTrueCondition, - NotAutomationCondition, - OrAutomationCondition, - SinceCondition, + from dagster._core.definitions.declarative_automation.operators import AndAutomationCondition + from dagster._core.definitions.declarative_automation.operators.dep_operators import ( + DepsAutomationCondition, ) @@ -204,7 +183,7 @@ def __and__( def __or__( self, other: "AutomationCondition[T_EntityKey]" - ) -> "OrAutomationCondition[T_EntityKey]": + ) -> "BuiltinAutomationCondition[T_EntityKey]": from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition # group OrAutomationConditions together @@ -212,7 +191,7 @@ def __or__( return OrAutomationCondition(operands=[*self.operands, other]) return OrAutomationCondition(operands=[self, other]) - def __invert__(self) -> "NotAutomationCondition[T_EntityKey]": + def __invert__(self) -> "BuiltinAutomationCondition[T_EntityKey]": from dagster._core.definitions.declarative_automation.operators import ( NotAutomationCondition, ) @@ -221,7 +200,7 @@ def __invert__(self) -> "NotAutomationCondition[T_EntityKey]": def since( self, reset_condition: "AutomationCondition[T_EntityKey]" - ) -> "SinceCondition[T_EntityKey]": + ) -> "BuiltinAutomationCondition[T_EntityKey]": """Returns an AutomationCondition that is true if this condition has become true since the last time the reference condition became true. """ @@ -229,7 +208,7 @@ def since( return SinceCondition(trigger_condition=self, reset_condition=reset_condition) - def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]": + def newly_true(self) -> "BuiltinAutomationCondition[T_EntityKey]": """Returns an AutomationCondition that is true only on the tick that this condition goes from false to true for a given target. """ @@ -237,7 +216,7 @@ def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]": return NewlyTrueCondition(operand=self) - def since_last_handled(self: "AutomationCondition") -> "SinceCondition": + def since_last_handled(self: "AutomationCondition") -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if this condition has become true since the target was last requested or updated, and since the last time this target's condition was modified. """ @@ -255,7 +234,7 @@ def since_last_handled(self: "AutomationCondition") -> "SinceCondition": @staticmethod def asset_matches( key: "CoercibleToAssetKey", condition: "AutomationCondition[AssetKey]" - ) -> "EntityMatchesCondition": + ) -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if this condition is true for the given key.""" from dagster._core.definitions.declarative_automation.operators import ( EntityMatchesCondition, @@ -267,7 +246,7 @@ def asset_matches( @public @experimental @staticmethod - def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": + def any_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition": """Returns an AutomationCondition that is true for a if at least one partition of the any of the target's dependencies evaluate to True for the given condition. @@ -282,7 +261,7 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": @public @experimental @staticmethod - def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": + def all_deps_match(condition: "AutomationCondition") -> "DepsAutomationCondition": """Returns an AutomationCondition that is true for a if at least one partition of the all of the target's dependencies evaluate to True for the given condition. @@ -299,7 +278,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": @staticmethod def any_checks_match( condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False - ) -> "AnyChecksCondition": + ) -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true for if at least one of the target's checks evaluate to True for the given condition. @@ -318,7 +297,7 @@ def any_checks_match( @staticmethod def all_checks_match( condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False - ) -> "AllChecksCondition": + ) -> "BuiltinAutomationCondition[AssetKey]": """Returns an AutomationCondition that is true for an asset partition if all of its checks evaluate to True for the given condition. @@ -335,7 +314,7 @@ def all_checks_match( @public @experimental @staticmethod - def missing() -> "MissingAutomationCondition": + def missing() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target has not been executed.""" from dagster._core.definitions.declarative_automation.operands import ( MissingAutomationCondition, @@ -346,7 +325,7 @@ def missing() -> "MissingAutomationCondition": @public @experimental @staticmethod - def in_progress() -> "InProgressAutomationCondition": + def in_progress() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target is part of an in-progress run.""" from dagster._core.definitions.declarative_automation.operands import ( InProgressAutomationCondition, @@ -357,7 +336,7 @@ def in_progress() -> "InProgressAutomationCondition": @public @experimental @staticmethod - def execution_failed() -> "ExecutionFailedAutomationCondition": + def execution_failed() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the latest execution of the target failed.""" from dagster._core.definitions.declarative_automation.operands import ( ExecutionFailedAutomationCondition, @@ -368,7 +347,7 @@ def execution_failed() -> "ExecutionFailedAutomationCondition": @public @experimental @staticmethod - def check_passed() -> "AutomationCondition[AssetCheckKey]": + def check_passed() -> "BuiltinAutomationCondition[AssetCheckKey]": """Returns an AutomationCondition that is true for an asset check if it has evaluated against the latest materialization of an asset and passed. """ @@ -379,7 +358,7 @@ def check_passed() -> "AutomationCondition[AssetCheckKey]": @public @experimental @staticmethod - def check_failed() -> "AutomationCondition[AssetCheckKey]": + def check_failed() -> "BuiltinAutomationCondition[AssetCheckKey]": """Returns an AutomationCondition that is true for an asset check if it has evaluated against the latest materialization of an asset and failed. """ @@ -390,7 +369,7 @@ def check_failed() -> "AutomationCondition[AssetCheckKey]": @public @experimental @staticmethod - def initial_evaluation() -> "InitialEvaluationCondition": + def initial_evaluation() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true on the first evaluation of the expression.""" from dagster._core.definitions.declarative_automation.operands import ( InitialEvaluationCondition, @@ -403,7 +382,7 @@ def initial_evaluation() -> "InitialEvaluationCondition": @staticmethod def in_latest_time_window( lookback_delta: Optional[datetime.timedelta] = None, - ) -> "InLatestTimeWindowCondition": + ) -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true when the target it is within the latest time window. @@ -422,7 +401,7 @@ def in_latest_time_window( @public @experimental @staticmethod - def will_be_requested() -> "WillBeRequestedCondition": + def will_be_requested() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target will be requested this tick.""" from dagster._core.definitions.declarative_automation.operands import ( WillBeRequestedCondition, @@ -433,7 +412,7 @@ def will_be_requested() -> "WillBeRequestedCondition": @public @experimental @staticmethod - def newly_updated() -> "NewlyUpdatedCondition": + def newly_updated() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target has been updated since the previous tick.""" from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition @@ -442,7 +421,7 @@ def newly_updated() -> "NewlyUpdatedCondition": @public @experimental @staticmethod - def newly_requested() -> "NewlyRequestedCondition": + def newly_requested() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target was requested on the previous tick.""" from dagster._core.definitions.declarative_automation.operands import ( NewlyRequestedCondition, @@ -453,7 +432,7 @@ def newly_requested() -> "NewlyRequestedCondition": @public @experimental @staticmethod - def code_version_changed() -> "CodeVersionChangedCondition": + def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]": """Returns an AutomationCondition that is true if the target's code version has been changed since the previous tick. """ @@ -468,7 +447,7 @@ def code_version_changed() -> "CodeVersionChangedCondition": @staticmethod def cron_tick_passed( cron_schedule: str, cron_timezone: str = "UTC" - ) -> "CronTickPassedCondition": + ) -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is whenever a cron tick of the provided schedule is passed.""" from dagster._core.definitions.declarative_automation.operands import ( CronTickPassedCondition, @@ -480,14 +459,14 @@ def cron_tick_passed( @experimental @staticmethod - def newly_missing() -> "AutomationCondition": + def newly_missing() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true on the tick that the target becomes missing.""" with disable_dagster_warnings(): return AutomationCondition.missing().newly_true().with_label("newly_missing") @experimental @staticmethod - def any_deps_updated() -> "AnyDepsCondition": + def any_deps_updated() -> "DepsAutomationCondition": """Returns an AutomationCondition that is true if the target has at least one dependency that has updated since the previous tick, or will be requested on this tick. """ @@ -498,7 +477,7 @@ def any_deps_updated() -> "AnyDepsCondition": @experimental @staticmethod - def any_deps_missing() -> "AnyDepsCondition": + def any_deps_missing() -> "DepsAutomationCondition": """Returns an AutomationCondition that is true if the target has at least one dependency that is missing, and will not be requested on this tick. """ @@ -509,7 +488,7 @@ def any_deps_missing() -> "AnyDepsCondition": @experimental @staticmethod - def any_deps_in_progress() -> "AnyDepsCondition": + def any_deps_in_progress() -> "DepsAutomationCondition": """Returns an AutomationCondition that is true if the target has at least one dependency that is in progress. """ @@ -520,7 +499,7 @@ def any_deps_in_progress() -> "AnyDepsCondition": @experimental @staticmethod - def all_deps_blocking_checks_passed() -> "AllDepsCondition": + def all_deps_blocking_checks_passed() -> "DepsAutomationCondition": """Returns an AutomationCondition that is true for any partition where all upstream blocking checks have passed, or will be requested on this tick. @@ -541,7 +520,7 @@ def all_deps_blocking_checks_passed() -> "AllDepsCondition": @staticmethod def all_deps_updated_since_cron( cron_schedule: str, cron_timezone: str = "UTC" - ) -> "AllDepsCondition": + ) -> "DepsAutomationCondition": """Returns an AutomatonCondition that is true if all of the target's dependencies have updated since the latest tick of the provided cron schedule. """ @@ -622,7 +601,7 @@ def on_missing() -> "AndAutomationCondition": @public @experimental @staticmethod - def any_downstream_conditions() -> "AnyDownstreamConditionsCondition": + def any_downstream_conditions() -> "BuiltinAutomationCondition": """Returns an AutomationCondition which represents the union of all distinct downstream conditions.""" from dagster._core.definitions.declarative_automation.operators import ( AnyDownstreamConditionsCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py index 4f07078ff058c..c62a043152f3f 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py @@ -16,9 +16,8 @@ from dagster._serdes.serdes import whitelist_for_serdes -@whitelist_for_serdes @record -class ChecksCondition(BuiltinAutomationCondition[AssetKey]): +class ChecksAutomationCondition(BuiltinAutomationCondition[AssetKey]): operand: AutomationCondition[AssetCheckKey] blocking_only: bool = False @@ -52,7 +51,7 @@ def _get_check_keys( @whitelist_for_serdes @record -class AnyChecksCondition(ChecksCondition): +class AnyChecksCondition(ChecksAutomationCondition): @property def base_description(self) -> str: return "Any" @@ -85,7 +84,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass @whitelist_for_serdes @record -class AllChecksCondition(ChecksCondition): +class AllChecksCondition(ChecksAutomationCondition): @property def base_description(self) -> str: return "All" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index 5aa5f9dabbb71..f4b1fe6a3b756 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -43,7 +43,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ @record -class DepCondition(BuiltinAutomationCondition[T_EntityKey]): +class DepsAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): operand: AutomationCondition # Should be AssetSelection, but this causes circular reference issues @@ -67,7 +67,7 @@ def description(self) -> str: def requires_cursor(self) -> bool: return False - def allow(self, selection: "AssetSelection") -> "DepCondition": + def allow(self, selection: "AssetSelection") -> "DepsAutomationCondition": """Returns a copy of this condition that will only consider dependencies within the provided AssetSelection. """ @@ -79,7 +79,7 @@ def allow(self, selection: "AssetSelection") -> "DepCondition": ) return copy(self, allow_selection=allow_selection) - def ignore(self, selection: "AssetSelection") -> "DepCondition": + def ignore(self, selection: "AssetSelection") -> "DepsAutomationCondition": """Returns a copy of this condition that will ignore dependencies within the provided AssetSelection. """ @@ -103,7 +103,7 @@ def _get_dep_keys( @whitelist_for_serdes -class AnyDepsCondition(DepCondition[T_EntityKey]): +class AnyDepsCondition(DepsAutomationCondition[T_EntityKey]): @property def base_description(self) -> str: return "Any" @@ -133,7 +133,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ @whitelist_for_serdes -class AllDepsCondition(DepCondition[T_EntityKey]): +class AllDepsCondition(DepsAutomationCondition[T_EntityKey]): @property def base_description(self) -> str: return "All"