diff --git a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py index f921bea80a7f8..4c982f3fe4b3c 100644 --- a/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py +++ b/python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py @@ -30,8 +30,10 @@ from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.partition import PartitionsDefinition from dagster._core.instance import DagsterInstance + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionResolvedStatus from dagster._utils.caching_instance_queryer import CachingInstanceQueryer + U_EntityKey = TypeVar("U_EntityKey", AssetKey, AssetCheckKey, EntityKey) @@ -343,21 +345,38 @@ def compute_latest_time_window_subset( else: check.failed(f"Unsupported partitions_def: {partitions_def}") - def compute_missing_subset( - self, asset_key: "AssetKey", from_subset: EntitySubset[AssetKey] - ) -> EntitySubset[AssetKey]: + def compute_subset_with_status( + self, key: AssetCheckKey, status: Optional["AssetCheckExecutionResolvedStatus"] + ): + """Returns the subset of an asset check that matches a given status.""" + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord + + latest_record = AssetCheckExecutionRecord.blocking_get(self, key) + resolved_status = ( + latest_record.resolve_status(self) + if latest_record and latest_record.targets_latest_materialization(self) + else None + ) + if resolved_status == status: + return self.get_full_subset(key=key) + else: + return self.get_empty_subset(key=key) + + def compute_missing_subset(self, key: EntityKey, from_subset: EntitySubset) -> EntitySubset: """Returns a subset which is the subset of the input subset that has never been materialized (if it is a materializable asset) or observered (if it is an observable asset). 
""" + if isinstance(key, AssetCheckKey): + return self.compute_subset_with_status(key, status=None) # TODO: this logic should be simplified once we have a unified way of detecting both # materializations and observations through the parittion status cache. at that point, the # definition will slightly change to search for materializations and observations regardless # of the materializability of the asset - if self.asset_graph.get(asset_key).is_materializable: + elif self.asset_graph.get(key).is_materializable: # cheap call which takes advantage of the partition status cache - materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=asset_key) + materialized_subset = self._queryer.get_materialized_asset_subset(asset_key=key) materialized_subset = EntitySubset( - self, key=asset_key, value=_ValidatedEntitySubsetValue(materialized_subset.value) + self, key=key, value=_ValidatedEntitySubsetValue(materialized_subset.value) ) return from_subset.compute_difference(materialized_subset) else: @@ -368,19 +387,36 @@ def compute_missing_subset( if not self._queryer.asset_partition_has_materialization_or_observation(ap) } return self.get_asset_subset_from_asset_partitions( - key=asset_key, asset_partitions=missing_asset_partitions + key=key, asset_partitions=missing_asset_partitions ) @cached_method - def compute_in_progress_asset_subset(self, *, asset_key: AssetKey) -> EntitySubset[AssetKey]: - # part of in progress run - value = self._queryer.get_in_progress_asset_subset(asset_key=asset_key).value - return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value)) + def compute_in_progress_subset(self, *, key: EntityKey) -> EntitySubset: + from dagster._core.storage.asset_check_execution_record import ( + AssetCheckExecutionResolvedStatus, + ) + + if isinstance(key, AssetCheckKey): + return self.compute_subset_with_status( + key, AssetCheckExecutionResolvedStatus.IN_PROGRESS + ) + else: + value = 
self._queryer.get_in_progress_asset_subset(asset_key=key).value + return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @cached_method - def compute_failed_asset_subset(self, *, asset_key: "AssetKey") -> EntitySubset[AssetKey]: - value = self._queryer.get_failed_asset_subset(asset_key=asset_key).value - return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value)) + def compute_execution_failed_subset(self, *, key: EntityKey) -> EntitySubset: + from dagster._core.storage.asset_check_execution_record import ( + AssetCheckExecutionResolvedStatus, + ) + + if isinstance(key, AssetCheckKey): + return self.compute_subset_with_status( + key, AssetCheckExecutionResolvedStatus.EXECUTION_FAILED + ) + else: + value = self._queryer.get_failed_asset_subset(asset_key=key).value + return EntitySubset(self, key=key, value=_ValidatedEntitySubsetValue(value)) @cached_method def compute_updated_since_cursor_subset( @@ -391,6 +427,25 @@ def compute_updated_since_cursor_subset( ).value return EntitySubset(self, key=asset_key, value=_ValidatedEntitySubsetValue(value)) + @cached_method + def compute_updated_since_time_subset( + self, *, key: AssetCheckKey, time: datetime + ) -> EntitySubset[AssetCheckKey]: + from dagster._core.events import DagsterEventType + from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord + + # intentionally left unimplemented for AssetKey, as this is a less performant query + record = AssetCheckExecutionRecord.blocking_get(self, key) + if ( + record is None + or record.event is None + or record.event.dagster_event_type != DagsterEventType.ASSET_CHECK_EVALUATION + or record.event.timestamp < time.timestamp() + ): + return self.get_empty_subset(key=key) + else: + return self.get_full_subset(key=key) + class MultiDimInfo(NamedTuple): tw_dim: PartitionDimensionDefinition secondary_dim: PartitionDimensionDefinition diff --git 
a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py index e42d23c711f37..39ca4d1528a60 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/__init__.py @@ -1,7 +1,7 @@ from dagster._core.definitions.declarative_automation.operands import ( CodeVersionChangedCondition as CodeVersionChangedCondition, CronTickPassedCondition as CronTickPassedCondition, - FailedAutomationCondition as FailedAutomationCondition, + ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, InProgressAutomationCondition as InProgressAutomationCondition, MissingAutomationCondition as MissingAutomationCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index ab9a6944bb735..2aad22f3aa142 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -40,7 +40,7 @@ from dagster._core.definitions.declarative_automation.operands import ( CodeVersionChangedCondition, CronTickPassedCondition, - FailedAutomationCondition, + ExecutionFailedAutomationCondition, InitialEvaluationCondition, InLatestTimeWindowCondition, InProgressAutomationCondition, @@ -231,16 +231,15 @@ def since( def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]": """Returns an AutomationCondition that is true only on the tick that this condition goes - from false to true for a given asset partition. + from false to true for a given target.
""" from dagster._core.definitions.declarative_automation.operators import NewlyTrueCondition return NewlyTrueCondition(operand=self) - def since_last_handled(self: "AutomationCondition[AssetKey]") -> "SinceCondition[AssetKey]": + def since_last_handled(self: "AutomationCondition") -> "SinceCondition": """Returns an AutomationCondition that is true if this condition has become true since the - asset partition was last requested or updated, and since the last time this entity's - condition was modified. + target was last requested or updated, and since the last time this target's condition was modified. """ with disable_dagster_warnings(): return self.since( @@ -257,7 +256,7 @@ def since_last_handled(self: "AutomationCondition[AssetKey]") -> "SinceCondition def asset_matches( key: "CoercibleToAssetKey", condition: "AutomationCondition[AssetKey]" ) -> "EntityMatchesCondition": - """Returns an AutomationCondition that is true if this condition is true for the given entity key.""" + """Returns an AutomationCondition that is true if this condition is true for the given key.""" from dagster._core.definitions.declarative_automation.operators import ( EntityMatchesCondition, ) @@ -269,12 +268,12 @@ def asset_matches( @experimental @staticmethod def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": - """Returns an AutomationCondition that is true for an asset partition if at least one partition - of any of its dependencies evaluate to True for the given condition. + """Returns an AutomationCondition that is true for a target if at least one partition + of any of the target's dependencies evaluates to True for the given condition. Args: condition (AutomationCondition): The AutomationCondition that will be evaluated against - this asset's dependencies. + this target's dependencies.
""" from dagster._core.definitions.declarative_automation.operators import AnyDepsCondition @@ -284,12 +283,12 @@ def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition": @experimental @staticmethod def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": - """Returns an AutomationCondition that is true for an asset partition if at least one partition - of all of its dependencies evaluate to True for the given condition. + """Returns an AutomationCondition that is true for a target if at least one partition + of all of the target's dependencies evaluates to True for the given condition. Args: condition (AutomationCondition): The AutomationCondition that will be evaluated against - this asset's dependencies. + this target's dependencies. """ from dagster._core.definitions.declarative_automation.operators import AllDepsCondition @@ -301,7 +300,7 @@ def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition": def any_checks_match( condition: "AutomationCondition[AssetCheckKey]", blocking_only: bool = False ) -> "AnyChecksCondition": - """Returns an AutomationCondition that is true for an asset partition if at least one of its + """Returns an AutomationCondition that is true if at least one of the target's checks evaluate to True for the given condition. Args: @@ -337,9 +336,7 @@ def all_checks_match( @experimental @staticmethod def missing() -> "MissingAutomationCondition": - """Returns an AutomationCondition that is true for an asset partition if it has never been - materialized or observed.
- """ + """Returns an AutomationCondition that is true if the target has not been executed.""" from dagster._core.definitions.declarative_automation.operands import ( MissingAutomationCondition, ) @@ -350,9 +347,7 @@ def missing() -> "MissingAutomationCondition": @experimental @staticmethod def in_progress() -> "InProgressAutomationCondition": - """Returns an AutomationCondition that is true for an asset partition if it is part of an - in-progress run. - """ + """Returns an AutomationCondition that is true if the target is part of an in-progress run.""" from dagster._core.definitions.declarative_automation.operands import ( InProgressAutomationCondition, ) @@ -362,13 +357,35 @@ def in_progress() -> "InProgressAutomationCondition": @public @experimental @staticmethod - def failed() -> "FailedAutomationCondition": - """Returns an AutomationCondition that is true for an asset partition if its latest run failed.""" + def execution_failed() -> "ExecutionFailedAutomationCondition": + """Returns an AutomationCondition that is true if the latest execution of the target failed.""" from dagster._core.definitions.declarative_automation.operands import ( - FailedAutomationCondition, + ExecutionFailedAutomationCondition, ) - return FailedAutomationCondition() + return ExecutionFailedAutomationCondition() + + @public + @experimental + @staticmethod + def check_passed() -> "AutomationCondition[AssetCheckKey]": + """Returns an AutomationCondition that is true for an asset check if it has evaluated against + the latest materialization of an asset and passed. + """ + from dagster._core.definitions.declarative_automation.operands import CheckResultCondition + + return CheckResultCondition(passed=True) + + @public + @experimental + @staticmethod + def check_failed() -> "AutomationCondition[AssetCheckKey]": + """Returns an AutomationCondition that is true for an asset check if it has evaluated against + the latest materialization of an asset and failed. 
+ """ + from dagster._core.definitions.declarative_automation.operands import CheckResultCondition + + return CheckResultCondition(passed=False) @public @experimental @@ -387,7 +404,7 @@ def initial_evaluation() -> "InitialEvaluationCondition": def in_latest_time_window( lookback_delta: Optional[datetime.timedelta] = None, ) -> "InLatestTimeWindowCondition": - """Returns an AutomationCondition that is true for an asset partition when it is within the latest + """Returns an AutomationCondition that is true when the target is within the latest time window. Args: @@ -406,7 +423,7 @@ def in_latest_time_window( @experimental @staticmethod def will_be_requested() -> "WillBeRequestedCondition": - """Returns an AutomationCondition that is true for an asset partition if it will be requested this tick.""" + """Returns an AutomationCondition that is true if the target will be requested this tick.""" from dagster._core.definitions.declarative_automation.operands import ( WillBeRequestedCondition, ) @@ -417,7 +434,7 @@ def will_be_requested() -> "WillBeRequestedCondition": @experimental @staticmethod def newly_updated() -> "NewlyUpdatedCondition": - """Returns an AutomationCondition that is true for an asset partition if it has been updated since the previous tick.""" + """Returns an AutomationCondition that is true if the target has been updated since the previous tick.""" from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition return NewlyUpdatedCondition() @@ -426,7 +443,7 @@ def newly_updated() -> "NewlyUpdatedCondition": @experimental @staticmethod def newly_requested() -> "NewlyRequestedCondition": - """Returns an AutomationCondition that is true for an asset partition if it was requested on the previous tick.""" + """Returns an AutomationCondition that is true if the target was requested on the previous tick.""" from dagster._core.definitions.declarative_automation.operands import ( NewlyRequestedCondition, ) @@ -437,8 +454,8 @@ def
newly_requested() -> "NewlyRequestedCondition": @experimental @staticmethod def code_version_changed() -> "CodeVersionChangedCondition": - """Returns an AutomationCondition that is true for an asset partition if its asset's code - version has been changed since the previous tick. + """Returns an AutomationCondition that is true if the target's code version has been changed + since the previous tick. """ from dagster._core.definitions.declarative_automation.operands import ( CodeVersionChangedCondition, @@ -452,7 +469,7 @@ def code_version_changed() -> "CodeVersionChangedCondition": def cron_tick_passed( cron_schedule: str, cron_timezone: str = "UTC" ) -> "CronTickPassedCondition": - """Returns an AutomationCondition that is true for all asset partitions whenever a cron tick of the provided schedule is passed.""" + """Returns an AutomationCondition that is true whenever a cron tick of the provided schedule is passed.""" from dagster._core.definitions.declarative_automation.operands import ( CronTickPassedCondition, ) @@ -464,15 +481,15 @@ def cron_tick_passed( @experimental @staticmethod def newly_missing() -> "AutomationCondition": - """Returns an AutomationCondition that is true on the tick that an asset partition becomes missing.""" + """Returns an AutomationCondition that is true on the tick that the target becomes missing.""" with disable_dagster_warnings(): return AutomationCondition.missing().newly_true().with_label("newly_missing") @experimental @staticmethod def any_deps_updated() -> "AnyDepsCondition": - """Returns an AutomationCondition that is true for any asset partition with at least one - dependency that has updated since the previous tick, or will be requested on this tick. + """Returns an AutomationCondition that is true if the target has at least one dependency + that has updated since the previous tick, or will be requested on this tick.
""" with disable_dagster_warnings(): return AutomationCondition.any_deps_match( @@ -482,8 +499,8 @@ def any_deps_updated() -> "AnyDepsCondition": @experimental @staticmethod def any_deps_missing() -> "AnyDepsCondition": - """Returns an AutomationCondition that is true for any asset partition with at least one - dependency that is missing, and will not be requested on this tick. + """Returns an AutomationCondition that is true if the target has at least one dependency + that is missing, and will not be requested on this tick. """ with disable_dagster_warnings(): return AutomationCondition.any_deps_match( @@ -493,8 +510,8 @@ def any_deps_missing() -> "AnyDepsCondition": @experimental @staticmethod def any_deps_in_progress() -> "AnyDepsCondition": - """Returns an AutomationCondition that is true for any asset partition with at least one - dependency that is in progress. + """Returns an AutomationCondition that is true if the target has at least one dependency + that is in progress. """ with disable_dagster_warnings(): return AutomationCondition.any_deps_match(AutomationCondition.in_progress()).with_label( @@ -506,8 +523,8 @@ def any_deps_in_progress() -> "AnyDepsCondition": def all_deps_updated_since_cron( cron_schedule: str, cron_timezone: str = "UTC" ) -> "AllDepsCondition": - """Returns an AutomatonCondition that is true for any asset partition where all of its - dependencies have updated since the latest tick of the provided cron schedule. + """Returns an AutomationCondition that is true if all of the target's dependencies have + updated since the latest tick of the provided cron schedule.
""" with disable_dagster_warnings(): return AutomationCondition.all_deps_match( @@ -520,16 +537,15 @@ def all_deps_updated_since_cron( @public @experimental @staticmethod - def eager() -> "AndAutomationCondition[AssetKey]": - """Returns an AutomationCondition which will cause missing asset partitions to be - materialized, and will materialize asset partitions whenever their parents are updated. + def eager() -> "AndAutomationCondition": + """Returns an AutomationCondition which will cause a target to be executed if any of + its dependencies update, and will execute missing partitions if they become missing + after this condition is applied to the target. - Will only materialize missing partitions if they become missing after this condition is - added to an asset. For time partitioned assets, only the latest time partition will be considered. + This will not execute targets that have any missing or in progress dependencies, or + are currently in progress. - This will never evaluate to true if the asset has any upstream partitions which are missing - or part of an in progress run, and will never evaluate to true if the provided asset partition - is already part of an in progress run. + For time partitioned assets, only the latest time partition will be considered. 
""" with disable_dagster_warnings(): return ( @@ -545,11 +561,9 @@ def eager() -> "AndAutomationCondition[AssetKey]": @public @experimental @staticmethod - def on_cron( - cron_schedule: str, cron_timezone: str = "UTC" - ) -> "AndAutomationCondition[AssetKey]": - """Returns an AutomationCondition which will cause asset partitions to be materialized - on a given cron schedule, after all of their dependencies have been updated since the latest + def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AndAutomationCondition": + """Returns an AutomationCondition which will cause a target to be executed on a given + cron schedule, after all of its dependencies have been updated since the latest tick of that cron schedule. For time partitioned assets, only the latest time partition will be considered. @@ -566,12 +580,13 @@ def on_cron( @public @experimental @staticmethod - def on_missing() -> "AndAutomationCondition[AssetKey]": - """Returns an AutomationCondition which will cause missing asset partitions to be materialized as soon as possible, - after all of their dependencies have been materialized. + def on_missing() -> "AndAutomationCondition": + """Returns an AutomationCondition which will execute partitions of the target that + are added after this condition is applied to the asset. - Will only materialize missing partitions if they become missing after this condition is - added to an asset. For time partitioned assets, only the latest time partition will be considered. + This will not execute targets that have any missing dependencies. + + For time partitioned assets, only the latest time partition will be considered. 
""" with disable_dagster_warnings(): return ( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py index a7c3ba805607c..cd80079f169e6 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py @@ -46,7 +46,7 @@ def _requested_partitions_by_asset_key(self) -> Mapping[AssetKey, AbstractSet[Op @property def total_requested(self) -> int: """Returns the total number of asset partitions requested during this evaluation.""" - return len(self._requested_asset_partitions) + return sum(r.true_subset.size for r in self.results) def get_requested_partitions(self, asset_key: AssetKey) -> AbstractSet[Optional[str]]: """Returns the specific partition keys requested for the given asset during this evaluation.""" @@ -113,7 +113,9 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs = Definitions(assets=defs) if asset_selection is None: - asset_selection = AssetSelection.all(include_sources=True) + asset_selection = ( + AssetSelection.all(include_sources=True) | AssetSelection.all_asset_checks() + ) asset_graph = defs.get_asset_graph() evaluator = AutomationConditionEvaluator( @@ -122,6 +124,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): entity_keys={ key for key in asset_selection.resolve(asset_graph) + | asset_selection.resolve_checks(asset_graph) if asset_graph.get(key).automation_condition is not None }, evaluation_time=evaluation_time, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py index 7ccdcc6f73380..f4801d5911656 100644 --- 
a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py @@ -2,8 +2,9 @@ CodeVersionChangedCondition as CodeVersionChangedCondition, ) from dagster._core.definitions.declarative_automation.operands.slice_conditions import ( + CheckResultCondition as CheckResultCondition, CronTickPassedCondition as CronTickPassedCondition, - FailedAutomationCondition as FailedAutomationCondition, + ExecutionFailedAutomationCondition as ExecutionFailedAutomationCondition, InitialEvaluationCondition as InitialEvaluationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, InProgressAutomationCondition as InProgressAutomationCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py index ab7c9cdde61d4..50b210f61a83a 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py @@ -3,7 +3,7 @@ from typing import Optional from dagster._core.asset_graph_view.entity_subset import EntitySubset -from dagster._core.definitions.asset_key import AssetKey, T_EntityKey +from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, T_EntityKey from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationResult, BuiltinAutomationCondition, @@ -62,7 +62,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ @whitelist_for_serdes @record -class MissingAutomationCondition(SubsetAutomationCondition[AssetKey]): +class MissingAutomationCondition(SubsetAutomationCondition): @property def description(self) -> str: return "Missing" @@ -71,7 +71,7 @@ def 
description(self) -> str: def name(self) -> str: return "missing" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: + def compute_subset(self, context: AutomationContext) -> EntitySubset: return context.asset_graph_view.compute_missing_subset( context.key, from_subset=context.candidate_subset ) @@ -79,7 +79,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes @record -class InProgressAutomationCondition(SubsetAutomationCondition[AssetKey]): +class InProgressAutomationCondition(SubsetAutomationCondition): @property def description(self) -> str: return "Part of an in-progress run" @@ -88,28 +88,28 @@ def description(self) -> str: def name(self) -> str: return "in_progress" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: - return context.asset_graph_view.compute_in_progress_asset_subset(asset_key=context.key) + def compute_subset(self, context: AutomationContext) -> EntitySubset: + return context.asset_graph_view.compute_in_progress_subset(key=context.key) -@whitelist_for_serdes +@whitelist_for_serdes(storage_name="FailedAutomationCondition") @record -class FailedAutomationCondition(SubsetAutomationCondition[AssetKey]): +class ExecutionFailedAutomationCondition(SubsetAutomationCondition): @property def description(self) -> str: - return "Latest run failed" + return "Latest execution failed" @property def name(self) -> str: - return "failed" + return "execution_failed" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: - return context.asset_graph_view.compute_failed_asset_subset(asset_key=context.key) + def compute_subset(self, context: AutomationContext) -> EntitySubset: + return context.asset_graph_view.compute_execution_failed_subset(key=context.key) @whitelist_for_serdes @record -class WillBeRequestedCondition(SubsetAutomationCondition[AssetKey]): +class WillBeRequestedCondition(SubsetAutomationCondition): @property 
def description(self) -> str: return "Will be requested this tick" @@ -129,7 +129,7 @@ def _executable_with_root_context_key(self, context: AutomationContext) -> bool: parent_key=context.key, ) - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: + def compute_subset(self, context: AutomationContext) -> EntitySubset: current_result = context.current_results_by_key.get(context.key) if ( current_result @@ -143,7 +143,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: @whitelist_for_serdes @record -class NewlyRequestedCondition(SubsetAutomationCondition[AssetKey]): +class NewlyRequestedCondition(SubsetAutomationCondition): @property def description(self) -> str: return "Was requested on the previous tick" @@ -152,13 +152,13 @@ def description(self) -> str: def name(self) -> str: return "newly_requested" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: + def compute_subset(self, context: AutomationContext) -> EntitySubset: return context.previous_requested_subset or context.get_empty_subset() @whitelist_for_serdes @record -class NewlyUpdatedCondition(SubsetAutomationCondition[AssetKey]): +class NewlyUpdatedCondition(SubsetAutomationCondition): @property def description(self) -> str: return "Updated since previous tick" @@ -167,14 +167,19 @@ def description(self) -> str: def name(self) -> str: return "newly_updated" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: + def compute_subset(self, context: AutomationContext) -> EntitySubset: # if it's the first time evaluating, just return the empty subset if context.previous_evaluation_time is None: return context.get_empty_subset() - else: + elif isinstance(context.key, AssetKey): return context.asset_graph_view.compute_updated_since_cursor_subset( asset_key=context.key, cursor=context.previous_max_storage_id ) + else: + return context.asset_graph_view.compute_updated_since_time_subset( + 
key=context.key, + time=context.previous_evaluation_time, + ) @whitelist_for_serdes @@ -214,7 +219,7 @@ def compute_subset(self, context: AutomationContext) -> EntitySubset: @whitelist_for_serdes @record -class InLatestTimeWindowCondition(SubsetAutomationCondition[AssetKey]): +class InLatestTimeWindowCondition(SubsetAutomationCondition): serializable_lookback_timedelta: Optional[SerializableTimeDelta] = None @staticmethod @@ -247,7 +252,33 @@ def description(self) -> str: def name(self) -> str: return "in_latest_time_window" - def compute_subset(self, context: AutomationContext) -> EntitySubset[AssetKey]: + def compute_subset(self, context: AutomationContext) -> EntitySubset: return context.asset_graph_view.compute_latest_time_window_subset( context.key, lookback_delta=self.lookback_timedelta ) + + +@whitelist_for_serdes +@record +class CheckResultCondition(SubsetAutomationCondition[AssetCheckKey]): + passed: bool + + @property + def name(self) -> str: + return "check_passed" if self.passed else "check_failed" + + def compute_subset( + self, context: AutomationContext[AssetCheckKey] + ) -> EntitySubset[AssetCheckKey]: + from dagster._core.storage.asset_check_execution_record import ( + AssetCheckExecutionResolvedStatus, + ) + + target_status = ( + AssetCheckExecutionResolvedStatus.SUCCEEDED + if self.passed + else AssetCheckExecutionResolvedStatus.FAILED + ) + return context.asset_graph_view.compute_subset_with_status( + key=context.key, status=target_status + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_passed_failed_conditions.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_passed_failed_conditions.py new file mode 100644 index 0000000000000..823ba93fa5374 --- /dev/null +++ 
b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_passed_failed_conditions.py @@ -0,0 +1,56 @@ +import pytest +from dagster import ( + AssetCheckResult, + AssetMaterialization, + AutomationCondition, + DagsterInstance, + Definitions, + asset, + asset_check, + evaluate_automation_conditions, +) + + +@pytest.mark.parametrize("passed", [True, False]) +def test_check_result_conditions(passed: bool) -> None: + condition = AutomationCondition.check_passed() if passed else AutomationCondition.check_failed() + + @asset + def A() -> None: ... + + @asset_check(asset=A, automation_condition=condition) + def foo_check() -> AssetCheckResult: + return AssetCheckResult(passed=passed) + + defs = Definitions(assets=[A], asset_checks=[foo_check]) + instance = DagsterInstance.ephemeral() + check_job = defs.get_implicit_global_asset_job_def().get_subset( + asset_check_selection={foo_check.check_key} + ) + + # hasn't been executed + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # now gets executed, so the status matches + check_job.execute_in_process(instance=instance) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # stays true + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # now the parent asset gets materialized, which means that the status goes to "missing" + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 + result = evaluate_automation_conditions(defs=defs, instance=instance, 
cursor=result.cursor) + assert result.total_requested == 0 + + # now gets executed again, so the status matches + check_job.execute_in_process(instance=instance) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 6f65a9baf17ca..06420573bea6c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -1,9 +1,12 @@ from dagster import ( AssetKey, + AssetMaterialization, AutomationCondition, + DagsterInstance, Definitions, StaticPartitionsDefinition, asset, + asset_check, evaluate_automation_conditions, ) from dagster._core.instance_for_test import instance_for_test @@ -149,3 +152,37 @@ def B() -> None: ... defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor ) assert result.total_requested == 0 + + +def test_eager_on_asset_check() -> None: + @asset + def A() -> None: ... + + @asset_check(asset=A, automation_condition=AutomationCondition.eager()) + def foo_check() -> ...: ... 
+ + defs = Definitions(assets=[A], asset_checks=[foo_check]) + + instance = DagsterInstance.ephemeral() + + # parent hasn't been updated yet + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # now A is updated, so request + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # don't keep requesting + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 + + # A updated again, re-request + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # don't keep requesting + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py index aee586911594f..9f20b4eb9b2b5 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py @@ -16,7 +16,7 @@ def test_failed_unpartitioned() -> None: state = AutomationConditionScenarioState( - one_asset, automation_condition=AutomationCondition.failed() + one_asset, automation_condition=AutomationCondition.execution_failed() ) # no failed partitions @@ -36,7 +36,7 @@ def test_failed_unpartitioned() -> None: 
def test_in_progress_static_partitioned() -> None: state = AutomationConditionScenarioState( - one_asset, automation_condition=AutomationCondition.failed() + one_asset, automation_condition=AutomationCondition.execution_failed() ).with_asset_properties(partitions_def=two_partitions_def) # no failed_runs diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py index fa2eebba3d15e..712b30c7c22c3 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py @@ -1,4 +1,13 @@ -from dagster import AutomationCondition +from dagster import ( + AssetCheckResult, + AssetMaterialization, + AutomationCondition, + DagsterInstance, + Definitions, + asset, + asset_check, + evaluate_automation_conditions, +) from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( AutomationConditionScenarioState, @@ -84,3 +93,41 @@ def test_newly_updated_condition_data_version() -> None: # no new data version state, result = state.evaluate("B") assert result.true_subset.size == 0 + + +def test_newly_updated_on_asset_check() -> None: + @asset + def A() -> None: ... 
+ + @asset_check(asset=A, automation_condition=AutomationCondition.newly_updated()) + def foo_check() -> AssetCheckResult: + return AssetCheckResult(passed=True) + + defs = Definitions(assets=[A], asset_checks=[foo_check]) + instance = DagsterInstance.ephemeral() + check_job = defs.get_implicit_global_asset_job_def().get_subset( + asset_check_selection={foo_check.check_key} + ) + + # hasn't newly updated + result = evaluate_automation_conditions(defs=defs, instance=instance) + assert result.total_requested == 0 + + # now updates + check_job.execute_in_process(instance=instance) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # no longer "newly updated" + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 + + # now updates again + check_job.execute_in_process(instance=instance) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 1 + + # parent updated, doesn't matter + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions(defs=defs, instance=instance, cursor=result.cursor) + assert result.total_requested == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py index 3a2428c0af91a..34d5c1c073116 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py @@ -1,4 +1,14 @@ -from 
dagster import AutomationCondition +import datetime + +from dagster import ( + AssetMaterialization, + AutomationCondition, + DagsterInstance, + Definitions, + asset, + asset_check, + evaluate_automation_conditions, +) from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( AutomationConditionScenarioState, @@ -113,3 +123,66 @@ def test_on_cron_hourly_partitioned() -> None: state = state.with_runs(run_request("A", "2020-02-02-01:00")) state, result = state.evaluate("B") assert result.true_subset.size == 1 + + +def test_on_cron_on_asset_check() -> None: + @asset + def A() -> None: ... + + @asset_check(asset=A, automation_condition=AutomationCondition.on_cron("@hourly")) + def foo_check() -> ...: ... + + current_time = datetime.datetime(2024, 8, 16, 4, 35) + defs = Definitions(assets=[A], asset_checks=[foo_check]) + instance = DagsterInstance.ephemeral() + + # hasn't passed a cron tick + result = evaluate_automation_conditions( + defs=defs, instance=instance, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # now passed a cron tick, but parent hasn't been updated + current_time += datetime.timedelta(minutes=30) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # now parent is updated, so fire + current_time += datetime.timedelta(minutes=1) + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 1 + + # don't keep firing + current_time += datetime.timedelta(minutes=1) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # ...even if the parent is updated again + current_time 
+= datetime.timedelta(minutes=1) + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # new tick passes... + current_time += datetime.timedelta(hours=1) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 0 + + # and parent is updated + current_time += datetime.timedelta(minutes=1) + instance.report_runless_asset_event(AssetMaterialization("A")) + result = evaluate_automation_conditions( + defs=defs, instance=instance, cursor=result.cursor, evaluation_time=current_time + ) + assert result.total_requested == 1