Skip to content

Commit

Permalink
Update first tick behavior of AutomationCondition.eager() (#24700)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Updates the behavior of the `newly_true()` condition to optionally change its "baseline" assumption of what was true before the first evaluation. This allows us to better fulfill the promise of "only reacting to changes which happened after the condition was added to the asset", as we do for the "deps updated" part of the condition.

In short, this provides a strong bit of protection against the "surprise backfill" issue, without having to add any new rate-limiting concepts to the system.

## How I Tested These Changes

## Changelog

AutomationCondition.eager() will now only launch runs for missing partitions which become missing *after* the condition has been added to the asset. This avoids situations in which the eager policy kicks off a large amount of work when added to an asset with many missing historical static/dynamic partitions.

- [x] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
OwenKephart authored Oct 7, 2024
1 parent 52816a4 commit 888a66f
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def A() -> None: ...
assert record["numRequested"] == 0

# all nodes in the tree
assert len(record["evaluationNodes"]) == 27
assert len(record["evaluationNodes"]) == 28

rootNode = record["evaluationNodes"][0]
assert rootNode["uniqueId"] == record["rootUniqueId"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
CodeVersionChangedCondition,
CronTickPassedCondition,
FailedAutomationCondition,
InitialEvaluationCondition,
InLatestTimeWindowCondition,
InProgressAutomationCondition,
MissingAutomationCondition,
Expand Down Expand Up @@ -134,31 +135,34 @@ def get_snapshot(
self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
) -> AutomationConditionSnapshot:
"""Returns a serializable snapshot of the entire AutomationCondition tree."""
unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
unique_id = self.get_node_unique_id(parent_unique_id=parent_unique_id, index=index)
node_snapshot = self.get_node_snapshot(unique_id)
children = [
child.get_snapshot(parent_unique_id=unique_id, index=i)
for (i, child) in enumerate(self.children)
]
return AutomationConditionSnapshot(node_snapshot=node_snapshot, children=children)

def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
def get_node_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
"""Returns a unique identifier for this condition within the broader condition tree."""
parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
return non_secure_md5_hash_str("".join(parts).encode())

def get_hash(
self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
) -> int:
"""Generates a hash based off of the unique ids of all children."""
unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
hashes = [hash(unique_id)]
for i, child in enumerate(self.children):
hashes.append(child.get_hash(parent_unique_id=unique_id, index=i))
return hash(tuple(hashes))
def get_unique_id(
    self, *, parent_node_unique_id: Optional[str] = None, index: Optional[int] = None
) -> str:
    """Returns a unique identifier for the subtree rooted at this condition.

    The identifier is a hash over this node's own unique id followed by the
    subtree identifiers of each child, taken in child order.

    Args:
        parent_node_unique_id: Node-level unique id of the parent condition, if any.
        index: Position of this condition among its parent's children, if any.
    """
    own_node_id = self.get_node_unique_id(parent_unique_id=parent_node_unique_id, index=index)

    # This node's id comes first, then one entry per child subtree.
    id_parts = [own_node_id]
    for position, child in enumerate(self.children):
        id_parts.append(child.get_unique_id(parent_node_unique_id=own_node_id, index=position))

    return non_secure_md5_hash_str("".join(id_parts).encode())

def __hash__(self) -> int:
return self.get_hash()
return hash(self.get_unique_id())

@property
def has_rule_condition(self) -> bool:
Expand Down Expand Up @@ -235,13 +239,15 @@ def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]":

def since_last_handled(self: "AutomationCondition[AssetKey]") -> "SinceCondition[AssetKey]":
"""Returns an AutomationCondition that is true if this condition has become true since the
last time this asset partition was requested or updated.
asset partition was last requested or updated, and since the last time this entity's
condition was modified.
"""
with disable_dagster_warnings():
return self.since(
(
AutomationCondition[AssetKey].newly_requested()
| AutomationCondition[AssetKey].newly_updated()
AutomationCondition.newly_requested()
| AutomationCondition.newly_updated()
| AutomationCondition.initial_evaluation()
).with_label("handled")
)

Expand Down Expand Up @@ -364,6 +370,17 @@ def failed() -> "FailedAutomationCondition":

return FailedAutomationCondition()

@public
@experimental
@staticmethod
def initial_evaluation() -> "InitialEvaluationCondition":
    """Returns an AutomationCondition that is true on the first evaluation of the expression."""
    # NOTE(review): imported at call time rather than at module level, presumably to
    # avoid a circular import between this module and the operands package - confirm.
    from dagster._core.definitions.declarative_automation.operands import (
        InitialEvaluationCondition,
    )

    initial_evaluation_condition = InitialEvaluationCondition()
    return initial_evaluation_condition

@public
@experimental
@staticmethod
Expand Down Expand Up @@ -507,7 +524,8 @@ def eager() -> "AndAutomationCondition[AssetKey]":
"""Returns an AutomationCondition which will cause missing asset partitions to be
materialized, and will materialize asset partitions whenever their parents are updated.
For time partitioned assets, only the latest time partition will be considered.
Will only materialize missing partitions if they become missing after this condition is
added to an asset. For time partitioned assets, only the latest time partition will be considered.
This will never evaluate to true if the asset has any upstream partitions which are missing
or part of an in progress run, and will never evaluate to true if the provided asset partition
Expand Down Expand Up @@ -552,7 +570,8 @@ def on_missing() -> "AndAutomationCondition[AssetKey]":
"""Returns an AutomationCondition which will cause missing asset partitions to be materialized as soon as possible,
after all of their dependencies have been materialized.
For time partitioned assets, only the latest time partition will be considered.
Will only materialize missing partitions if they become missing after this condition is
added to an asset. For time partitioned assets, only the latest time partition will be considered.
"""
with disable_dagster_warnings():
return (
Expand All @@ -561,7 +580,7 @@ def on_missing() -> "AndAutomationCondition[AssetKey]":
AutomationCondition.missing()
.newly_true()
.since_last_handled()
.with_label("missing_since_last_requested")
.with_label("missing_since_last_handled")
)
& ~AutomationCondition.any_deps_missing()
).with_label("on_missing")
Expand Down Expand Up @@ -592,9 +611,6 @@ def with_label(self, label: Optional[str]) -> Self:
"""Returns a copy of this AutomationCondition with a human-readable label."""
return copy(self, label=label)

def __hash__(self) -> int:
return self.get_hash()


class AutomationResult(Generic[T_EntityKey]):
"""The result of evaluating an AutomationCondition."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.instance import DagsterInstance
from dagster._serdes.serdes import deserialize_value, serialize_value


class EvaluateAutomationConditionsResult:
Expand Down Expand Up @@ -126,7 +127,10 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
evaluation_time=evaluation_time,
allow_backfills=False,
logger=logging.getLogger("dagster.automation_condition_tester"),
cursor=cursor or AssetDaemonCursor.empty(),
# round-trip the provided cursor to simulate actual usage
cursor=deserialize_value(serialize_value(cursor), AssetDaemonCursor)
if cursor
else AssetDaemonCursor.empty(),
)
results, requested_subsets = evaluator.evaluate()
cursor = AssetDaemonCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
condition = check.not_none(
evaluator.asset_graph.get(key).automation_condition or evaluator.default_condition
)
condition_unqiue_id = condition.get_unique_id(parent_unique_id=None, index=None)
condition_unqiue_id = condition.get_node_unique_id(parent_unique_id=None, index=None)

