Skip to content

Commit

Permalink
[DA] Add without() method to AndAutomationCondition (#24226)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This adds a convenient way for users to remove operands from a top-level expression. A common use case here would be to remove the "in_latest_time_window" condition from one of the built-in expressions.

## How I Tested These Changes

## Changelog [New]

Added a new `.without()` method to `AutomationCondition.eager()`, `AutomationCondition.on_cron()`, and `AutomationCondition.on_missing()` which allows sub-conditions to be removed, e.g. `AutomationCondition.eager().without(AutomationCondition.in_latest_time_window())`.
  • Loading branch information
OwenKephart authored Sep 16, 2024
1 parent 95825f4 commit aed17e5
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def all_deps_updated_since_cron(
@public
@experimental
@staticmethod
def eager() -> "AutomationCondition":
def eager() -> "AndAutomationCondition":
"""Returns an AutomationCondition which will cause missing asset partitions to be
materialized, and will materialize asset partitions whenever their parents are updated.
Expand All @@ -463,7 +463,7 @@ def eager() -> "AutomationCondition":
@public
@experimental
@staticmethod
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AndAutomationCondition":
"""Returns an AutomationCondition which will cause asset partitions to be materialized
on a given cron schedule, after all of their dependencies have been updated since the latest
tick of that cron schedule.
Expand All @@ -482,7 +482,7 @@ def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondit
@public
@experimental
@staticmethod
def on_missing() -> "AutomationCondition":
def on_missing() -> "AndAutomationCondition":
"""Returns an AutomationCondition which will cause missing asset partitions to be materialized as soon as possible,
after all of their dependencies have been materialized.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Sequence

import dagster._check as check
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
AutomationResult,
Expand Down Expand Up @@ -45,6 +46,18 @@ def evaluate(self, context: AutomationContext) -> AutomationResult:
true_slice = true_slice.compute_intersection(child_result.true_slice)
return AutomationResult(context, true_slice, child_results=child_results)

def without(self, condition: AutomationCondition) -> "AndAutomationCondition":
"""Returns a copy of this condition without the specified child condition."""
check.param_invariant(
condition in self.operands, "condition", "Condition not found in operands"
)
operands = [child for child in self.operands if child != condition]
if len(operands) < 2:
check.failed("Cannot have fewer than 2 operands in an AndAutomationCondition")
return AndAutomationCondition(
operands=[child for child in self.operands if child != condition]
)


@whitelist_for_serdes(storage_name="OrAssetCondition")
@record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,31 @@ def test_on_missing_hourly_partitioned() -> None:
state = state.with_runs(run_request("A", "2020-02-02-01:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 0


def test_on_missing_without_time_limit() -> None:
state = (
AutomationConditionScenarioState(
two_assets_in_sequence,
automation_condition=AutomationCondition.on_missing().without(
AutomationCondition.in_latest_time_window()
),
ensure_empty_result=False,
)
.with_asset_properties(partitions_def=hourly_partitions_def)
.with_current_time("2020-02-02T01:05:00")
)

# parent hasn't updated yet
state, result = state.evaluate("B")
assert result.true_subset.size == 0

# historical parents updated, matters
state = state.with_runs(run_request("A", "2019-07-05-00:00"))
state = state.with_runs(run_request("A", "2019-04-05-00:00"))
state, result = state.evaluate("B")
assert result.true_subset.size == 2

# B has been requested, so don't request again
state, result = state.evaluate("B")
assert result.true_subset.size == 0
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import datetime

import pytest
from dagster import AutoMaterializePolicy, AutomationCondition, Definitions, asset
from dagster._check.functions import CheckError
from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._core.remote_representation.external_data import (
Expand Down Expand Up @@ -137,3 +141,39 @@ def test_label_automation_condition() -> None:
assert not_missing_and_not_in_progress.get_node_snapshot("").label == "Blah"
assert not_missing_and_not_in_progress.children[0].label == "Not missing"
assert not_missing_and_not_in_progress.children[1].label == "Not in progress"


def test_without_automation_condition() -> None:
a = AutomationCondition.in_latest_time_window()
b = AutomationCondition.any_deps_match(AutomationCondition.in_progress())
c = ~AutomationCondition.any_deps_in_progress()

orig = a & b & c

# simple cases
assert orig.without(a) == b & c
assert orig.without(b) == a & c
assert orig.without(c) == a & b

# ensure works if using different instances of the same operands
assert orig.without(AutomationCondition.in_latest_time_window()) == b & c
assert orig.without(~AutomationCondition.any_deps_in_progress()) == a & b

# make sure it errors if an invalid condition is passed
with pytest.raises(CheckError, match="Condition not found"):
orig.without(AutomationCondition.in_progress())

with pytest.raises(CheckError, match="Condition not found"):
orig.without(
AutomationCondition.any_deps_match(
AutomationCondition.in_progress() | AutomationCondition.missing()
)
)

with pytest.raises(CheckError, match="Condition not found"):
orig.without(
AutomationCondition.in_latest_time_window(lookback_delta=datetime.timedelta(hours=3))
)

with pytest.raises(CheckError, match="fewer than 2 operands"):
orig.without(a).without(b)

0 comments on commit aed17e5

Please sign in to comment.