Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Move all operands into a single file #25187

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
Expand All @@ -9,6 +8,9 @@
NewlyUpdatedCondition as NewlyUpdatedCondition,
WillBeRequestedCondition as WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operands.operands import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
)
from dagster._core.definitions.declarative_automation.operators import (
AllDepsCondition as AllDepsCondition,
AndAutomationCondition as AndAutomationCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def code_version_changed() -> "BuiltinAutomationCondition[AssetKey]":
"""Returns an AutomationCondition that is true if the target's code version has been changed
since the previous tick.
"""
from dagster._core.definitions.declarative_automation.operands import (
from dagster._core.definitions.declarative_automation.operands.operands import (
CodeVersionChangedCondition,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from dagster._core.definitions.declarative_automation.operands.code_version_changed_condition import (
CodeVersionChangedCondition as CodeVersionChangedCondition,
)
from dagster._core.definitions.declarative_automation.operands.slice_conditions import (
from dagster._core.definitions.declarative_automation.operands.operands import (
CheckResultCondition as CheckResultCondition,
CodeVersionChangedCondition as CodeVersionChangedCondition,
CronTickPassedCondition as CronTickPassedCondition,
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
import datetime
from abc import abstractmethod
from typing import Optional

from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.definitions.asset_key import AssetCheckKey, T_EntityKey
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationResult,
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._core.definitions.declarative_automation.operands.subset_automation_condition import (
SubsetAutomationCondition,
)
from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.schedules import reverse_cron_string_iterator


@whitelist_for_serdes
@record
class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]):
@property
def name(self) -> str:
return "code_version_changed"

def evaluate(self, context: AutomationContext) -> AutomationResult[AssetKey]:
previous_code_version = context.cursor
current_code_version = context.asset_graph.get(context.key).code_version
if previous_code_version is None or previous_code_version == current_code_version:
true_subset = context.get_empty_subset()
else:
true_subset = context.candidate_subset

return AutomationResult(context, true_subset, cursor=current_code_version)


@record
@whitelist_for_serdes
class InitialEvaluationCondition(BuiltinAutomationCondition):
Expand All @@ -33,29 +53,6 @@ def evaluate(self, context: AutomationContext) -> AutomationResult:
return AutomationResult(context, subset, cursor=condition_tree_id)


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""

@property
def requires_cursor(self) -> bool:
return False

@abstractmethod
def compute_subset(
self, context: AutomationContext[T_EntityKey]
) -> EntitySubset[T_EntityKey]: ...

def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]:
# don't compute anything if there are no candidates
if context.candidate_subset.is_empty:
true_subset = context.get_empty_subset()
else:
true_subset = self.compute_subset(context)

return AutomationResult(context, true_subset)


@whitelist_for_serdes
@record
class MissingAutomationCondition(SubsetAutomationCondition):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from abc import abstractmethod

from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.definitions.asset_key import T_EntityKey
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationResult,
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""

@property
def requires_cursor(self) -> bool:
return False

@abstractmethod
def compute_subset(
self, context: AutomationContext[T_EntityKey]
) -> EntitySubset[T_EntityKey]: ...

def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]:
# don't compute anything if there are no candidates
if context.candidate_subset.is_empty:
true_subset = context.get_empty_subset()
else:
true_subset = self.compute_subset(context)

return AutomationResult(context, true_subset)