Skip to content

Commit

Permalink
Add executed_with_root_target condition to handle partial runs / fa…
Browse files Browse the repository at this point in the history
…ilures (dagster-io#25253)

## Summary & Motivation

Resolves: dagster-io#24389

To put it simply, this ensures the `any_deps_updated` condition does not "count" an update from a parent if the run that materialized that parent also planned to materialize the child.

This guards against a variety of situations in which a run is launched with the intention of updating many assets but fails to update some of them for whatever reason (an intentional skip, a failure event). In those cases, the assumption is that the child should NOT be attempted again, even though the parent did successfully materialize.

This is a fairly niche implementation detail that users should not have to think about when crafting their own policies.

## How I Tested These Changes

I added a parameterized test that failed before this change (for the skip and fail cases) and passes now.

## Changelog

Fixed an issue with `AutomationCondition.eager()` that could cause it to launch a second attempt of an asset that was skipped or failed during a run in which one of its parents successfully materialized.
  • Loading branch information
OwenKephart authored and Grzyblon committed Oct 26, 2024
1 parent ba44cd2 commit f3c03c9
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def A() -> None: ...
assert record["numRequested"] == 0

# all nodes in the tree
assert len(record["evaluationNodes"]) == 32
assert len(record["evaluationNodes"]) == 35

rootNode = record["evaluationNodes"][0]
assert rootNode["uniqueId"] == record["rootUniqueId"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus
from dagster._core.storage.dagster_run import RunRecord
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


Expand Down Expand Up @@ -475,6 +476,86 @@ def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) -
),
)

def _expensively_filter_entity_subset(
    self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool]
) -> EntitySubset:
    """Return the portion of ``subset`` for which ``filter_fn`` returns a truthy value.

    For a partitioned subset, every partition key produced by
    ``subset.expensively_compute_partition_keys()`` is passed to ``filter_fn``
    one at a time, and the result is the intersection of ``subset`` with the
    keys that passed. For an unpartitioned subset, ``filter_fn`` is called once
    with ``None``; the subset is returned unchanged if it is non-empty and the
    filter passes, otherwise an empty subset for the same key is returned.

    Args:
        subset: The subset to filter.
        filter_fn: Predicate invoked with a partition key (or ``None`` for an
            unpartitioned subset).
    """
    if not subset.is_partitioned:
        # An empty subset short-circuits so that filter_fn is never invoked for it.
        if subset.is_empty or not filter_fn(None):
            return self.get_empty_subset(key=subset.key)
        return subset

    passing_partition_keys = {
        partition_key
        for partition_key in subset.expensively_compute_partition_keys()
        if filter_fn(partition_key)
    }
    return subset.compute_intersection_with_partition_keys(passing_partition_keys)

def _run_record_targets_entity(self, run_record: "RunRecord", target_key: EntityKey) -> bool:
    """Return True if ``target_key`` is in the run's asset or asset check selection.

    A selection that is unset (falsy) on the run is treated as empty.
    """
    dagster_run = run_record.dagster_run
    selected_assets = dagster_run.asset_selection or set()
    selected_checks = dagster_run.asset_check_selection or set()
    return target_key in selected_assets or target_key in selected_checks

def _compute_latest_check_run_executed_with_target(
    self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey
) -> bool:
    """Return True if the run behind the check's execution record also targeted ``target_key``.

    Returns False when there is no execution record for ``query_key``, when the
    record has no event, or when the run record for that event cannot be found.

    Args:
        partition_key: Must be ``None``; partitioned checks are not supported.
        query_key: The asset check whose execution record is looked up.
        target_key: The entity that the run is tested against.
    """
    from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord
    from dagster._core.storage.dagster_run import RunRecord

    check.invariant(partition_key is None, "Partitioned checks not supported")

    execution_record = AssetCheckExecutionRecord.blocking_get(self, query_key)
    if not execution_record or not execution_record.event:
        return False

    run_record = RunRecord.blocking_get(self, execution_record.event.run_id)
    if not run_record:
        return False
    return self._run_record_targets_entity(run_record, target_key)

