Skip to content

Commit

Permalink
[refactor] Move all operands into a single file (#25187)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Minor refactor -- these implementations are all very simple and it's easier mentally to have them in a single place. Bumped the SubsetAutomationCondition into its own file as it cannot be directly instantiated.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Oct 15, 2024
1 parent 564cd78 commit f1468de
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
Expand All @@ -9,6 +8,9 @@
NewlyUpdatedCondition as NewlyUpdatedCondition,
WillBeRequestedCondition as WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operands.operands import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
)
from dagster._core.definitions.declarative_automation.operators import (
AllDepsCondition as AllDepsCondition,
AndAutomationCondition as AndAutomationCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true if the target's code version has been changed
since the previous tick.
"""
from dagster._core.definitions.declarative_automation.operands import (
from dagster._core.definitions.declarative_automation.operands.operands import (
CodeVersionChangedCondition,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from dagster._core.definitions.declarative_automation.operands.code_version_changed_condition import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
)
from dagster._core.definitions.declarative_automation.operands.slice_conditions import (
from dagster._core.definitions.declarative_automation.operands.operands import (
CheckResultCondition as CheckResultCondition,
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
import datetime
from abc import abstractmethod
from typing import Optional

from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.definitions.asset_key import AssetCheckKey, T_EntityKey
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationResult,
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._core.definitions.declarative_automation.operands.subset_automation_condition import (
SubsetAutomationCondition,
)
from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.schedules import reverse_cron_string_iterator


@whitelist_for_serdes
@record
class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]):
@property
def name(self) -> str:
return "code_version_changed"

def evaluate(self, context: AutomationContext) -> AutomationResult[AssetKey]:
previous_code_version = context.cursor
current_code_version = context.asset_graph.get(context.key).code_version
if previous_code_version is None or previous_code_version == current_code_version:
true_subset = context.get_empty_subset()
else:
true_subset = context.candidate_subset

return AutomationResult(context, true_subset, cursor=current_code_version)


@record
@whitelist_for_serdes
class InitialEvaluationCondition(BuiltinAutomationCondition):
Expand All @@ -33,29 +53,6 @@ def evaluate(self, context: AutomationContext) -> AutomationResult:
return AutomationResult(context, subset, cursor=condition_tree_id)


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""

@property
def requires_cursor(self) -> bool:
return False

@abstractmethod
def compute_subset(
self, context: AutomationContext[T_EntityKey]
) -> EntitySubset[T_EntityKey]: ...

def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]:
# don't compute anything if there are no candidates
if context.candidate_subset.is_empty:
true_subset = context.get_empty_subset()
else:
true_subset = self.compute_subset(context)

return AutomationResult(context, true_subset)


@whitelist_for_serdes
@record
class MissingAutomationCondition(SubsetAutomationCondition):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import abstractmethod

from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.definitions.asset_key import T_EntityKey
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationResult,
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""

@property
def requires_cursor(self) -> bool:
return False

@abstractmethod
def compute_subset(
self, context: AutomationContext[T_EntityKey]
) -> EntitySubset[T_EntityKey]: ...

def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]:
# don't compute anything if there are no candidates
if context.candidate_subset.is_empty:
true_subset = context.get_empty_subset()
else:
true_subset = self.compute_subset(context)

return AutomationResult(context, true_subset)

0 comments on commit f1468de

Please sign in to comment.