From 230df043064ad5903c7cdbbfd27454a7bd527805 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Mon, 14 Oct 2024 09:17:58 -0700 Subject: [PATCH] Add `executed_with_root_target` condition to handle partial runs / failures --- .../test_asset_condition_evaluations.py | 2 +- .../asset_graph_view/asset_graph_view.py | 81 +++++++++++++++++++ .../automation_condition.py | 18 ++++- .../operands/__init__.py | 1 + .../operands/operands.py | 13 +++ .../builtins/test_eager_condition.py | 54 +++++++++++++ 6 files changed, 167 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index 2454366577b2c..79b460170e8ac 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -676,7 +676,7 @@ def A() -> None: ... assert record["numRequested"] == 0 # all nodes in the tree - assert len(record["evaluationNodes"]) == 32 + assert len(record["evaluationNodes"]) == 35 rootNode = record["evaluationNodes"][0] assert rootNode["uniqueId"] == record["rootUniqueId"] diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 958f3038301f4..5cc2ada07786a 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -32,6 +32,7 @@ from dagster._core.definitions.partition import PartitionsDefinition from dagster._core.instance import DagsterInstance from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus + from dagster._core.storage.dagster_run import RunRecord from dagster._utils.caching_instance_queryer import CachingInstanceQueryer @@ -465,6 +466,86 @@ def compute_missing_subset(self, *, key: EntityKey, from_subset: EntitySubset) - ), ) + def _expensively_filter_entity_subset( + self, subset: EntitySubset, filter_fn: Callable[[Optional[str]], bool] + ) -> EntitySubset: + if subset.is_partitioned: + return subset.compute_intersection_with_partition_keys( + {pk for pk in subset.expensively_compute_partition_keys() if filter_fn(pk)} + ) + else: + return ( + subset + if not subset.is_empty and filter_fn(None) + else self.get_empty_subset(key=subset.key) + ) + + def _run_record_targets_entity(self, run_record: "RunRecord", target_key: EntityKey) -> bool: + asset_selection = run_record.dagster_run.asset_selection or set() + check_selection = run_record.dagster_run.asset_check_selection or set() + return target_key in (asset_selection | check_selection) + + def _compute_latest_check_run_executed_with_target( + self, partition_key: Optional[str], query_key: AssetCheckKey, target_key: EntityKey + ) -> bool: + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord + from dagster._core.storage.dagster_run import RunRecord + + check.invariant(partition_key is None, "Partitioned checks not supported") + check_record = AssetCheckExecutionRecord.blocking_get(self, query_key) + if check_record and check_record.event: + run_record = RunRecord.blocking_get(self, check_record.event.run_id) + return bool(run_record) and self._run_record_targets_entity(run_record, target_key) + else: + return False + + def _compute_latest_asset_run_executed_with_target( + self, partition_key: Optional[str], query_key: AssetKey, target_key: EntityKey + ) -> bool: + from dagster._core.storage.dagster_run import RunRecord + from dagster._core.storage.event_log.base import AssetRecord + + asset_record = AssetRecord.blocking_get(self, query_key) + if ( + asset_record + and asset_record.asset_entry.last_materialization + and asset_record.asset_entry.last_materialization.asset_materialization + and asset_record.asset_entry.last_materialization.asset_materialization.partition + == partition_key + ): + run_record = RunRecord.blocking_get( + self, asset_record.asset_entry.last_materialization.run_id + ) + return bool(run_record) and self._run_record_targets_entity(run_record, target_key) + else: + return False + + def compute_latest_run_executed_with_subset( + self, from_subset: EntitySubset, target: EntityKey + ) -> EntitySubset: + """Computes the subset of from_subset for which the latest run also targeted + the provided target EntityKey. + """ + return _dispatch( + key=from_subset.key, + check_method=lambda k: self._expensively_filter_entity_subset( + from_subset, + filter_fn=functools.partial( + self._compute_latest_check_run_executed_with_target, + query_key=k, + target_key=target, + ), + ), + asset_method=lambda k: self._expensively_filter_entity_subset( + from_subset, + filter_fn=functools.partial( + self._compute_latest_asset_run_executed_with_target, + query_key=k, + target_key=target, + ), + ), + ) + def _compute_updated_since_cursor_subset( self, key: AssetKey, cursor: Optional[int] ) -> EntitySubset[AssetKey]: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index bd2d1172dabae..59ab36f712408 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -441,6 +441,18 @@ def newly_updated() -> "BuiltinAutomationCondition": return NewlyUpdatedCondition() + @experimental + @staticmethod + def executed_with_root_target() -> "BuiltinAutomationCondition": + """Returns an AutomationCondition that is true if the latest run that updated the target also executed + with the root key that the global condition is applied to. + """ + from dagster._core.definitions.declarative_automation.operands import ( + LatestRunExecutedWithRootTargetCondition, + ) + + return LatestRunExecutedWithRootTargetCondition() + @public @experimental @staticmethod @@ -496,7 +508,11 @@ def any_deps_updated() -> "DepsAutomationCondition": """ with disable_dagster_warnings(): return AutomationCondition.any_deps_match( - AutomationCondition.newly_updated() | AutomationCondition.will_be_requested() + ( + AutomationCondition.newly_updated() + & ~AutomationCondition.executed_with_root_target() + ).with_label("newly_updated_without_root") + | AutomationCondition.will_be_requested() ).with_label("any_deps_updated") @experimental diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py index bef50dd690a7a..68a58ffface39 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py @@ -6,6 +6,7 @@ ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InitialEvaluationCondition as InitialEvaluationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, + LatestRunExecutedWithRootTargetCondition as LatestRunExecutedWithRootTargetCondition, MissingAutomationCondition as MissingAutomationCondition, NewlyRequestedCondition as NewlyRequestedCondition, NewlyUpdatedCondition as NewlyUpdatedCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 3f5c3157936e1..10d71d330ca15 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -140,6 +140,19 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset: return context.previous_requested_subset or context.get_empty_subset() +@whitelist_for_serdes +@record +class LatestRunExecutedWithRootTargetCondition(SubsetAutomationCondition): + @property + def name(self) -> str: + return "executed_with_root_target" + + def compute_subset(self, context: AutomationContext) -> EntitySubset: + return context.asset_graph_view.compute_latest_run_executed_with_subset( + from_subset=context.candidate_subset, target=context.root_context.key + ) + + @whitelist_for_serdes @record class NewlyUpdatedCondition(SubsetAutomationCondition): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 06420573bea6c..ed4c9effecada 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -1,9 +1,11 @@ +import pytest from dagster import ( AssetKey, AssetMaterialization, AutomationCondition, DagsterInstance, Definitions, + Output, StaticPartitionsDefinition, asset, asset_check, @@ -186,3 +188,55 @@ def foo_check() -> ...: ... # don't keep requesting result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) assert result.total_requested == 0 + + +@pytest.mark.parametrize("b_result", ["skip", "fail", "materialize"]) +def test_eager_partial_run(b_result: str) -> None: + @asset + def root() -> None: ... + + @asset(deps=[root], automation_condition=AutomationCondition.eager()) + def A() -> None: ... + + @asset(deps=[A], output_required=False, automation_condition=AutomationCondition.eager()) + def B(): + if b_result == "skip": + pass + elif b_result == "materialize": + yield Output(1) + else: + return 1 / 0 + + @asset(deps=[B], automation_condition=AutomationCondition.eager()) + def C() -> None: ... + + defs = Definitions(assets=[root, A, B, C]) + instance = DagsterInstance.ephemeral() + + # nothing updated yet + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # now root updated, so request a, b, and c + instance.report_runless_asset_event(AssetMaterialization("root")) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 3 + + # don't keep requesting + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 + + # now simulate the above run, B / C will not be materialized + defs.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, asset_selection=[A.key, B.key, C.key], raise_on_error=False + ) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + # A gets materialized, but this shouldn't kick off B and C + assert result.total_requested == 0 + + # A gets materialized on its own, do kick off B and C + defs.get_implicit_global_asset_job_def().execute_in_process( + instance=instance, asset_selection=[A.key] + ) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 2