if condition.has_rule_condition and evaluator.allow_backfills:
raise DagsterInvalidDefinitionError(
Expand Down Expand Up @@ -96,7 +96,7 @@ def for_child_condition(
child_index: int,
candidate_subset: EntitySubset[U_EntityKey],
) -> "AutomationContext[U_EntityKey]":
condition_unqiue_id = child_condition.get_unique_id(
condition_unqiue_id = child_condition.get_node_unique_id(
parent_unique_id=self.condition_unique_id, index=child_index
)
return AutomationContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def create(asset_key: AssetKey, evaluator: "AutomationConditionEvaluator"):
condition=condition,
cursor=cursor,
node_cursor=cursor.node_cursors_by_unique_id.get(
condition.get_unique_id(parent_unique_id=None, index=0)
condition.get_node_unique_id(parent_unique_id=None, index=0)
)
if cursor
else None,
Expand Down Expand Up @@ -242,17 +242,17 @@ def _previous_tick_discarded_subset(self) -> Optional[SerializableEntitySubset[A
# Or(MaterializeCond, Not(SkipCond), Not(DiscardCond))
if len(self.condition.children) != 3:
return None
unique_id = self.condition.get_unique_id(parent_unique_id=None, index=None)
unique_id = self.condition.get_node_unique_id(parent_unique_id=None, index=None)

# get Not(DiscardCond)
not_discard_condition = self.condition.children[2]
unique_id = not_discard_condition.get_unique_id(parent_unique_id=unique_id, index=2)
unique_id = not_discard_condition.get_node_unique_id(parent_unique_id=unique_id, index=2)
if not isinstance(not_discard_condition, NotAutomationCondition):
return None

# get DiscardCond
discard_condition = not_discard_condition.children[0]
unique_id = discard_condition.get_unique_id(parent_unique_id=unique_id, index=0)
unique_id = discard_condition.get_node_unique_id(parent_unique_id=unique_id, index=0)
if not isinstance(discard_condition, RuleCondition) or not isinstance(
discard_condition.rule, DiscardOnMaxMaterializationsExceededRule
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RuleCondition(BuiltinAutomationCondition[AssetKey]):

rule: AutoMaterializeRule

def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[str]) -> str:
def get_node_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[str]) -> str:
# preserves old (bad) behavior of not including the parent_unique_id to avoid invalidating
# old serialized information
parts = [self.rule.__class__.__name__, self.description]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dagster._core.definitions.declarative_automation.operands.slice_conditions import (
CronTickPassedCondition as CronTickPassedCondition,
FailedAutomationCondition as FailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
InProgressAutomationCondition as InProgressAutomationCondition,
MissingAutomationCondition as MissingAutomationCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@
from dagster._utils.schedules import reverse_cron_string_iterator


@record
@whitelist_for_serdes
class InitialEvaluationCondition(BuiltinAutomationCondition):
    """Condition to determine if this is the initial evaluation of a given AutomationCondition."""

    @property
    def description(self) -> str:
        return "Initial evaluation"

    @property
    def name(self) -> str:
        return "initial_evaluation"

    def evaluate(self, context: AutomationContext) -> AutomationResult:
        """Evaluates to the full candidate subset on an initial evaluation, else to nothing.

        An evaluation counts as initial when the context has no previous true
        subset, or when the unique id of the root condition tree differs from
        the value held in the context's cursor. The current tree id is always
        returned as the new cursor value.
        """
        tree_unique_id = context.root_context.condition.get_unique_id()
        is_initial_evaluation = (
            context.previous_true_subset is None or tree_unique_id != context.cursor
        )
        true_subset = (
            context.candidate_subset if is_initial_evaluation else context.get_empty_subset()
        )
        return AutomationResult(context, true_subset, cursor=tree_unique_id)


@record
class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]):
"""Base class for simple conditions which compute a simple subset of the asset graph."""
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,6 @@ def empty_subset(
return self.partitions_def.empty_subset()

def to_serializable_subset(self) -> PartitionsSubset:
return self.partitions_def.subset_with_partition_keys(
self.get_partition_keys()
return self.partitions_def.subset_with_all_partitions(
current_time=self.current_time, dynamic_partitions_store=self.dynamic_partitions_store
).to_serializable_subset()
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,22 @@ def partitions_subset_class(self) -> Type["PartitionsSubset"]:
def empty_subset(self) -> "PartitionsSubset":
return self.partitions_subset_class.empty_subset(self)

def subset_with_all_partitions(
    self,
    current_time: Optional[datetime] = None,
    dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> "PartitionsSubset":
    """Returns a time-window subset spanning the first through the last partition window.

    The subset holds a single window running from the start of the first
    partition window to the end of the last one, or no windows at all when
    either bound is unavailable.

    Args:
        current_time: Passed through when looking up the first and last partition windows.
        dynamic_partitions_store: Not used by this implementation.
            NOTE(review): presumably accepted for signature parity with other
            partitions definitions - confirm.
    """
    first_window = self.get_first_partition_window(current_time)
    last_window = self.get_last_partition_window(current_time)

    if first_window is not None and last_window is not None:
        included_windows = [TimeWindow(first_window.start, last_window.end)]
    else:
        included_windows = []

    return TimeWindowPartitionsSubset(
        partitions_def=self, num_partitions=None, included_time_windows=included_windows
    )

def get_serializable_unique_identifier(
self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None
) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from dagster import AutomationCondition
from dagster import (
AssetKey,
AutomationCondition,
Definitions,
StaticPartitionsDefinition,
asset,
evaluate_automation_conditions,
)
from dagster._core.instance_for_test import instance_for_test

from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import (
AutomationConditionScenarioState,
Expand Down Expand Up @@ -108,3 +116,36 @@ def test_eager_hourly_partitioned() -> None:
# B does not get immediately requested again
state, result = state.evaluate("B")
assert result.true_subset.size == 0


def test_eager_static_partitioned() -> None:
    """Checks eager() on statically-partitioned assets across a partitions change.

    Partitions that are already missing on the first evaluation are not
    requested; partitions added to the definition afterwards are requested
    exactly once.
    """
    initial_partitions = StaticPartitionsDefinition(["a", "b"])
    expanded_partitions = StaticPartitionsDefinition(["a", "b", "c", "d"])

    def _build_defs(partitions_def: StaticPartitionsDefinition) -> Definitions:
        @asset(partitions_def=partitions_def, automation_condition=AutomationCondition.eager())
        def A() -> None: ...

        @asset(partitions_def=partitions_def, automation_condition=AutomationCondition.eager())
        def B() -> None: ...

        return Definitions(assets=[A, B])

    with instance_for_test() as instance:
        # first evaluation: partitions that are already missing are not requested
        # (no "surprise backfill")
        result = evaluate_automation_conditions(
            defs=_build_defs(initial_partitions), instance=instance
        )
        assert result.total_requested == 0

        # two partitions are added to the definition; only those get requested
        result = evaluate_automation_conditions(
            defs=_build_defs(expanded_partitions), instance=instance, cursor=result.cursor
        )
        assert result.total_requested == 4
        assert result.get_requested_partitions(AssetKey("A")) == {"c", "d"}
        assert result.get_requested_partitions(AssetKey("B")) == {"c", "d"}

        # the new partitions were requested on the previous tick, so nothing more is requested
        result = evaluate_automation_conditions(
            defs=_build_defs(expanded_partitions), instance=instance, cursor=result.cursor
        )
        assert result.total_requested == 0
Loading

0 comments on commit 888a66f

Please sign in to comment.