From 00970032ea1188b64c3a48904be189d6c43c572a Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Thu, 26 Sep 2024 14:49:31 -0700 Subject: [PATCH] Convert AutomationCondition to @record (#24630) ## Summary & Motivation Converts all of the built-in automation conditions to use @record instead of pydantic models. ## How I Tested These Changes ## Changelog NOCHANGELOG - [ ] `NEW` _(added new feature or capability)_ - [ ] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_ --- .../declarative_automation/automation_condition.py | 7 ++++--- .../declarative_automation/legacy/rule_condition.py | 2 ++ .../operands/code_version_changed_condition.py | 2 ++ .../operands/slice_conditions.py | 10 ++++++++++ .../operators/any_downstream_conditions_operator.py | 3 +++ .../operators/boolean_operators.py | 4 ++++ .../operators/check_operators.py | 6 ++++++ .../declarative_automation/operators/dep_operators.py | 7 +++++-- .../operators/newly_true_operator.py | 2 ++ .../declarative_automation/operators/since_operator.py | 2 ++ 10 files changed, 40 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 1d71cb8ca0907..64a1efe023e84 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -21,7 +21,7 @@ ) from dagster._core.definitions.partition import AllPartitionsSubset from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset -from dagster._model import DagsterModel +from dagster._record import copy, record from dagster._serdes.serdes import is_whitelisted_for_serdes_object from dagster._time import get_current_timestamp from dagster._utils.security import non_secure_md5_hash_str @@ -556,7 +556,8 @@ def any_downstream_conditions() -> "AnyDownstreamConditionsCondition": return AnyDownstreamConditionsCondition() -class BuiltinAutomationCondition(DagsterModel, AutomationCondition[T_EntityKey]): +@record +class BuiltinAutomationCondition(AutomationCondition[T_EntityKey]): """Base class for AutomationConditions provided by the core dagster framework.""" label: Optional[str] = None @@ -567,7 +568,7 @@ def get_label(self) -> Optional[str]: @public def with_label(self, label: Optional[str]) -> Self: """Returns a copy of this AutomationCondition with a human-readable label.""" - return self.model_copy(update={"label": label}) + return copy(self, label=label) def __hash__(self) -> int: return self.get_hash() diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py index a5b94bbc7f6bf..72028c7687a4d 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py @@ -6,6 +6,7 @@ AutomationResult, BuiltinAutomationCondition, ) +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.security import non_secure_md5_hash_str @@ -16,6 +17,7 @@ @whitelist_for_serdes +@record class RuleCondition(BuiltinAutomationCondition[AssetKey]): """This class represents the condition that a particular AutoMaterializeRule is satisfied.""" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py index ce3b6e8418b70..f68c4af467c3a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py @@ -4,10 +4,12 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @whitelist_for_serdes +@record class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]): @property def description(self) -> str: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py index f4d968bcc793a..7dfcf6646b6ec 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py @@ -10,10 +10,12 @@ ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.schedules import reverse_cron_string_iterator +@record class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): """Base class for simple conditions which compute a simple subset of the asset graph.""" @@ -37,6 +39,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ @whitelist_for_serdes +@record class MissingAutomationCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -53,6 +56,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class InProgressAutomationCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -67,6 +71,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class FailedAutomationCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -81,6 +86,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class WillBeRequestedCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -114,6 +120,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class NewlyRequestedCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -128,6 +135,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class NewlyUpdatedCondition(SubsetAutomationCondition[AssetKey]): @property def description(self) -> str: @@ -148,6 +156,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes +@record class CronTickPassedCondition(SubsetAutomationCondition): cron_schedule: str cron_timezone: str @@ -182,6 +191,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset: @whitelist_for_serdes +@record class InLatestTimeWindowCondition(SubsetAutomationCondition[AssetKey]): serializable_lookback_timedelta: Optional[SerializableTimeDelta] = None diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py index da7cef33b1b16..161c062814bf5 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py @@ -7,9 +7,11 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes +@record class DownstreamConditionWrapperCondition(BuiltinAutomationCondition[AssetKey]): """Wrapper object which evaluates a condition against a dependency and returns a subset representing the subset of downstream asset which has at least one parent which evaluated to @@ -45,6 +47,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass @whitelist_for_serdes +@record class AnyDownstreamConditionsCondition(BuiltinAutomationCondition[AssetKey]): @property def description(self) -> str: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py index e49ace9d660ca..23cf97fe84e20 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py @@ -8,10 +8,12 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @whitelist_for_serdes(storage_name="AndAssetCondition") +@record class AndAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): """This class represents the condition that all of its children evaluate to true.""" @@ -59,6 +61,7 @@ def without(self, condition: AutomationCondition) -> "AndAutomationCondition": @whitelist_for_serdes(storage_name="OrAssetCondition") +@record class OrAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): """This class represents the condition that any of its children evaluate to true.""" @@ -95,6 +98,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ @whitelist_for_serdes(storage_name="NotAssetCondition") +@record class NotAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): """This class represents the condition that none of its children evaluate to true.""" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py index 46b673e3fde71..285846ff73e5a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py @@ -9,9 +9,11 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes +@record class CheckConditionWrapperCondition(BuiltinAutomationCondition[AssetKey]): check_key: AssetCheckKey operand: AutomationCondition[AssetCheckKey] @@ -38,6 +40,8 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass ) +@whitelist_for_serdes +@record class ChecksCondition(BuiltinAutomationCondition[AssetKey]): operand: AutomationCondition[AssetCheckKey] @@ -71,6 +75,7 @@ def _get_check_keys( @whitelist_for_serdes +@record class AnyChecksCondition(ChecksCondition): @property def base_description(self) -> str: @@ -105,6 +110,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass @whitelist_for_serdes +@record class AllChecksCondition(ChecksCondition): @property def base_description(self) -> str: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index 8b619e0d4dffa..874a5e682253c 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -10,12 +10,14 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import copy, record from dagster._serdes.serdes import whitelist_for_serdes if TYPE_CHECKING: from dagster._core.definitions.asset_selection import AssetSelection +@record class DepConditionWrapperCondition(BuiltinAutomationCondition[T_EntityKey]): """Wrapper object which evaluates a condition against a dependency and returns a subset representing the subset of downstream asset which has at least one parent which evaluated to @@ -51,6 +53,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ ) +@record class DepCondition(BuiltinAutomationCondition[T_EntityKey]): operand: AutomationCondition @@ -85,7 +88,7 @@ def allow(self, selection: "AssetSelection") -> "DepCondition": allow_selection = ( selection if self.allow_selection is None else selection | self.allow_selection ) - return self.model_copy(update={"allow_selection": allow_selection}) + return copy(self, allow_selection=allow_selection) def ignore(self, selection: "AssetSelection") -> "DepCondition": """Returns a copy of this condition that will ignore dependencies within the provided @@ -97,7 +100,7 @@ def ignore(self, selection: "AssetSelection") -> "DepCondition": ignore_selection = ( selection if self.ignore_selection is None else selection | self.ignore_selection ) - return self.model_copy(update={"ignore_selection": ignore_selection}) + return copy(self, ignore_selection=ignore_selection) def _get_dep_keys( self, key: T_EntityKey, asset_graph: BaseAssetGraph[BaseAssetNode] diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py index a99a512074e9d..81ba978b214a8 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py @@ -9,10 +9,12 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @whitelist_for_serdes +@record class NewlyTrueCondition(BuiltinAutomationCondition[T_EntityKey]): operand: AutomationCondition[T_EntityKey] diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py index b9c238f405405..d732aecae063c 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py @@ -7,10 +7,12 @@ BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @whitelist_for_serdes +@record class SinceCondition(BuiltinAutomationCondition[T_EntityKey]): trigger_condition: AutomationCondition[T_EntityKey] reset_condition: AutomationCondition[T_EntityKey]