diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index 4c982f3fe4b3c..c1486f0d9c8fc 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -391,7 +391,7 @@ def compute_missing_subset(self, key: EntityKey, from_subset: EntitySubset) -> E ) @cached_method - def compute_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + def compute_run_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionResolvedStatus, ) @@ -404,6 +404,19 @@ def compute_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: value = self._queryer.get_in_progress_asset_subset(asset_key=key).value return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + @cached_method + def compute_backfill_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + if isinstance(key, AssetCheckKey): + # asset checks cannot currently be backfilled + return self.get_empty_subset(key=key) + else: + value = ( + self._queryer.get_active_backfill_in_progress_asset_graph_subset() + .get_asset_subset(asset_key=key, asset_graph=self.asset_graph) + .value + ) + return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) + @cached_method def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: from dagster._core.storage.asset_check_execution_record import ( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py index f47f5f2d05267..018fbcf7e145e 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py @@ -1,11 +1,12 @@ from dagster._core.definitions.declarative_automation.operands import ( + BackfillInProgressAutomationCondition as BackfillInProgressAutomationCondition, CronTickPassedCondition as CronTickPassedCondition, ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, - InProgressAutomationCondition as InProgressAutomationCondition, MissingAutomationCondition as MissingAutomationCondition, NewlyRequestedCondition as NewlyRequestedCondition, NewlyUpdatedCondition as NewlyUpdatedCondition, + RunInProgressAutomationCondition as RunInProgressAutomationCondition, WillBeRequestedCondition as WillBeRequestedCondition, ) from dagster._core.definitions.declarative_automation.operands.operands import ( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 0f53e3d6ecc8a..e19296542a0eb 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -325,13 +325,24 @@ def missing() -> "BuiltinAutomationCondition": @public @experimental @staticmethod - def in_progress() -> "BuiltinAutomationCondition": + def run_in_progress() -> "BuiltinAutomationCondition": """Returns an AutomationCondition that is true if the target is part of an in-progress run.""" from dagster._core.definitions.declarative_automation.operands import ( - InProgressAutomationCondition, + RunInProgressAutomationCondition, + ) + + return RunInProgressAutomationCondition() + + @public + @experimental + @staticmethod + def backfill_in_progress() -> "BuiltinAutomationCondition": + """Returns an AutomationCondition that is true if the target is part of an in-progress backfill.""" + from dagster._core.definitions.declarative_automation.operands import ( + BackfillInProgressAutomationCondition, ) - return InProgressAutomationCondition() + return BackfillInProgressAutomationCondition() @public @experimental @@ -344,6 +355,17 @@ def execution_failed() -> "BuiltinAutomationCondition": return ExecutionFailedAutomationCondition() + @public + @experimental + @staticmethod + def in_progress() -> "BuiltinAutomationCondition": + """Returns an AutomationCondition that is true for an asset partition if it is part of an + in-progress run or backfill. + """ + return ( + AutomationCondition.run_in_progress() | AutomationCondition.backfill_in_progress() + ).with_label("in_progress") + @public @experimental @staticmethod diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py index 98e0ba0b11517..bef50dd690a7a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py @@ -1,13 +1,14 @@ from dagster._core.definitions.declarative_automation.operands.operands import ( + BackfillInProgressAutomationCondition as BackfillInProgressAutomationCondition, CheckResultCondition as CheckResultCondition, CodeVersionChangedCondition as CodeVersionChangedCondition, CronTickPassedCondition as CronTickPassedCondition, ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InitialEvaluationCondition as InitialEvaluationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, - InProgressAutomationCondition as InProgressAutomationCondition, MissingAutomationCondition as MissingAutomationCondition, NewlyRequestedCondition as NewlyRequestedCondition, NewlyUpdatedCondition as NewlyUpdatedCondition, + RunInProgressAutomationCondition as RunInProgressAutomationCondition, WillBeRequestedCondition as WillBeRequestedCondition, ) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py index 4a808d80a5bc6..5dcda1a07ec82 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py @@ -66,15 +66,26 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset: ) +@whitelist_for_serdes(storage_name="InProgressAutomationCondition") +@record +class RunInProgressAutomationCondition(SubsetAutomationCondition): + @property + def name(self) -> str: + return "execution_in_progress" + + def compute_subset(self, context: AutomationContext) -> EntitySubset: + return context.asset_graph_view.compute_run_in_progress_subset(key=context.key) + + @whitelist_for_serdes @record -class InProgressAutomationCondition(SubsetAutomationCondition): +class BackfillInProgressAutomationCondition(SubsetAutomationCondition): @property def name(self) -> str: - return "in_progress" + return "backfill_in_progress" def compute_subset(self, context: AutomationContext) -> EntitySubset: - return context.asset_graph_view.compute_in_progress_subset(key=context.key) + return context.asset_graph_view.compute_backfill_in_progress_subset(key=context.key) @whitelist_for_serdes(storage_name="FailedAutomationCondition") diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index c312a6f7f42c8..f9b5348ce60f1 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -49,6 +49,7 @@ from dagster._utils.cached_method import cached_method if TYPE_CHECKING: + from dagster._core.execution.asset_backfill import AssetBackfillData from dagster._core.storage.event_log import EventLogRecord from dagster._core.storage.event_log.base import AssetRecord from dagster._core.storage.partition_status_cache import AssetStatusCacheValue @@ -537,13 +538,10 @@ def get_current_materializations_for_run(self, *, run_id: str) -> AbstractSet[As #################### @cached_method - def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: - """Returns an AssetGraphSubset representing the set of assets that are currently targeted by - an active asset backfill. - """ + def get_active_backfill_datas(self) -> Sequence["AssetBackfillData"]: from dagster._core.execution.backfill import BulkActionsFilter, BulkActionStatus - asset_backfills = [ + active_backfills = [ backfill for backfill in self.instance.get_backfills( filters=BulkActionsFilter(statuses=[BulkActionStatus.REQUESTED]) @@ -551,19 +549,41 @@ def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: if backfill.is_asset_backfill ] - result = AssetGraphSubset() - for asset_backfill in asset_backfills: + backfill_datas = [] + for backfill in active_backfills: try: - asset_backfill_data = asset_backfill.get_asset_backfill_data(self.asset_graph) + backfill_datas.append(backfill.get_asset_backfill_data(self.asset_graph)) except DagsterDefinitionChangedDeserializationError: self._logger.warning( - f"Not considering assets in backfill {asset_backfill.backfill_id} since its" + f"Not considering assets in backfill {backfill.backfill_id} since its" " data could not be deserialized" ) # Backfill can't be loaded, so no risk of the assets interfering continue + return backfill_datas - result |= asset_backfill_data.target_subset + @cached_method + def get_active_backfill_target_asset_graph_subset(self) -> AssetGraphSubset: + """Returns an AssetGraphSubset representing the set of assets that are currently targeted by + an active asset backfill. + """ + result = AssetGraphSubset() + for data in self.get_active_backfill_datas(): + result |= data.target_subset + + return result + + @cached_method + def get_active_backfill_in_progress_asset_graph_subset(self) -> AssetGraphSubset: + """Returns an AssetGraphSubset representing the set of assets that are currently targeted by + an active asset backfill and have not yet been materialized or failed. + """ + result = AssetGraphSubset() + for data in self.get_active_backfill_datas(): + in_progress_subset = ( + data.target_subset - data.materialized_subset - data.failed_and_downstream_subset + ) + result |= in_progress_subset return result diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py index c3b1b2e00b265..1928c61acb9bb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py @@ -38,11 +38,11 @@ ("dd74c7cfe19d869931ea4aad9ee10127", SC.on_cron("0 * * * *"), two_parents, False), ("861f8e40d4624d49c4ebdd034c8e1e84", SC.on_cron("0 * * * *"), two_parents_daily, False), # same as above - ("b5cb0d7a1c627bd2c9e7c6da3313ab71", SC.eager(), one_parent, False), - ("7802a65024d04bbe44a3e0e541c0a577", SC.eager(), one_parent, True), - ("5d9c70da7ecca9e40f32c1ad99956b5d", SC.eager(), one_parent_daily, False), - ("904bac575906542d28b9e069129dad37", SC.eager(), two_parents, False), - ("3ef1d373a2b38752ad8e23fe9c053d9f", SC.eager(), two_parents_daily, False), + ("dfb268e321e2e7aa7b0e2e71fa674e06", SC.eager(), one_parent, False), + ("781252e1a53db1ecd5938b0da61dba0b", SC.eager(), one_parent, True), + ("293186887409aac2fe99b09bd633c64b", SC.eager(), one_parent_daily, False), + ("c92d9d5181b4d0a6c7ab5d1c6e26962a", SC.eager(), two_parents, False), + ("911bcc4f8904ec6dae85f6aaf78f5ee5", SC.eager(), two_parents_daily, False), # missing condition is invariant to changes other than partitions def changes ("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, False), ("6d7809c4949e3d812d7eddfb1b60d529", SC.missing(), one_parent, True), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py index 0fc3ae4eb0bd0..03d6f8d3c3396 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/test_e2e.py @@ -5,7 +5,6 @@ from typing import AbstractSet, Mapping, Sequence, cast import dagster._check as check -import pytest from dagster import AssetMaterialization, RunsFilter, instance_for_test from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor @@ -419,7 +418,6 @@ def _get_subsets_by_key( return {s.key: s for s in target_subset.iterate_asset_subsets(asset_graph)} -@pytest.mark.skip("Pending change to in_progress() behavior") def test_backfill_creation_simple() -> None: with get_workspace_request_context( ["backfill_simple"] @@ -462,7 +460,6 @@ def test_backfill_creation_simple() -> None: assert len(runs) == 0 -@pytest.mark.skip("Pending change to in_progress() behavior") def test_backfill_with_runs_and_checks() -> None: with get_workspace_request_context( ["backfill_with_runs_and_checks"]