From f1468dece159cbcf21da3e386633d5a7b01ddb6b Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Tue, 15 Oct 2024 12:58:39 -0700 Subject: [PATCH] [refactor] Move all operands into a single file (#25187) ## Summary & Motivation Minor refactor -- these implementations are all very simple and it's easier mentally to have them in a single place. Bumped the SubsetAutomationCondition into its own file as it cannot be directly instantiated. ## How I Tested These Changes ## Changelog NOCHANGELOG --- .../declarative_automation/__init__.py | 4 +- .../automation_condition.py | 2 +- .../operands/__init__.py | 6 +-- .../code_version_changed_condition.py | 26 ---------- .../{slice_conditions.py => operands.py} | 47 +++++++++---------- .../operands/subset_automation_condition.py | 33 +++++++++++++ 6 files changed, 61 insertions(+), 57 deletions(-) delete mode 100644 python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py rename python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/{slice_conditions.py => operands.py} (90%) create mode 100644 python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py index 39ca4d1528a60..f47f5f2d05267 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py @@ -1,5 +1,4 @@ from dagster._core.definitions.declarative_automation.operands import ( - CodeVersionChangedCondition as CodeVersionChangedCondition, CronTickPassedCondition as CronTickPassedCondition, ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, @@ -9,6 +8,9 @@ NewlyUpdatedCondition as NewlyUpdatedCondition, WillBeRequestedCondition as WillBeRequestedCondition, ) +from dagster._core.definitions.declarative_automation.operands.operands import ( + CodeVersionChangedCondition as CodeVersionChangedCondition, +) from dagster._core.definitions.declarative_automation.operators import ( AllDepsCondition as AllDepsCondition, AndAutomationCondition as AndAutomationCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 8689a68eb6ce3..e29c5134448b2 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -436,7 +436,7 @@ def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]": """Returns an AutomationCondition that is true if the target's code version has been changed since the previous tick. """ - from dagster._core.definitions.declarative_automation.operands import ( + from dagster._core.definitions.declarative_automation.operands.operands import ( CodeVersionChangedCondition, ) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py index f4801d5911656..98e0ba0b11517 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py @@ -1,8 +1,6 @@ -from dagster._core.definitions.declarative_automation.operands.code_version_changed_condition import ( - CodeVersionChangedCondition as CodeVersionChangedCondition, -) -from dagster._core.definitions.declarative_automation.operands.slice_conditions import ( +from dagster._core.definitions.declarative_automation.operands.operands import ( CheckResultCondition as CheckResultCondition, + CodeVersionChangedCondition as CodeVersionChangedCondition, CronTickPassedCondition as CronTickPassedCondition, ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InitialEvaluationCondition as InitialEvaluationCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py deleted file mode 100644 index 793736027a8a6..0000000000000 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/code_version_changed_condition.py +++ /dev/null @@ -1,26 +0,0 @@ -from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.declarative_automation.automation_condition import ( - AutomationResult, - BuiltinAutomationCondition, -) -from dagster._core.definitions.declarative_automation.automation_context import AutomationContext -from dagster._record import record -from dagster._serdes.serdes import whitelist_for_serdes - - -@whitelist_for_serdes -@record -class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]): - @property - def name(self) -> str: - return "code_version_changed" - - def evaluate(self, context: AutomationContext) -> AutomationResult[AssetKey]: - previous_code_version = context.cursor - current_code_version = context.asset_graph.get(context.key).code_version - if previous_code_version is None or previous_code_version == current_code_version: - true_subset = context.get_empty_subset() - else: - true_subset = context.candidate_subset - - return AutomationResult(context, true_subset, cursor=current_code_version) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py similarity index 90% rename from python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py rename to python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 80670152ce5c8..15e6e28e647da 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -1,20 +1,40 @@ import datetime -from abc import abstractmethod from typing import Optional from dagster._core.asset_graph_view.entity_subset import EntitySubset -from dagster._core.definitions.asset_key import AssetCheckKey, T_EntityKey +from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationResult, BuiltinAutomationCondition, ) from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._core.definitions.declarative_automation.operands.subset_automation_condition import ( + SubsetAutomationCondition, +) from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.schedules import reverse_cron_string_iterator +@whitelist_for_serdes +@record +class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]): + @property + def name(self) -> str: + return "code_version_changed" + + def evaluate(self, context: AutomationContext) -> AutomationResult[AssetKey]: + previous_code_version = context.cursor + current_code_version = context.asset_graph.get(context.key).code_version + if previous_code_version is None or previous_code_version == current_code_version: + true_subset = context.get_empty_subset() + else: + true_subset = context.candidate_subset + + return AutomationResult(context, true_subset, cursor=current_code_version) + + @record @whitelist_for_serdes class InitialEvaluationCondition(BuiltinAutomationCondition): @@ -33,29 +53,6 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: return AutomationResult(context, subset, cursor=condition_tree_id) -@record -class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): - """Base class for simple conditions which compute a simple subset of the asset graph.""" - - @property - def requires_cursor(self) -> bool: - return False - - @abstractmethod - def compute_subset( - self, context: AutomationContext[T_EntityKey] - ) -> EntitySubset[T_EntityKey]: ... - - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: - # don't compute anything if there are no candidates - if context.candidate_subset.is_empty: - true_subset = context.get_empty_subset() - else: - true_subset = self.compute_subset(context) - - return AutomationResult(context, true_subset) - - @whitelist_for_serdes @record class MissingAutomationCondition(SubsetAutomationCondition): diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py new file mode 100644 index 0000000000000..a94d4138d3893 --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/subset_automation_condition.py @@ -0,0 +1,33 @@ +from abc import abstractmethod + +from dagster._core.asset_graph_view.entity_subset import EntitySubset +from dagster._core.definitions.asset_key import T_EntityKey +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationResult, + BuiltinAutomationCondition, +) +from dagster._core.definitions.declarative_automation.automation_context import AutomationContext +from dagster._record import record + + +@record +class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): + """Base class for simple conditions which compute a simple subset of the asset graph.""" + + @property + def requires_cursor(self) -> bool: + return False + + @abstractmethod + def compute_subset( + self, context: AutomationContext[T_EntityKey] + ) -> EntitySubset[T_EntityKey]: ... + + def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + # don't compute anything if there are no candidates + if context.candidate_subset.is_empty: + true_subset = context.get_empty_subset() + else: + true_subset = self.compute_subset(context) + + return AutomationResult(context, true_subset)