def _compute_latest_asset_run_executed_with_target(
    self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey
) -> bool:
    """Return True if the run of the asset's last materialization also targeted ``target_key``.

    Only the asset's single ``last_materialization`` is consulted, and it must
    be for ``partition_key`` (``None`` for an unpartitioned asset). Returns
    False when the asset record, the last materialization, or the run record is
    missing, or when the last materialization is for a different partition.

    Args:
        partition_key: The partition being evaluated, or ``None``.
        query_key: The asset whose last materialization is looked up.
        target_key: The entity that the run is tested against.
    """
    from dagster._core.storage.dagster_run import RunRecord
    from dagster._core.storage.event_log.base import AssetRecord

    asset_record = AssetRecord.blocking_get(self, query_key)
    if not asset_record:
        return False

    last_materialization = asset_record.asset_entry.last_materialization
    if not last_materialization:
        return False

    materialization = last_materialization.asset_materialization
    if not materialization or materialization.partition != partition_key:
        return False

    run_record = RunRecord.blocking_get(self, last_materialization.run_id)
    if not run_record:
        return False
    return self._run_record_targets_entity(run_record, target_key)

def compute_latest_run_executed_with_subset(
    self, from_subset: EntitySubset, target: EntityKey
) -> EntitySubset:
    """Compute the part of ``from_subset`` whose latest run also targeted ``target``.

    The subset is filtered with the check-specific or the asset-specific
    predicate; ``_dispatch`` chooses between the two based on
    ``from_subset.key``.

    Args:
        from_subset: The subset to filter.
        target: The entity key that the latest run must have targeted.
    """

    def _filter_from_subset(predicate: Callable[..., bool], key: EntityKey) -> EntitySubset:
        # Bind everything except the partition key, which the filter supplies per call.
        return self._expensively_filter_entity_subset(
            from_subset,
            filter_fn=functools.partial(predicate, query_key=key, target_key=target),
        )

    return _dispatch(
        key=from_subset.key,
        check_method=lambda k: _filter_from_subset(
            self._compute_latest_check_run_executed_with_target, k
        ),
        asset_method=lambda k: _filter_from_subset(
            self._compute_latest_asset_run_executed_with_target, k
        ),
    )

def _compute_updated_since_cursor_subset(
self, key: AssetKey, cursor: Optional[int]
) -> EntitySubset[AssetKey]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,18 @@ def newly_updated() -> "BuiltinAutomationCondition":

return NewlyUpdatedCondition()

@experimental
@staticmethod
def executed_with_root_target() -> "BuiltinAutomationCondition":
    """Returns an AutomationCondition that is true if the latest run that updated the
    target also targeted the root key, i.e. the key that the overall (top-level)
    condition is applied to.
    """
    # Imported locally, matching the surrounding builder methods.
    from dagster._core.definitions.declarative_automation.operands import (
        LatestRunExecutedWithRootTargetCondition as _Condition,
    )

    return _Condition()

@public
@experimental
@staticmethod
Expand Down Expand Up @@ -496,7 +508,14 @@ def any_deps_updated() -> "DepsAutomationCondition":
"""
with disable_dagster_warnings():
return AutomationCondition.any_deps_match(
AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
(
AutomationCondition.newly_updated()
# executed_with_root_target is fairly expensive on a per-partition basis,
# but newly_updated is bounded in the number of partitions that might be
# updated on a single tick
& ~AutomationCondition.executed_with_root_target()
).with_label("newly_updated_without_root")
| AutomationCondition.will_be_requested()
).with_label("any_deps_updated")

@experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
LatestRunExecutedWithRootTargetCondition as LatestRunExecutedWithRootTargetCondition,
MissingAutomationCondition as MissingAutomationCondition,
NewlyRequestedCondition as NewlyRequestedCondition,
NewlyUpdatedCondition as NewlyUpdatedCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.previous_requested_subset or context.get_empty_subset()


@whitelist_for_serdes
@record
class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition):
    """Condition that is true for the part of the candidate subset whose latest run
    also targeted the root context's key.
    """

    @property
    def name(self) -> str:
        return "executed_with_root_target"

    def compute_subset(self, context: AutomationContext) -> EntitySubset:
        """Delegate to the asset graph view, using the root context's key as the target."""
        root_key = context.root_context.key
        graph_view = context.asset_graph_view
        return graph_view.compute_latest_run_executed_with_subset(
            from_subset=context.candidate_subset, target=root_key
        )


