Add all_deps_blocking_checks_passed condition
OwenKephart committed Oct 15, 2024
1 parent 037c715 commit f8c1d2c
Showing 9 changed files with 208 additions and 66 deletions.
----- changed file -----
@@ -521,6 +521,25 @@ def any_deps_in_progress() -> "AnyDepsCondition":
            "any_deps_in_progress"
        )

    @experimental
    @staticmethod
    def all_deps_blocking_checks_passed() -> "AllDepsCondition":
        """Returns an AutomationCondition that is true for any partition where all upstream
        blocking checks have passed, or will be requested on this tick.
        In-tick requests are allowed to enable creating runs that target both a parent with
        blocking checks and a child. Even though the checks have not currently passed, if
        they fail within the run, the run machinery will prevent the child from being
        materialized.
        """
        with disable_dagster_warnings():
            return AutomationCondition.all_deps_match(
                AutomationCondition.all_checks_match(
                    AutomationCondition.check_passed() | AutomationCondition.will_be_requested(),
                    blocking_only=True,
                ).with_label("all_blocking_checks_passed")
            ).with_label("all_deps_blocking_checks_passed")

    @experimental
    @staticmethod
    def all_deps_updated_since_cron(
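For context, a minimal usage sketch of the new condition (not part of this commit): a downstream asset is only auto-materialized once every blocking check on its deps has passed or is being requested in the same tick. The asset and check names below are hypothetical; the APIs used are the same public ones exercised by the new tests further down.

# Usage sketch (hypothetical assets/checks, not part of the diff).
from dagster import (
    AssetCheckResult,
    AutomationCondition,
    Definitions,
    asset,
    asset_check,
)


@asset
def upstream() -> None: ...


@asset_check(asset=upstream, blocking=True)
def upstream_valid() -> AssetCheckResult:
    return AssetCheckResult(passed=True)


@asset(
    deps=[upstream],
    automation_condition=AutomationCondition.all_deps_blocking_checks_passed(),
)
def downstream() -> None: ...


# Wire everything into a Definitions object, as the new tests do below.
defs = Definitions(assets=[upstream, downstream], asset_checks=[upstream_valid])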
----- changed file -----
@@ -1,17 +1,7 @@
import datetime
import logging
from collections import defaultdict
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
)
from typing import TYPE_CHECKING, AbstractSet, Dict, Mapping, Optional, Sequence, Tuple

from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext
from dagster._core.asset_graph_view.entity_subset import EntitySubset
@@ -74,9 +64,7 @@ def __init__(
        self.legacy_expected_data_time_by_key: Dict[AssetKey, Optional[datetime.datetime]] = {}
        self.legacy_data_time_resolver = CachingDataTimeResolver(self.instance_queryer)

        self._execution_set_extras: Dict[EntityKey, List[EntitySubset[EntityKey]]] = defaultdict(
            list
        )
        self.request_subsets_by_key: Dict[EntityKey, EntitySubset] = {}

    @property
    def instance_queryer(self) -> "CachingInstanceQueryer":
@@ -145,7 +133,9 @@ def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[En
                f"({format(result.end_timestamp - result.start_timestamp, '.3f')} seconds)"
            )
            num_evaluated += 1
        return list(self.current_results_by_key.values()), list(self._get_entity_subsets())
        return list(self.current_results_by_key.values()), [
            v for v in self.request_subsets_by_key.values() if not v.is_empty
        ]

    def evaluate_entity(self, key: EntityKey) -> None:
        # evaluate the condition of this asset
@@ -154,12 +144,22 @@ def evaluate_entity(self, key: EntityKey) -> None:

        # update dictionaries to keep track of this result
        self.current_results_by_key[key] = result
        self._add_request_subset(result.true_subset)

        if isinstance(key, AssetKey):
            self.legacy_expected_data_time_by_key[key] = result.compute_legacy_expected_data_time()
        # handle cases where an entity must be materialized with others
        self._handle_execution_set(result)

    def _add_request_subset(self, subset: EntitySubset) -> None:
        """Adds the provided subset to the dictionary tracking what we will request on this tick."""
        if subset.key not in self.request_subsets_by_key:
            self.request_subsets_by_key[subset.key] = subset
        else:
            self.request_subsets_by_key[subset.key] = self.request_subsets_by_key[
                subset.key
            ].compute_union(subset)

    def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None:
        # if we need to materialize any partitions of a non-subsettable multi-asset, we need to
        # materialize all of them
@@ -185,20 +185,4 @@ def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None:
                    neighbor_true_subset.convert_to_serializable_subset()
                )

            self._execution_set_extras[neighbor_key].append(neighbor_true_subset)

    def _get_entity_subsets(self) -> Iterable[EntitySubset[EntityKey]]:
        subsets_by_key = {
            key: result.true_subset
            for key, result in self.current_results_by_key.items()
            if not result.true_subset.is_empty
        }
        # add in any additional asset partitions we need to request to abide by execution
        # set rules
        for key, extras in self._execution_set_extras.items():
            new_value = subsets_by_key.get(key) or self.asset_graph_view.get_empty_subset(key=key)
            for extra in extras:
                new_value = new_value.compute_union(extra)
            subsets_by_key[key] = new_value

        return subsets_by_key.values()
            self._add_request_subset(neighbor_true_subset)
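The change above replaces the per-key _execution_set_extras lists with a single request_subsets_by_key mapping that is built up by repeated unions as each entity is evaluated. A self-contained sketch of that accumulation pattern, with plain frozensets standing in for EntitySubset and string keys standing in for EntityKey (the key and partition names here are made up for illustration):

# Simplified sketch of the union-accumulation pattern; not Dagster code.
from typing import Dict, FrozenSet

RequestSubsets = Dict[str, FrozenSet[str]]


def add_request_subset(requests: RequestSubsets, key: str, subset: FrozenSet[str]) -> None:
    """Union the new subset into whatever has already been requested for this key."""
    if key not in requests:
        requests[key] = subset
    else:
        requests[key] = requests[key] | subset  # plays the role of compute_union


requests: RequestSubsets = {}
add_request_subset(requests, "asset_a", frozenset({"2024-01-01"}))
add_request_subset(requests, "asset_a", frozenset({"2024-01-02"}))
assert requests["asset_a"] == frozenset({"2024-01-01", "2024-01-02"})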
----- changed file -----
@@ -9,7 +9,6 @@
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
    AutomationResult,
)
from dagster._core.definitions.declarative_automation.legacy.legacy_context import (
    LegacyRuleEvaluationContext,
@@ -53,7 +52,7 @@ class AutomationContext(Generic[T_EntityKey]):
    create_time: datetime.datetime

    asset_graph_view: AssetGraphView
    current_results_by_key: Mapping[EntityKey, AutomationResult]
    request_subsets_by_key: Mapping[EntityKey, EntitySubset]

    parent_context: Optional["AutomationContext"]

@@ -81,7 +80,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
            candidate_subset=evaluator.asset_graph_view.get_full_subset(key=key),
            create_time=get_current_datetime(),
            asset_graph_view=asset_graph_view,
            current_results_by_key=evaluator.current_results_by_key,
            request_subsets_by_key=evaluator.request_subsets_by_key,
            parent_context=None,
            _cursor=evaluator.cursor.get_previous_condition_cursor(key),
            _legacy_context=LegacyRuleEvaluationContext.create(key, evaluator)
@@ -105,7 +104,7 @@ def for_child_condition(
            candidate_subset=candidate_subset,
            create_time=get_current_datetime(),
            asset_graph_view=self.asset_graph_view,
            current_results_by_key=self.current_results_by_key,
            request_subsets_by_key=self.request_subsets_by_key,
            parent_context=self,
            _cursor=self._cursor,
            _legacy_context=self._legacy_context.for_child(
----- changed file -----
@@ -22,7 +22,6 @@
from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
from dagster._core.definitions.asset_key import EntityKey
from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
from dagster._core.definitions.declarative_automation.legacy.valid_asset_subset import (
    ValidAssetSubset,
)
@@ -77,7 +76,7 @@ class LegacyRuleEvaluationContext:
    instance_queryer: "CachingInstanceQueryer"
    data_time_resolver: "CachingDataTimeResolver"

    current_results_by_key: Mapping[EntityKey, AutomationResult]
    request_subsets_by_key: Mapping[EntityKey, EntitySubset]
    expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]]

    start_timestamp: float
@@ -111,7 +110,7 @@ def create(asset_key: AssetKey, evaluator: "AutomationConditionEvaluator"):
            ),
            data_time_resolver=evaluator.legacy_data_time_resolver,
            instance_queryer=instance_queryer,
            current_results_by_key=evaluator.current_results_by_key,
            request_subsets_by_key=evaluator.request_subsets_by_key,
            expected_data_time_mapping=evaluator.legacy_expected_data_time_by_key,
            start_timestamp=get_current_timestamp(),
            respect_materialization_data_versions=evaluator.legacy_respect_materialization_data_versions,
@@ -198,11 +197,11 @@ def parent_will_update_subset(self) -> ValidAssetSubset:
        for parent_key in self.asset_graph.get(self.asset_key).parent_keys:
            if not self.materializable_in_same_run(self.asset_key, parent_key):
                continue
            parent_result = self.current_results_by_key.get(parent_key)
            if not parent_result:
            parent_subset = self.request_subsets_by_key.get(parent_key)
            if not parent_subset:
                continue
            parent_subset = ValidAssetSubset.coerce_from_subset(
                parent_result.get_serializable_subset(), self.partitions_def
                parent_subset.convert_to_serializable_subset(), self.partitions_def
            )
            subset |= replace(parent_subset, key=self.asset_key)
        return subset
@@ -378,10 +377,10 @@ def get_parents_that_will_not_be_materialized_on_current_tick(
        }

    def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> bool:
        parent_result = self.current_results_by_key.get(asset_partition.asset_key)
        if not parent_result:
        parent_subset = self.request_subsets_by_key.get(asset_partition.asset_key)
        if not parent_subset:
            return False
        return asset_partition in parent_result.get_serializable_subset()
        return asset_partition in parent_subset.convert_to_serializable_subset()

    def add_evaluation_data_from_previous_tick(
        self,
----- changed file -----
@@ -130,13 +130,9 @@ def _executable_with_root_context_key(self, context: AutomationContext) -> bool:
        )

    def compute_subset(self, context: AutomationContext) -> EntitySubset:
        current_result = context.current_results_by_key.get(context.key)
        if (
            current_result
            and current_result.true_subset
            and self._executable_with_root_context_key(context)
        ):
            return current_result.true_subset
        current_result = context.request_subsets_by_key.get(context.key)
        if current_result and self._executable_with_root_context_key(context):
            return current_result
        else:
            return context.get_empty_subset()

----- changed file -----
@@ -1,8 +1,20 @@
import pytest
from dagster import AutomationCondition
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster import (
    AssetCheckKey,
    AssetCheckResult,
    AssetCheckSpec,
    AssetKey,
    AssetMaterialization,
    AssetSpec,
    AutomationCondition,
    DagsterInstance,
    Definitions,
    MaterializeResult,
    asset,
    asset_check,
    evaluate_automation_conditions,
)
from dagster._core.definitions import materialize
from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext

@@ -104,3 +116,136 @@ def test_any_checks_match_basic() -> None:
    # there is no upstream check for D
    state, result = state.evaluate("D")
    assert result.true_subset.size == 0


def test_all_deps_blocking_checks_passed_condition() -> None:
    @asset
    def A() -> None: ...

    @asset(deps=[A], automation_condition=AutomationCondition.all_deps_blocking_checks_passed())
    def B() -> None: ...

    @asset_check(asset=A, blocking=True)
    def blocking1(context) -> AssetCheckResult:
        passed = "passed" in context.run.tags
        return AssetCheckResult(passed=passed)

    @asset_check(asset=A, blocking=True)
    def blocking2(context) -> AssetCheckResult:
        passed = "passed" in context.run.tags
        return AssetCheckResult(passed=passed)

    @asset_check(asset=A, blocking=False)
    def nonblocking1(context) -> AssetCheckResult:
        passed = "passed" in context.run.tags
        return AssetCheckResult(passed=passed)

    @asset_check(asset=B, blocking=True)
    def blocking3(context) -> AssetCheckResult:
        passed = "passed" in context.run.tags
        return AssetCheckResult(passed=passed)

    defs = Definitions(assets=[A, B], asset_checks=[blocking1, blocking2, blocking3, nonblocking1])
    instance = DagsterInstance.ephemeral()

    # no checks evaluated
    result = evaluate_automation_conditions(defs=defs, instance=instance)
    assert result.total_requested == 0

    # blocking1 passes, still not all of them
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking1.check_key}
    ).execute_in_process(tags={"passed": ""}, instance=instance)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0

    # blocking2 passes, now all have passed
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking2.check_key}
    ).execute_in_process(tags={"passed": ""}, instance=instance)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 1

    # blocking3 fails, no impact (as it's not on a dep)
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking3.check_key}
    ).execute_in_process(instance=instance, raise_on_error=False)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 1

    # nonblocking1 fails, no impact (as it's non-blocking)
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={nonblocking1.check_key}
    ).execute_in_process(instance=instance, raise_on_error=False)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 1

    # now A gets rematerialized, blocking checks haven't been executed yet
    instance.report_runless_asset_event(AssetMaterialization("A"))
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0

    # blocking1 passes, but blocking2 fails
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking1.check_key}
    ).execute_in_process(tags={"passed": ""}, instance=instance)
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking2.check_key}
    ).execute_in_process(instance=instance, raise_on_error=False)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0

    # now blocking2 passes
    defs.get_implicit_global_asset_job_def().get_subset(
        asset_check_selection={blocking2.check_key}
    ).execute_in_process(tags={"passed": ""}, instance=instance)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 1


def test_blocking_checks_with_eager() -> None:
    cond = AutomationCondition.eager() & AutomationCondition.all_deps_blocking_checks_passed()

    @asset
    def root() -> None: ...

    @asset(
        deps=[root],
        automation_condition=cond,
        check_specs=[AssetCheckSpec("x", asset="A", blocking=True)],
    )
    def A() -> MaterializeResult:
        return MaterializeResult(check_results=[AssetCheckResult(passed=True)])

    @asset(deps=[A], automation_condition=cond)
    def B() -> None: ...

    defs = Definitions(assets=[root, A, B])
    instance = DagsterInstance.ephemeral()

    # nothing to do yet
    result = evaluate_automation_conditions(defs=defs, instance=instance)
    assert result.total_requested == 0

    # root is materialized, should kick off a run of both A and B
    instance.report_runless_asset_event(AssetMaterialization("root"))
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 2

    # don't launch again
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0

    # A is materialized in a vacuum (technically impossible), don't kick off
    instance.report_runless_asset_event(AssetMaterialization("A"))
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0

    # A is now materialized with its check, do kick off B
    materialize([A], instance=instance)
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 1

    # don't launch again
    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
    assert result.total_requested == 0
----- changed file -----
@@ -74,7 +74,7 @@
            unevaluated_runs=[run(["asset1", "asset2", "asset3", "asset4", "asset5", "asset6"])],
        ),
        # don't need to run asset4 for reconciliation but asset4 must run when asset3 does
        expected_run_requests=[run_request(asset_keys=["asset3", "asset4", "asset5"])],
        expected_run_requests=[run_request(asset_keys=["asset3", "asset4", "asset5", "asset6"])],
    ),
    "multi_asset_in_middle_single_parent_rematerialized_subsettable": AssetReconciliationScenario(
        assets=multi_asset_in_middle_subsettable,
(The remaining changed files are not shown.)
