Skip to content

Commit

Permalink
Add executed_with_root_target condition to handle partial runs / fa…
Browse files Browse the repository at this point in the history
…ilures
  • Loading branch information
OwenKephart committed Oct 14, 2024
1 parent dce5c4f commit 4e4b437
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus
from dagster._core.storage.dagster_run import RunRecord
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer


Expand Down Expand Up @@ -465,6 +466,86 @@ def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) -
),
)

def _expensively_filter_entity_subset(
self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool]
) -> EntitySubset:
if subset.is_partitioned:
return subset.compute_intersection_with_partition_keys(
{pk for pk in subset.expensively_compute_partition_keys() if filter_fn(pk)}
)
else:
return (
subset
if not subset.is_empty and filter_fn(None)
else self.get_empty_subset(key=subset.key)
)

def _run_record_targets_entity(self, run_record: "RunRecord", target_key: EntityKey) -> bool:
asset_selection = run_record.dagster_run.asset_selection or set()
check_selection = run_record.dagster_run.asset_check_selection or set()
return target_key in (asset_selection | check_selection)

def _compute_latest_check_run_executed_with_target(
self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey
) -> bool:
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord
from dagster._core.storage.dagster_run import RunRecord

check.invariant(partition_key is None, "Partitioned checks not supported")
check_record = AssetCheckExecutionRecord.blocking_get(self, query_key)
if check_record and check_record.event:
run_record = RunRecord.blocking_get(self, check_record.event.run_id)
return bool(run_record) and self._run_record_targets_entity(run_record, target_key)
else:
return False

def _compute_latest_asset_run_executed_with_target(
self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey
) -> bool:
from dagster._core.storage.dagster_run import RunRecord
from dagster._core.storage.event_log.base import AssetRecord

asset_record = AssetRecord.blocking_get(self, query_key)
if (
asset_record
and asset_record.asset_entry.last_materialization
and asset_record.asset_entry.last_materialization.asset_materialization
and asset_record.asset_entry.last_materialization.asset_materialization.partition
== partition_key
):
run_record = RunRecord.blocking_get(
self, asset_record.asset_entry.last_materialization.run_id
)
return bool(run_record) and self._run_record_targets_entity(run_record, target_key)
else:
return False

def compute_latest_run_executed_with_subset(
self, from_subset: EntitySubset, target: EntityKey
) -> EntitySubset:
"""Computes the subset of from_subset for which the latest run also targeted
the provided target EntityKey.
"""
return _dispatch(
key=from_subset.key,
check_method=lambda k: self._expensively_filter_entity_subset(
from_subset,
filter_fn=functools.partial(
self._compute_latest_check_run_executed_with_target,
query_key=k,
target_key=target,
),
),
asset_method=lambda k: self._expensively_filter_entity_subset(
from_subset,
filter_fn=functools.partial(
self._compute_latest_asset_run_executed_with_target,
query_key=k,
target_key=target,
),
),
)

def _compute_updated_since_cursor_subset(
self, key: AssetKey, cursor: Optional[int]
) -> EntitySubset[AssetKey]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,18 @@ def newly_updated() -> "BuiltinAutomationCondition":

return NewlyUpdatedCondition()

@experimental
@staticmethod
def executed_with_root_target() -> "BuiltinAutomationCondition":
"""Returns an AutomationCondition that is true if the latest run that updated the target also executed
with the root key that the global condition is applied to.
"""
from dagster._core.definitions.declarative_automation.operands import (
LatestRunExecutedWithRootTargetCondition,
)

return LatestRunExecutedWithRootTargetCondition()

@public
@experimental
@staticmethod
Expand Down Expand Up @@ -496,7 +508,11 @@ def any_deps_updated() -> "DepsAutomationCondition":
"""
with disable_dagster_warnings():
return AutomationCondition.any_deps_match(
AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
(
AutomationCondition.newly_updated()
& ~AutomationCondition.executed_with_root_target()
).with_label("newly_updated_without_root")
| AutomationCondition.will_be_requested()
).with_label("any_deps_updated")

@experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition,
InitialEvaluationCondition as InitialEvaluationCondition,
InLatestTimeWindowCondition as InLatestTimeWindowCondition,
LatestRunExecutedWithRootTargetCondition as LatestRunExecutedWithRootTargetCondition,
MissingAutomationCondition as MissingAutomationCondition,
NewlyRequestedCondition as NewlyRequestedCondition,
NewlyUpdatedCondition as NewlyUpdatedCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.previous_requested_subset or context.get_empty_subset()


@whitelist_for_serdes
@record
class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition):
@property
def name(self) -> str:
return "executed_with_root_target"

def compute_subset(self, context: AutomationContext) -> EntitySubset:
return context.asset_graph_view.compute_latest_run_executed_with_subset(
from_subset=context.candidate_subset, target=context.root_context.key
)


@whitelist_for_serdes
@record
class NewlyUpdatedCondition(SubsetAutomationCondition):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest
from dagster import (
AssetKey,
AssetMaterialization,
AutomationCondition,
DagsterInstance,
Definitions,
Output,
StaticPartitionsDefinition,
asset,
asset_check,
Expand Down Expand Up @@ -186,3 +188,55 @@ def foo_check() -> ...: ...
# don't keep requesting
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
assert result.total_requested == 0


@pytest.mark.parametrize("b_result", ["skip", "fail", "materialize"])
def test_eager_partial_run(b_result: str) -> None:
@asset
def root() -> None: ...

@asset(deps=[root], automation_condition=AutomationCondition.eager())
def A() -> None: ...

@asset(deps=[A], output_required=False, automation_condition=AutomationCondition.eager())
def B():
if b_result == "skip":
pass
elif b_result == "materialize":
yield Output(1)
else:
return 1 / 0

@asset(deps=[B], automation_condition=AutomationCondition.eager())
def C() -> None: ...

defs = Definitions(assets=[root, A, B, C])
instance = DagsterInstance.ephemeral()

# nothing updated yet
result = evaluate_automation_conditions(defs=defs, instance=instance)
assert result.total_requested == 0

# now root updated, so request a, b, and c
instance.report_runless_asset_event(AssetMaterialization("root"))
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
assert result.total_requested == 3

# don't keep requesting
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
assert result.total_requested == 0

# now simulate the above run, B / C will not be materialized
defs.get_implicit_global_asset_job_def().execute_in_process(
instance=instance, asset_selection=[A.key, B.key, C.key], raise_on_error=False
)
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
# A gets materialized, but this shouldn't kick off B and C
assert result.total_requested == 0

# A gets materialized on its own, do kick off B and C
defs.get_implicit_global_asset_job_def().execute_in_process(
instance=instance, asset_selection=[A.key]
)
result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
assert result.total_requested == 2

0 comments on commit 4e4b437

Please sign in to comment.