diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py
index 1aacd97b16566..3d50da60a1bdc 100644
--- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py
+++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py
@@ -521,6 +521,25 @@ def any_deps_in_progress() -> "AnyDepsCondition":
             "any_deps_in_progress"
         )
 
+    @experimental
+    @staticmethod
+    def all_deps_blocking_checks_passed() -> "AllDepsCondition":
+        """Returns an AutomationCondition that is true for any partition where all upstream
+        blocking checks have passed, or will be requested on this tick.
+
+        In-tick requests are allowed to enable creating runs that target both a parent with
+        blocking checks and a child. Even though the checks have not currently passed, if
+        they fail within the run, the run machinery will prevent the child from being
+        materialized.
+        """
+        with disable_dagster_warnings():
+            return AutomationCondition.all_deps_match(
+                AutomationCondition.all_checks_match(
+                    AutomationCondition.check_passed() | AutomationCondition.will_be_requested(),
+                    blocking_only=True,
+                ).with_label("all_blocking_checks_passed")
+            ).with_label("all_deps_blocking_checks_passed")
+
     @experimental
     @staticmethod
     def all_deps_updated_since_cron(
diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py
index 3affaa3c1082a..2df359190ec70 100644
--- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py
+++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py
@@ -1,17 +1,7 @@
 import datetime
 import logging
 from collections import defaultdict
-from typing import (
-    TYPE_CHECKING,
-    AbstractSet,
-    Dict,
-    Iterable,
-    List,
-    Mapping,
-    Optional,
-    Sequence,
-    Tuple,
-)
+from typing import TYPE_CHECKING, AbstractSet, Dict, Mapping, Optional, Sequence, Tuple
 
 from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext
 from dagster._core.asset_graph_view.entity_subset import EntitySubset
@@ -74,9 +64,7 @@ def __init__(
         self.legacy_expected_data_time_by_key: Dict[AssetKey, Optional[datetime.datetime]] = {}
         self.legacy_data_time_resolver = CachingDataTimeResolver(self.instance_queryer)
 
-        self._execution_set_extras: Dict[EntityKey, List[EntitySubset[EntityKey]]] = defaultdict(
-            list
-        )
+        self.request_subsets_by_key: Dict[EntityKey, EntitySubset] = {}
 
     @property
     def instance_queryer(self) -> "CachingInstanceQueryer":
@@ -145,7 +133,9 @@ def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[En
                 f"({format(result.end_timestamp - result.start_timestamp, '.3f')} seconds)"
             )
             num_evaluated += 1
-        return list(self.current_results_by_key.values()), list(self._get_entity_subsets())
+        return list(self.current_results_by_key.values()), [
+            v for v in self.request_subsets_by_key.values() if not v.is_empty
+        ]
 
     def evaluate_entity(self, key: EntityKey) -> None:
         # evaluate the condition of this asset
@@ -154,12 +144,22 @@ def evaluate_entity(self, key: EntityKey) -> None:
 
         # update dictionaries to keep track of this result
         self.current_results_by_key[key] = result
+        self._add_request_subset(result.true_subset)
         if isinstance(key, AssetKey):
             self.legacy_expected_data_time_by_key[key] = result.compute_legacy_expected_data_time()
 
         # handle cases where an entity must be materialized with others
         self._handle_execution_set(result)
 
+    def _add_request_subset(self, subset: EntitySubset) -> None:
+        """Adds the provided subset to the dictionary tracking what we will request on this tick."""
+        if subset.key not in self.request_subsets_by_key:
+            self.request_subsets_by_key[subset.key] = subset
+        else:
+            self.request_subsets_by_key[subset.key] = self.request_subsets_by_key[
+                subset.key
+            ].compute_union(subset)
+
     def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None:
         # if we need to materialize any partitions of a non-subsettable multi-asset, we need to
         # materialize all of them
@@ -185,20 +185,4 @@ def _handle_execution_set(self, result: AutomationResult[AssetKey]) -> None:
                     neighbor_true_subset.convert_to_serializable_subset()
                 )
 
-                self._execution_set_extras[neighbor_key].append(neighbor_true_subset)
-
-    def _get_entity_subsets(self) -> Iterable[EntitySubset[EntityKey]]:
-        subsets_by_key = {
-            key: result.true_subset
-            for key, result in self.current_results_by_key.items()
-            if not result.true_subset.is_empty
-        }
-        # add in any additional asset partitions we need to request to abide by execution
-        # set rules
-        for key, extras in self._execution_set_extras.items():
-            new_value = subsets_by_key.get(key) or self.asset_graph_view.get_empty_subset(key=key)
-            for extra in extras:
-                new_value = new_value.compute_union(extra)
-            subsets_by_key[key] = new_value
-
-        return subsets_by_key.values()
+                self._add_request_subset(neighbor_true_subset)
diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py
index 4faa0adbb2d2e..c57ee0bba6dc1 100644
--- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py
+++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py
@@ -9,7 +9,6 @@ from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey
 from dagster._core.definitions.declarative_automation.automation_condition import (
     AutomationCondition,
-    AutomationResult,
 )
 from dagster._core.definitions.declarative_automation.legacy.legacy_context import (
     LegacyRuleEvaluationContext,
 )
@@ -53,7 +52,7 @@ class AutomationContext(Generic[T_EntityKey]):
     create_time: datetime.datetime
     asset_graph_view: AssetGraphView
 
-    current_results_by_key: Mapping[EntityKey, AutomationResult]
+    request_subsets_by_key: Mapping[EntityKey, EntitySubset]
 
     parent_context: Optional["AutomationContext"]
 
@@ -81,7 +80,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
            candidate_subset=evaluator.asset_graph_view.get_full_subset(key=key),
            create_time=get_current_datetime(),
            asset_graph_view=asset_graph_view,
-           current_results_by_key=evaluator.current_results_by_key,
+           request_subsets_by_key=evaluator.request_subsets_by_key,
            parent_context=None,
            _cursor=evaluator.cursor.get_previous_condition_cursor(key),
            _legacy_context=LegacyRuleEvaluationContext.create(key, evaluator)
@@ -105,7 +104,7 @@ def for_child_condition(
             candidate_subset=candidate_subset,
             create_time=get_current_datetime(),
             asset_graph_view=self.asset_graph_view,
-            current_results_by_key=self.current_results_by_key,
+            request_subsets_by_key=self.request_subsets_by_key,
             parent_context=self,
             _cursor=self._cursor,
             _legacy_context=self._legacy_context.for_child(
diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py
index be1839cad1929..7742652589e58 100644
--- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py
+++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py
@@ -22,7 +22,6 @@
 from dagster._core.asset_graph_view.entity_subset import EntitySubset
 from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
 from dagster._core.definitions.asset_key import EntityKey
-from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
 from dagster._core.definitions.declarative_automation.legacy.valid_asset_subset import (
     ValidAssetSubset,
 )
@@ -77,7 +76,7 @@ class LegacyRuleEvaluationContext:
     instance_queryer: "CachingInstanceQueryer"
     data_time_resolver: "CachingDataTimeResolver"
 
-    current_results_by_key: Mapping[EntityKey, AutomationResult]
+    request_subsets_by_key: Mapping[EntityKey, EntitySubset]
     expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]]
 
     start_timestamp: float
@@ -111,7 +110,7 @@ def create(asset_key: AssetKey, evaluator: "AutomationConditionEvaluator"):
             ),
             data_time_resolver=evaluator.legacy_data_time_resolver,
             instance_queryer=instance_queryer,
-            current_results_by_key=evaluator.current_results_by_key,
+            request_subsets_by_key=evaluator.request_subsets_by_key,
             expected_data_time_mapping=evaluator.legacy_expected_data_time_by_key,
             start_timestamp=get_current_timestamp(),
             respect_materialization_data_versions=evaluator.legacy_respect_materialization_data_versions,
@@ -198,11 +197,11 @@ def parent_will_update_subset(self) -> ValidAssetSubset:
         for parent_key in self.asset_graph.get(self.asset_key).parent_keys:
             if not self.materializable_in_same_run(self.asset_key, parent_key):
                 continue
-            parent_result = self.current_results_by_key.get(parent_key)
-            if not parent_result:
+            parent_subset = self.request_subsets_by_key.get(parent_key)
+            if not parent_subset:
                 continue
             parent_subset = ValidAssetSubset.coerce_from_subset(
-                parent_result.get_serializable_subset(), self.partitions_def
+                parent_subset.convert_to_serializable_subset(), self.partitions_def
             )
             subset |= replace(parent_subset, key=self.asset_key)
         return subset
@@ -378,10 +377,10 @@ def get_parents_that_will_not_be_materialized_on_current_tick(
         }
 
     def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) -> bool:
-        parent_result = self.current_results_by_key.get(asset_partition.asset_key)
-        if not parent_result:
+        parent_subset = self.request_subsets_by_key.get(asset_partition.asset_key)
+        if not parent_subset:
             return False
-        return asset_partition in parent_result.get_serializable_subset()
+        return asset_partition in parent_subset.convert_to_serializable_subset()
 
     def add_evaluation_data_from_previous_tick(
         self,
diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py
index d7f8b430515e9..4b7889b7abbd1 100644
--- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py
+++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py
@@ -130,13 +130,9 @@ def _executable_with_root_context_key(self, context: AutomationContext) -> bool:
         )
 
     def compute_subset(self, context: AutomationContext) -> EntitySubset:
-        current_result = context.current_results_by_key.get(context.key)
-        if (
-            current_result
-            and current_result.true_subset
-            and self._executable_with_root_context_key(context)
-        ):
-            return current_result.true_subset
+        current_result = context.request_subsets_by_key.get(context.key)
+        if current_result and self._executable_with_root_context_key(context):
+            return current_result
         else:
             return context.get_empty_subset()
 
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py
index 76c16799279bb..71ce3823fad90 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py
@@ -1,8 +1,20 @@
 import pytest
-from dagster import AutomationCondition
-from dagster._core.definitions.asset_check_spec import AssetCheckSpec
-from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey
-from dagster._core.definitions.asset_spec import AssetSpec
+from dagster import (
+    AssetCheckKey,
+    AssetCheckResult,
+    AssetCheckSpec,
+    AssetKey,
+    AssetMaterialization,
+    AssetSpec,
+    AutomationCondition,
+    DagsterInstance,
+    Definitions,
+    MaterializeResult,
+    asset,
+    asset_check,
+    evaluate_automation_conditions,
+)
+from dagster._core.definitions import materialize
 from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
 from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
 
@@ -104,3 +116,136 @@ def test_any_checks_match_basic() -> None:
     # there is no upstream check for D
     state, result = state.evaluate("D")
     assert result.true_subset.size == 0
+
+
+def test_all_deps_blocking_checks_passed_condition() -> None:
+    @asset
+    def A() -> None: ...
+
+    @asset(deps=[A], automation_condition=AutomationCondition.all_deps_blocking_checks_passed())
+    def B() -> None: ...
+
+    @asset_check(asset=A, blocking=True)
+    def blocking1(context) -> AssetCheckResult:
+        passed = "passed" in context.run.tags
+        return AssetCheckResult(passed=passed)
+
+    @asset_check(asset=A, blocking=True)
+    def blocking2(context) -> AssetCheckResult:
+        passed = "passed" in context.run.tags
+        return AssetCheckResult(passed=passed)
+
+    @asset_check(asset=A, blocking=False)
+    def nonblocking1(context) -> AssetCheckResult:
+        passed = "passed" in context.run.tags
+        return AssetCheckResult(passed=passed)
+
+    @asset_check(asset=B, blocking=True)
+    def blocking3(context) -> AssetCheckResult:
+        passed = "passed" in context.run.tags
+        return AssetCheckResult(passed=passed)
+
+    defs = Definitions(assets=[A, B], asset_checks=[blocking1, blocking2, blocking3, nonblocking1])
+    instance = DagsterInstance.ephemeral()
+
+    # no checks evaluated
+    result = evaluate_automation_conditions(defs=defs, instance=instance)
+    assert result.total_requested == 0
+
+    # blocking1 passes, still not all of them
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking1.check_key}
+    ).execute_in_process(tags={"passed": ""}, instance=instance)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
+
+    # blocking2 passes, now all have passed
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking2.check_key}
+    ).execute_in_process(tags={"passed": ""}, instance=instance)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 1
+
+    # blocking3 fails, no impact (as it's not on a dep)
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking3.check_key}
+    ).execute_in_process(instance=instance, raise_on_error=False)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 1
+
+    # nonblocking1 fails, no impact (as it's non-blocking)
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={nonblocking1.check_key}
+    ).execute_in_process(instance=instance, raise_on_error=False)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 1
+
+    # now A gets rematerialized, blocking checks haven't been executed yet
+    instance.report_runless_asset_event(AssetMaterialization("A"))
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
+
+    # blocking1 passes, but blocking2 fails
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking1.check_key}
+    ).execute_in_process(tags={"passed": ""}, instance=instance)
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking2.check_key}
+    ).execute_in_process(instance=instance, raise_on_error=False)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
+
+    # now blocking2 passes
+    defs.get_implicit_global_asset_job_def().get_subset(
+        asset_check_selection={blocking2.check_key}
+    ).execute_in_process(tags={"passed": ""}, instance=instance)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 1
+
+
+def test_blocking_checks_with_eager() -> None:
+    cond = AutomationCondition.eager() & AutomationCondition.all_deps_blocking_checks_passed()
+
+    @asset
+    def root() -> None: ...
+
+    @asset(
+        deps=[root],
+        automation_condition=cond,
+        check_specs=[AssetCheckSpec("x", asset="A", blocking=True)],
+    )
+    def A() -> MaterializeResult:
+        return MaterializeResult(check_results=[AssetCheckResult(passed=True)])
+
+    @asset(deps=[A], automation_condition=cond)
+    def B() -> None: ...
+
+    defs = Definitions(assets=[root, A, B])
+    instance = DagsterInstance.ephemeral()
+
+    # nothing to do yet
+    result = evaluate_automation_conditions(defs=defs, instance=instance)
+    assert result.total_requested == 0
+
+    # root is materialized, should kick off a run of both A and B
+    instance.report_runless_asset_event(AssetMaterialization("root"))
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 2
+
+    # don't launch again
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
+
+    # A is materialized in a vacuum (technically impossible), don't kick off
+    instance.report_runless_asset_event(AssetMaterialization("A"))
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
+
+    # A is now materialized with its check, do kick off B
+    materialize([A], instance=instance)
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 1
+
+    # don't launch again
+    result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor)
+    assert result.total_requested == 0
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/basic_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/basic_scenarios.py
index 3db4e5fcb1fd6..000a5077b08d6 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/basic_scenarios.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/basic_scenarios.py
@@ -74,7 +74,7 @@
             unevaluated_runs=[run(["asset1", "asset2", "asset3", "asset4", "asset5", "asset6"])],
         ),
         # don't need to run asset4 for reconciliation but asset4 must run when asset3 does
-        expected_run_requests=[run_request(asset_keys=["asset3", "asset4", "asset5"])],
+        expected_run_requests=[run_request(asset_keys=["asset3", "asset4", "asset5", "asset6"])],
     ),
     "multi_asset_in_middle_single_parent_rematerialized_subsettable": AssetReconciliationScenario(
         assets=multi_asset_in_middle_subsettable,
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/freshness_policy_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/freshness_policy_scenarios.py
index ba3bdc609fda5..bb22413389d94 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/freshness_policy_scenarios.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/legacy_tests/scenarios/freshness_policy_scenarios.py
@@ -65,7 +65,10 @@
         unevaluated_runs=[run([f"asset{i}" for i in range(1, 6)])],
         evaluation_delta=datetime.timedelta(minutes=35),
         # need to run assets 1, 2 and 3 as they're all part of the same non-subsettable multi asset
-        expected_run_requests=[run_request(asset_keys=["asset1", "asset2", "asset3", "asset5"])],
+        # run 4 as it is eager and downstream of 1
+        expected_run_requests=[
+            run_request(asset_keys=["asset1", "asset2", "asset3", "asset4", "asset5"])
+        ],
         expected_evaluations=[
             AssetEvaluationSpec.from_single_rule(
                 "asset2",
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py
index a0568ecd2bcf1..3f50cd2b88be3 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py
@@ -4,9 +4,9 @@
 from typing import Mapping, Optional, Sequence, Tuple
 
 import dagster._check as check
-import mock
 from dagster import AssetKey
 from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
+from dagster._core.asset_graph_view.entity_subset import EntitySubset
 from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
 from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
 from dagster._core.definitions.declarative_automation.automation_condition import (
@@ -52,19 +52,16 @@ class AutomationConditionScenarioState(ScenarioState):
     ensure_empty_result: bool = True
     request_backfills: bool = False
 
-    def _get_current_results_by_key(
+    def _get_request_subsets_by_key(
         self, asset_graph_view: AssetGraphView
-    ) -> Mapping[AssetKey, AutomationResult]:
+    ) -> Mapping[AssetKey, EntitySubset]:
         if self.requested_asset_partitions is None:
             return {}
         ap_by_key = defaultdict(set)
         for ap in self.requested_asset_partitions:
            ap_by_key[ap.asset_key].add(ap)
         return {
-            asset_key: mock.MagicMock(
-                true_subset=asset_graph_view.get_asset_subset_from_asset_partitions(asset_key, aps),
-                cursor=None,
-            )
+            asset_key: asset_graph_view.get_asset_subset_from_asset_partitions(asset_key, aps)
             for asset_key, aps in ap_by_key.items()
         }
 
@@ -99,7 +96,7 @@ def evaluate(
             logger=self.logger,
             allow_backfills=False,
         )
-        evaluator.current_results_by_key = self._get_current_results_by_key(
+        evaluator.request_subsets_by_key = self._get_request_subsets_by_key(
            evaluator.asset_graph_view
        )  # type: ignore
         context = AutomationContext.create(key=asset_key, evaluator=evaluator)
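For reviewers who want to see the new condition in context outside the test suite, here is a minimal usage sketch. It relies only on APIs exercised by the tests above (asset, asset_check, AutomationCondition.eager(), and the new all_deps_blocking_checks_passed()); the asset and check names are illustrative and not part of this PR.

from dagster import (
    AssetCheckResult,
    AutomationCondition,
    Definitions,
    asset,
    asset_check,
)


@asset
def upstream() -> None: ...


# Blocking check on the upstream asset; until it passes (or is requested in the
# same tick), the downstream asset below will not be requested.
@asset_check(asset=upstream, blocking=True)
def upstream_is_valid() -> AssetCheckResult:
    return AssetCheckResult(passed=True)


# Mirrors test_blocking_checks_with_eager: materialize eagerly, but only once
# every blocking check on the asset's deps has passed.
@asset(
    deps=[upstream],
    automation_condition=AutomationCondition.eager()
    & AutomationCondition.all_deps_blocking_checks_passed(),
)
def downstream() -> None: ...


defs = Definitions(assets=[upstream, downstream], asset_checks=[upstream_is_valid])

The composed condition can be attached to any asset whose parents carry blocking checks; if the checks instead fail inside the run, the run-level blocking machinery described in the docstring prevents the child from materializing.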