@whitelist_for_serdes
@record
class NewlyUpdatedCondition(SubsetAutomationCondition):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
from dagster import (
AssetDep,
AssetKey,
Expand All @@ -9,6 +10,7 @@
DimensionPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
Output,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
Expand Down Expand Up @@ -240,3 +242,55 @@ def foo_check() -> ...: ...
# don't keep requesting
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
assert result.total_requested == 0


@pytest.mark.parametrize("b_result", ["skip", "fail", "materialize"])
def test_eager_partial_run(b_result: str) -> None:
    """A parent updated by a run that also targeted its child must not re-request the child.

    The chain is root -> A -> B -> C. ``b_result`` controls whether B emits no
    output, raises, or materializes during the run that targets A, B and C.
    """

    @asset
    def root() -> None: ...

    @asset(deps=[root], automation_condition=AutomationCondition.eager())
    def A() -> None: ...

    @asset(deps=[A], output_required=False, automation_condition=AutomationCondition.eager())
    def B():
        if b_result == "materialize":
            yield Output(1)
        elif b_result != "skip":
            # failure case: raises ZeroDivisionError inside the op
            return 1 / 0
        # "skip": finish without emitting an output

    @asset(deps=[B], automation_condition=AutomationCondition.eager())
    def C() -> None: ...

    defs = Definitions(assets=[root, A, B, C])
    instance = DagsterInstance.ephemeral()

    def _tick(cursor=None):
        # The very first evaluation is made without a cursor argument at all.
        if cursor is None:
            return evaluate_automation_conditions(defs=defs, instance=instance)
        return evaluate_automation_conditions(defs=defs, instance=instance, cursor=cursor)

    def _run(selection, **kwargs):
        defs.get_implicit_global_asset_job_def().execute_in_process(
            instance=instance, asset_selection=selection, **kwargs
        )

    # nothing has been updated yet
    result = _tick()
    assert result.total_requested == 0

    # root is updated, so A, B and C are all requested
    instance.report_runless_asset_event(AssetMaterialization("root"))
    result = _tick(result.cursor)
    assert result.total_requested == 3

    # the request is not repeated on the next tick
    result = _tick(result.cursor)
    assert result.total_requested == 0

    # simulate the requested run; depending on b_result, B (and therefore C)
    # may be skipped, fail, or materialize
    _run([A.key, B.key, C.key], raise_on_error=False)
    result = _tick(result.cursor)
    # A was materialized by a run that also targeted B and C, so nothing is re-requested
    assert result.total_requested == 0

    # A is materialized on its own, so B and C are requested
    _run([A.key])
    result = _tick(result.cursor)
    assert result.total_requested == 2
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
("dd74c7cfe19d869931ea4aad9ee10127", SC.on_cron("0 * * * *"), two_parents, False),
("861f8e40d4624d49c4ebdd034c8e1e84", SC.on_cron("0 * * * *"), two_parents_daily, False),
# same as above
("dfb268e321e2e7aa7b0e2e71fa674e06", SC.eager(), one_parent, False),
("781252e1a53db1ecd5938b0da61dba0b", SC.eager(), one_parent, True),
("293186887409aac2fe99b09bd633c64b", SC.eager(), one_parent_daily, False),
("c92d9d5181b4d0a6c7ab5d1c6e26962a", SC.eager(), two_parents, False),
("911bcc4f8904ec6dae85f6aaf78f5ee5", SC.eager(), two_parents_daily, False),
("9b4c0a55b5fbea860f72ecc9d7a27e0e", SC.eager(), one_parent, False),
("6925b7770eb2122048e4f50aa0a3303f", SC.eager(), one_parent, True),
("45c2493882264dd06bee6c9abffabf3e", SC.eager(), one_parent_daily, False),
("81132ce4972e8ac8d8be1a0e468637c0", SC.eager(), two_parents, False),
("724d21fef8f7404f1e26b40ddeab711d", SC.eager(), two_parents_daily, False),
# missing condition is invariant to changes other than partitions def changes
("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, False),
("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, True),
Expand Down

0 comments on commit f3c03c9

Please sign in to comment.