Skip to content

Commit

Permalink
Convert AutomationCondition to @record (#24630)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Converts all of the built-in automation conditions to use @record instead of pydantic models.

## How I Tested These Changes

## Changelog

NOCHANGELOG

- [ ] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
OwenKephart authored Sep 26, 2024
1 parent 3eb4c44 commit 0097003
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._model import DagsterModel
from dagster._record import copy, record
from dagster._serdes.serdes import is_whitelisted_for_serdes_object
from dagster._time import get_current_timestamp
from dagster._utils.security import non_secure_md5_hash_str
Expand Down Expand Up @@ -556,7 +556,8 @@ def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
return AnyDownstreamConditionsCondition()


class BuiltinAutomationCondition(DagsterModel, AutomationCondition[T_EntityKey]):
@record
class BuiltinAutomationCondition(AutomationCondition[T_EntityKey]):
"""Base class for AutomationConditions provided by the core dagster framework."""

label: Optional[str] = None
Expand All @@ -567,7 +568,7 @@ def get_label(self) -> Optional[str]:
@public
def with_label(self, label: Optional[str]) -> Self:
"""Returns a copy of this AutomationCondition with a human-readable label."""
return self.model_copy(update={"label": label})
return copy(self, label=label)

def __hash__(self) -> int:
return self.get_hash()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AutomationResult,
BuiltinAutomationCondition,
)
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.security import non_secure_md5_hash_str

Expand All @@ -16,6 +17,7 @@


@whitelist_for_serdes
@record
class RuleCondition(BuiltinAutomationCondition[AssetKey]):
"""This class represents the condition that a particular AutoMaterializeRule is satisfied."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes
@record
class CodeVersionChangedCondition(BuiltinAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.schedules import reverse_cron_string_iterator


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""

Expand All @@ -37,6 +39,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[


@whitelist_for_serdes
@record
class MissingAutomationCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand All @@ -53,6 +56,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class InProgressAutomationCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand All @@ -67,6 +71,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class FailedAutomationCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand All @@ -81,6 +86,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class WillBeRequestedCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand Down Expand Up @@ -114,6 +120,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class NewlyRequestedCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand All @@ -128,6 +135,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class NewlyUpdatedCondition(SubsetAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand All @@ -148,6 +156,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]:


@whitelist_for_serdes
@record
class CronTickPassedCondition(SubsetAutomationCondition):
cron_schedule: str
cron_timezone: str
Expand Down Expand Up @@ -182,6 +191,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset:


@whitelist_for_serdes
@record
class InLatestTimeWindowCondition(SubsetAutomationCondition[AssetKey]):
serializable_lookback_timedelta: Optional[SerializableTimeDelta] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@record
class DownstreamConditionWrapperCondition(BuiltinAutomationCondition[AssetKey]):
"""Wrapper object which evaluates a condition against a dependency and returns a subset
representing the subset of downstream asset which has at least one parent which evaluated to
Expand Down Expand Up @@ -45,6 +47,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass


@whitelist_for_serdes
@record
class AnyDownstreamConditionsCondition(BuiltinAutomationCondition[AssetKey]):
@property
def description(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes(storage_name="AndAssetCondition")
@record
class AndAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""This class represents the condition that all of its children evaluate to true."""

Expand Down Expand Up @@ -59,6 +61,7 @@ def without(self, condition: AutomationCondition) -> "AndAutomationCondition":


@whitelist_for_serdes(storage_name="OrAssetCondition")
@record
class OrAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""This class represents the condition that any of its children evaluate to true."""

Expand Down Expand Up @@ -95,6 +98,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[


@whitelist_for_serdes(storage_name="NotAssetCondition")
@record
class NotAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""This class represents the condition that none of its children evaluate to true."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@record
class CheckConditionWrapperCondition(BuiltinAutomationCondition[AssetKey]):
check_key: AssetCheckKey
operand: AutomationCondition[AssetCheckKey]
Expand All @@ -38,6 +40,8 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass
)


@whitelist_for_serdes
@record
class ChecksCondition(BuiltinAutomationCondition[AssetKey]):
operand: AutomationCondition[AssetCheckKey]

Expand Down Expand Up @@ -71,6 +75,7 @@ def _get_check_keys(


@whitelist_for_serdes
@record
class AnyChecksCondition(ChecksCondition):
@property
def base_description(self) -> str:
Expand Down Expand Up @@ -105,6 +110,7 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass


@whitelist_for_serdes
@record
class AllChecksCondition(ChecksCondition):
@property
def base_description(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import copy, record
from dagster._serdes.serdes import whitelist_for_serdes

if TYPE_CHECKING:
from dagster._core.definitions.asset_selection import AssetSelection


@record
class DepConditionWrapperCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Wrapper object which evaluates a condition against a dependency and returns a subset
representing the subset of downstream asset which has at least one parent which evaluated to
Expand Down Expand Up @@ -51,6 +53,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[
)


@record
class DepCondition(BuiltinAutomationCondition[T_EntityKey]):
operand: AutomationCondition

Expand Down Expand Up @@ -85,7 +88,7 @@ def allow(self, selection: "AssetSelection") -> "DepCondition":
allow_selection = (
selection if self.allow_selection is None else selection | self.allow_selection
)
return self.model_copy(update={"allow_selection": allow_selection})
return copy(self, allow_selection=allow_selection)

def ignore(self, selection: "AssetSelection") -> "DepCondition":
"""Returns a copy of this condition that will ignore dependencies within the provided
Expand All @@ -97,7 +100,7 @@ def ignore(self, selection: "AssetSelection") -> "DepCondition":
ignore_selection = (
selection if self.ignore_selection is None else selection | self.ignore_selection
)
return self.model_copy(update={"ignore_selection": ignore_selection})
return copy(self, ignore_selection=ignore_selection)

def _get_dep_keys(
self, key: T_EntityKey, asset_graph: BaseAssetGraph[BaseAssetNode]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes
@record
class NewlyTrueCondition(BuiltinAutomationCondition[T_EntityKey]):
operand: AutomationCondition[T_EntityKey]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
BuiltinAutomationCondition,
)
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes


@whitelist_for_serdes
@record
class SinceCondition(BuiltinAutomationCondition[T_EntityKey]):
trigger_condition: AutomationCondition[T_EntityKey]
reset_condition: AutomationCondition[T_EntityKey]
Expand Down

0 comments on commit 0097003

Please sign in to comment.