From 888a66f94b1c787b6229ecfe5de9259eb73882a8 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Mon, 7 Oct 2024 16:45:43 -0700 Subject: [PATCH] Update first tick behavior of AutomationCondition.eager() (#24700) ## Summary & Motivation Updates the behavior of the `newly_true()` condition to optionally change its "baseline" assumption of what was true before the first evaluation. This allows us to better fulfill the promise of "only reacting to changes which happened after the condition was added to the asset", as we do for the "deps updated" part of the condition. In short, this provides a strong bit of protection against the "surprise backfill" issue, without having to add any new rate-limiting concepts to the system. ## How I Tested These Changes ## Changelog AutomationCondition.eager() will now only launch runs for missing partitions which become missing *after* the condition has been added to the asset. This avoids situations in which the eager policy kicks off a large amount of work when added to an asset with many missing historical static/dynamic partitions. - [x] `NEW` _(added new feature or capability)_ - [ ] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_ --- .../test_asset_condition_evaluations.py | 2 +- .../automation_condition.py | 58 ++++--- .../automation_condition_tester.py | 6 +- .../automation_context.py | 4 +- .../legacy/legacy_context.py | 8 +- .../legacy/rule_condition.py | 2 +- .../operands/__init__.py | 1 + .../operands/slice_conditions.py | 22 +++ .../dagster/_core/definitions/partition.py | 4 +- .../definitions/time_window_partitions.py | 16 ++ .../builtins/test_eager_condition.py | 43 +++++- .../test_initial_evaluation_condition.py | 146 ++++++++++++++++++ .../builtins/test_newly_true_condition.py | 3 +- .../builtins/test_on_missing_condition.py | 20 ++- .../test_automation_condition_tester.py | 22 ++- .../fundamentals/test_result_value_hash.py | 22 +-- .../definitions/default_condition.py | 2 +- 17 files changed, 325 insertions(+), 56 deletions(-) create mode 100644 python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_initial_evaluation_condition.py diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py index 7054d2e44328b..0b22b30c30d14 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_condition_evaluations.py @@ -676,7 +676,7 @@ def A() -> None: ... assert record["numRequested"] == 0 # all nodes in the tree - assert len(record["evaluationNodes"]) == 27 + assert len(record["evaluationNodes"]) == 28 rootNode = record["evaluationNodes"][0] assert rootNode["uniqueId"] == record["rootUniqueId"] diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py index 88d0e5fd257fa..ab9a6944bb735 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition.py @@ -41,6 +41,7 @@ CodeVersionChangedCondition, CronTickPassedCondition, FailedAutomationCondition, + InitialEvaluationCondition, InLatestTimeWindowCondition, InProgressAutomationCondition, MissingAutomationCondition, @@ -134,7 +135,7 @@ def get_snapshot( self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None ) -> AutomationConditionSnapshot: """Returns a serializable snapshot of the entire AutomationCondition tree.""" - unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index) + unique_id = self.get_node_unique_id(parent_unique_id=parent_unique_id, index=index) node_snapshot = self.get_node_snapshot(unique_id) children = [ child.get_snapshot(parent_unique_id=unique_id, index=i) @@ -142,23 +143,26 @@ def get_snapshot( ] return AutomationConditionSnapshot(node_snapshot=node_snapshot, children=children) - def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str: + def get_node_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str: """Returns a unique identifier for this condition within the broader condition tree.""" parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description] return non_secure_md5_hash_str("".join(parts).encode()) - def get_hash( - self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None - ) -> int: - """Generates a hash based off of the unique ids of all children.""" - unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index) - hashes = [hash(unique_id)] - for i, child in enumerate(self.children): - hashes.append(child.get_hash(parent_unique_id=unique_id, index=i)) - return hash(tuple(hashes)) + def get_unique_id( + self, *, parent_node_unique_id: Optional[str] = None, index: Optional[int] = None + ) -> str: + """Returns a unique identifier for the entire subtree.""" + node_unique_id = self.get_node_unique_id( + parent_unique_id=parent_node_unique_id, index=index + ) + child_unique_ids = [ + child.get_unique_id(parent_node_unique_id=node_unique_id, index=i) + for i, child in enumerate(self.children) + ] + return non_secure_md5_hash_str("".join([node_unique_id, *child_unique_ids]).encode()) def __hash__(self) -> int: - return self.get_hash() + return hash(self.get_unique_id()) @property def has_rule_condition(self) -> bool: @@ -235,13 +239,15 @@ def newly_true(self) -> "NewlyTrueCondition[T_EntityKey]": def since_last_handled(self: "AutomationCondition[AssetKey]") -> "SinceCondition[AssetKey]": """Returns an AutomationCondition that is true if this condition has become true since the - last time this asset partition was requested or updated. + asset partition was last requested or updated, and since the last time this entity's + condition was modified. """ with disable_dagster_warnings(): return self.since( ( - AutomationCondition[AssetKey].newly_requested() - | AutomationCondition[AssetKey].newly_updated() + AutomationCondition.newly_requested() + | AutomationCondition.newly_updated() + | AutomationCondition.initial_evaluation() ).with_label("handled") ) @@ -364,6 +370,17 @@ def failed() -> "FailedAutomationCondition": return FailedAutomationCondition() + @public + @experimental + @staticmethod + def initial_evaluation() -> "InitialEvaluationCondition": + """Returns an AutomationCondition that is true on the first evaluation of the expression.""" + from dagster._core.definitions.declarative_automation.operands import ( + InitialEvaluationCondition, + ) + + return InitialEvaluationCondition() + @public @experimental @staticmethod @@ -507,7 +524,8 @@ def eager() -> "AndAutomationCondition[AssetKey]": """Returns an AutomationCondition which will cause missing asset partitions to be materialized, and will materialize asset partitions whenever their parents are updated. - For time partitioned assets, only the latest time partition will be considered. + Will only materialize missing partitions if they become missing after this condition is + added to an asset. For time partitioned assets, only the latest time partition will be considered. This will never evaluate to true if the asset has any upstream partitions which are missing or part of an in progress run, and will never evaluate to true if the provided asset partition @@ -552,7 +570,8 @@ def on_missing() -> "AndAutomationCondition[AssetKey]": """Returns an AutomationCondition which will cause missing asset partitions to be materialized as soon as possible, after all of their dependencies have been materialized. - For time partitioned assets, only the latest time partition will be considered. + Will only materialize missing partitions if they become missing after this condition is + added to an asset. For time partitioned assets, only the latest time partition will be considered. """ with disable_dagster_warnings(): return ( @@ -561,7 +580,7 @@ def on_missing() -> "AndAutomationCondition[AssetKey]": AutomationCondition.missing() .newly_true() .since_last_handled() - .with_label("missing_since_last_requested") + .with_label("missing_since_last_handled") ) & ~AutomationCondition.any_deps_missing() ).with_label("on_missing") @@ -592,9 +611,6 @@ def with_label(self, label: Optional[str]) -> Self: """Returns a copy of this AutomationCondition with a human-readable label.""" return copy(self, label=label) - def __hash__(self) -> int: - return self.get_hash() - class AutomationResult(Generic[T_EntityKey]): """The result of evaluating an AutomationCondition.""" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py index fb63b409cbc5c..a7c3ba805607c 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_tester.py @@ -15,6 +15,7 @@ ) from dagster._core.definitions.definitions_class import Definitions from dagster._core.instance import DagsterInstance +from dagster._serdes.serdes import deserialize_value, serialize_value class EvaluateAutomationConditionsResult: @@ -126,7 +127,10 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): evaluation_time=evaluation_time, allow_backfills=False, logger=logging.getLogger("dagster.automation_condition_tester"), - cursor=cursor or AssetDaemonCursor.empty(), + # round-trip the provided cursor to simulate actual usage + cursor=deserialize_value(serialize_value(cursor), AssetDaemonCursor) + if cursor + else AssetDaemonCursor.empty(), ) results, requested_subsets = evaluator.evaluate() cursor = AssetDaemonCursor( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py index 3d4d73c6aeb94..67f3e0b87ec28 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py @@ -68,7 +68,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa condition = check.not_none( evaluator.asset_graph.get(key).automation_condition or evaluator.default_condition ) - condition_unqiue_id = condition.get_unique_id(parent_unique_id=None, index=None) + condition_unqiue_id = condition.get_node_unique_id(parent_unique_id=None, index=None) if condition.has_rule_condition and evaluator.allow_backfills: raise DagsterInvalidDefinitionError( @@ -96,7 +96,7 @@ def for_child_condition( child_index: int, candidate_subset: EntitySubset[U_EntityKey], ) -> "AutomationContext[U_EntityKey]": - condition_unqiue_id = child_condition.get_unique_id( + condition_unqiue_id = child_condition.get_node_unique_id( parent_unique_id=self.condition_unique_id, index=child_index ) return AutomationContext( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py index 06813b09b5527..be1839cad1929 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/legacy_context.py @@ -99,7 +99,7 @@ def create(asset_key: AssetKey, evaluator: "AutomationConditionEvaluator"): condition=condition, cursor=cursor, node_cursor=cursor.node_cursors_by_unique_id.get( - condition.get_unique_id(parent_unique_id=None, index=0) + condition.get_node_unique_id(parent_unique_id=None, index=0) ) if cursor else None, @@ -242,17 +242,17 @@ def _previous_tick_discarded_subset(self) -> Optional[SerializableEntitySubset[A # Or(MaterializeCond, Not(SkipCond), Not(DiscardCond)) if len(self.condition.children) != 3: return None - unique_id = self.condition.get_unique_id(parent_unique_id=None, index=None) + unique_id = self.condition.get_node_unique_id(parent_unique_id=None, index=None) # get Not(DiscardCond) not_discard_condition = self.condition.children[2] - unique_id = not_discard_condition.get_unique_id(parent_unique_id=unique_id, index=2) + unique_id = not_discard_condition.get_node_unique_id(parent_unique_id=unique_id, index=2) if not isinstance(not_discard_condition, NotAutomationCondition): return None # get DiscardCond discard_condition = not_discard_condition.children[0] - unique_id = discard_condition.get_unique_id(parent_unique_id=unique_id, index=0) + unique_id = discard_condition.get_node_unique_id(parent_unique_id=unique_id, index=0) if not isinstance(discard_condition, RuleCondition) or not isinstance( discard_condition.rule, DiscardOnMaxMaterializationsExceededRule ): diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py index 72028c7687a4d..4cddeb680e87e 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/legacy/rule_condition.py @@ -23,7 +23,7 @@ class RuleCondition(BuiltinAutomationCondition[AssetKey]): rule: AutoMaterializeRule - def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[str]) -> str: + def get_node_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[str]) -> str: # preserves old (bad) behavior of not including the parent_unique_id to avoid inavlidating # old serialized information parts = [self.rule.__class__.__name__, self.description] diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py index de6ba915cbd4e..7ccdcc6f73380 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/__init__.py @@ -4,6 +4,7 @@ from dagster._core.definitions.declarative_automation.operands.slice_conditions import ( CronTickPassedCondition as CronTickPassedCondition, FailedAutomationCondition as FailedAutomationCondition, + InitialEvaluationCondition as InitialEvaluationCondition, InLatestTimeWindowCondition as InLatestTimeWindowCondition, InProgressAutomationCondition as InProgressAutomationCondition, MissingAutomationCondition as MissingAutomationCondition, diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py index 7dfcf6646b6ec..ab7c9cdde61d4 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/slice_conditions.py @@ -15,6 +15,28 @@ from dagster._utils.schedules import reverse_cron_string_iterator +@record +@whitelist_for_serdes +class InitialEvaluationCondition(BuiltinAutomationCondition): + """Condition to determine if this is the initial evaluation of a given AutomationCondition.""" + + @property + def description(self) -> str: + return "Initial evaluation" + + @property + def name(self) -> str: + return "initial_evaluation" + + def evaluate(self, context: AutomationContext) -> AutomationResult: + condition_tree_id = context.root_context.condition.get_unique_id() + if context.previous_true_subset is None or condition_tree_id != context.cursor: + subset = context.candidate_subset + else: + subset = context.get_empty_subset() + return AutomationResult(context, subset, cursor=condition_tree_id) + + @record class SubsetAutomationCondition(BuiltinAutomationCondition[T_EntityKey]): """Base class for simple conditions which compute a simple subset of the asset graph.""" diff --git a/python_modules/dagster/dagster/_core/definitions/partition.py b/python_modules/dagster/dagster/_core/definitions/partition.py index fd4d344892c6e..420e4ece45641 100644 --- a/python_modules/dagster/dagster/_core/definitions/partition.py +++ b/python_modules/dagster/dagster/_core/definitions/partition.py @@ -1337,6 +1337,6 @@ def empty_subset( return self.partitions_def.empty_subset() def to_serializable_subset(self) -> PartitionsSubset: - return self.partitions_def.subset_with_partition_keys( - self.get_partition_keys() + return self.partitions_def.subset_with_all_partitions( + current_time=self.current_time, dynamic_partitions_store=self.dynamic_partitions_store ).to_serializable_subset() diff --git a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py index 5369f318bb109..8d3982cf2bd9c 100644 --- a/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py +++ b/python_modules/dagster/dagster/_core/definitions/time_window_partitions.py @@ -964,6 +964,22 @@ def partitions_subset_class(self) -> Type["PartitionsSubset"]: def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) + def subset_with_all_partitions( + self, + current_time: Optional[datetime] = None, + dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, + ) -> "PartitionsSubset": + first_window = self.get_first_partition_window(current_time) + last_window = self.get_last_partition_window(current_time) + windows = ( + [] + if first_window is None or last_window is None + else [TimeWindow(first_window.start, last_window.end)] + ) + return TimeWindowPartitionsSubset( + partitions_def=self, num_partitions=None, included_time_windows=windows + ) + def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None ) -> str: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 3fe12a89fc30d..6f65a9baf17ca 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -1,4 +1,12 @@ -from dagster import AutomationCondition +from dagster import ( + AssetKey, + AutomationCondition, + Definitions, + StaticPartitionsDefinition, + asset, + evaluate_automation_conditions, +) +from dagster._core.instance_for_test import instance_for_test from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( AutomationConditionScenarioState, @@ -108,3 +116,36 @@ def test_eager_hourly_partitioned() -> None: # B does not get immediately requested again state, result = state.evaluate("B") assert result.true_subset.size == 0 + + +def test_eager_static_partitioned() -> None: + two_partitions = StaticPartitionsDefinition(["a", "b"]) + four_partitions = StaticPartitionsDefinition(["a", "b", "c", "d"]) + + def _get_defs(pd: StaticPartitionsDefinition) -> Definitions: + @asset(partitions_def=pd, automation_condition=AutomationCondition.eager()) + def A() -> None: ... + + @asset(partitions_def=pd, automation_condition=AutomationCondition.eager()) + def B() -> None: ... + + return Definitions(assets=[A, B]) + + with instance_for_test() as instance: + # no "surprise backfill" + result = evaluate_automation_conditions(defs=_get_defs(two_partitions), instance=instance) + assert result.total_requested == 0 + + # now add two more partitions to the definition, kick off a run for those + result = evaluate_automation_conditions( + defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor + ) + assert result.total_requested == 4 + assert result.get_requested_partitions(AssetKey("A")) == {"c", "d"} + assert result.get_requested_partitions(AssetKey("B")) == {"c", "d"} + + # already requested, no more + result = evaluate_automation_conditions( + defs=_get_defs(four_partitions), instance=instance, cursor=result.cursor + ) + assert result.total_requested == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_initial_evaluation_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_initial_evaluation_condition.py new file mode 100644 index 0000000000000..e760d5905f864 --- /dev/null +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_initial_evaluation_condition.py @@ -0,0 +1,146 @@ +from typing import Optional, Sequence + +import dagster as dg +from dagster._core.definitions.declarative_automation.automation_condition_tester import ( + EvaluateAutomationConditionsResult, +) + + +def test_update_on_partitions_def_change() -> None: + """We should update whenever the partitions definition changes.""" + + def _get_defs(pd: Optional[dg.PartitionsDefinition]) -> dg.Definitions: + @dg.asset( + partitions_def=pd, automation_condition=dg.AutomationCondition.initial_evaluation() + ) + def a() -> None: ... + + return dg.Definitions(assets=[a]) + + instance = dg.DagsterInstance.ephemeral() + unpartitioned_defs = _get_defs(None) + static_partitioned_defs = _get_defs(dg.StaticPartitionsDefinition(["a", "b"])) + + result = dg.evaluate_automation_conditions(defs=unpartitioned_defs, instance=instance) + assert result.total_requested == 1 + + result = dg.evaluate_automation_conditions( + defs=unpartitioned_defs, instance=instance, cursor=result.cursor + ) + assert result.total_requested == 0 + + result = dg.evaluate_automation_conditions( + defs=unpartitioned_defs, instance=instance, cursor=result.cursor + ) + assert result.total_requested == 0 + + # change partitions definition + result = dg.evaluate_automation_conditions( + defs=static_partitioned_defs, instance=instance, cursor=result.cursor + ) + assert result.total_requested == 2 + + result = dg.evaluate_automation_conditions( + defs=static_partitioned_defs, instance=instance, cursor=result.cursor + ) + assert result.total_requested == 0 + + # change it back + result = dg.evaluate_automation_conditions( + defs=unpartitioned_defs, instance=instance, cursor=result.cursor + ) + assert result.total_requested == 1 + + +def _get_initial_evaluation_count(result: EvaluateAutomationConditionsResult) -> int: + def _result_iter(r): + yield r + for cr in r.child_results: + yield from _result_iter(cr) + + assert len(result.results) == 1 + initial_evaluation_result = next( + r + for r in _result_iter(result.results[0]) + if type(r.condition) == type(dg.AutomationCondition.initial_evaluation()) + ) + return initial_evaluation_result.true_subset.size + + +def test_update_on_condition_change() -> None: + """We should update whenever the condition is changed in any way.""" + + def _get_defs(ac: dg.AutomationCondition) -> dg.Definitions: + @dg.asset(automation_condition=ac, deps=["up"]) + def a() -> None: ... + + return dg.Definitions(assets=[a]) + + instance = dg.DagsterInstance.ephemeral() + base_condition = dg.AutomationCondition.initial_evaluation() + + # initial evaluation + base_result = dg.evaluate_automation_conditions( + defs=_get_defs(base_condition), instance=instance + ) + assert _get_initial_evaluation_count(base_result) == 1 + + for condition in [ + # add condition before + ~dg.AutomationCondition.any_deps_in_progress() & base_condition, + # add condition after + base_condition & ~dg.AutomationCondition.any_deps_missing(), + # sandwich! + ~dg.AutomationCondition.any_deps_in_progress() + & base_condition + & dg.AutomationCondition.code_version_changed(), + # weird cases + base_condition.newly_true(), + base_condition.since(base_condition), + dg.AutomationCondition.any_deps_match(base_condition), + ]: + # first tick, should recognize the change + result = dg.evaluate_automation_conditions( + # note: doing relative to the base result cursor + defs=_get_defs(condition), + instance=instance, + cursor=base_result.cursor, + ) + assert _get_initial_evaluation_count(result) == 1 + + # second tick, new normal + result = dg.evaluate_automation_conditions( + defs=_get_defs(condition), instance=instance, cursor=result.cursor + ) + assert _get_initial_evaluation_count(result) == 0 + + +def test_no_update_on_new_deps() -> None: + def _get_defs(deps: Sequence[str]) -> dg.Definitions: + @dg.multi_asset(specs=[dg.AssetSpec(d) for d in deps]) + def m(): ... + + @dg.asset( + automation_condition=dg.AutomationCondition.initial_evaluation() + & dg.AutomationCondition.any_deps_in_progress(), + deps=deps, + ) + def downstream() -> None: ... + + return dg.Definitions(assets=[downstream, m]) + + instance = dg.DagsterInstance.ephemeral() + + # initial evaluation + base_result = dg.evaluate_automation_conditions(defs=_get_defs(["a", "b"]), instance=instance) + assert _get_initial_evaluation_count(base_result) == 1 + + for deps in [[], ["a"], ["c"], ["a", "b", "c"]]: + # no changes should be recognized + result = dg.evaluate_automation_conditions( + # note: doing relative to the base result cursor + defs=_get_defs(deps), + instance=instance, + cursor=base_result.cursor, + ) + assert _get_initial_evaluation_count(result) == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py index 212f4b69ba7ae..b2e378d8a5cc4 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py @@ -1,5 +1,4 @@ from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.declarative_automation.operators import NewlyTrueCondition from dagster._core.definitions.events import AssetKeyPartitionKey from dagster_tests.definitions_tests.declarative_automation_tests.automation_condition_tests.builtins.test_dep_condition import ( @@ -16,7 +15,7 @@ def test_newly_true_condition() -> None: inner_condition, true_set = get_hardcoded_condition() - condition = NewlyTrueCondition(operand=inner_condition) + condition = inner_condition.newly_true() state = AutomationConditionScenarioState(one_asset, automation_condition=condition) # nothing true diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py index b2305e310d14f..578a3b78bb0f5 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py @@ -1,4 +1,6 @@ from dagster import AutomationCondition +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.events import AssetMaterialization from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( AutomationConditionScenarioState, @@ -19,13 +21,19 @@ def test_on_missing_unpartitioned() -> None: ensure_empty_result=False, ) - # parent hasn't materialized yet + # B starts off as materialized + state.instance.report_runless_asset_event(AssetMaterialization(asset_key=AssetKey("B"))) state, result = state.evaluate("B") assert result.true_subset.size == 0 - # parent materialized, now can execute + # parent materialized, now could execute, but B is not missing state = state.with_runs(run_request("A")) state, result = state.evaluate("B") + assert result.true_subset.size == 0 + + # now wipe B so that it is newly missing, should update + state.instance.wipe_assets([AssetKey("B")]) + state, result = state.evaluate("B") assert result.true_subset.size == 1 # B has not yet materialized, but it has been requested, so don't request again @@ -65,13 +73,15 @@ def test_on_missing_hourly_partitioned() -> None: ensure_empty_result=False, ) .with_asset_properties(partitions_def=hourly_partitions_def) - .with_current_time("2020-02-02T01:05:00") + .with_current_time("2020-02-02T00:05:00") ) # parent hasn't updated yet state, result = state.evaluate("B") assert result.true_subset.size == 0 + state = state.with_current_time_advanced(hours=1) + # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) state, result = state.evaluate("B") @@ -112,13 +122,15 @@ def test_on_missing_without_time_limit() -> None: ensure_empty_result=False, ) .with_asset_properties(partitions_def=hourly_partitions_def) - .with_current_time("2020-02-02T01:05:00") + .with_current_time("2019-02-02T01:05:00") ) # parent hasn't updated yet state, result = state.evaluate("B") assert result.true_subset.size == 0 + state = state.with_current_time_advanced(years=1) + # historical parents updated, matters state = state.with_runs(run_request("A", "2019-07-05-00:00")) state = state.with_runs(run_request("A", "2019-04-05-00:00")) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition_tester.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition_tester.py index 7e6881d1e774d..dc8612a59f4b9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition_tester.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition_tester.py @@ -21,21 +21,19 @@ @asset( partitions_def=HourlyPartitionsDefinition("2020-01-01-00:00"), - auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), + automation_condition=AutomationCondition.eager(), ) def hourly() -> None: ... @asset( partitions_def=StaticPartitionsDefinition(["a", "b", "c"]), - auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), + automation_condition=AutomationCondition.on_missing(), ) def static() -> None: ... -@asset( - auto_materialize_policy=AutomationCondition.eager().as_auto_materialize_policy(), -) +@asset(automation_condition=AutomationCondition.eager(), deps=["upstream"]) def unpartitioned() -> None: ... @@ -81,11 +79,19 @@ def noop(): ... def test_basic_regular_defs() -> None: instance = DagsterInstance.ephemeral() + result = evaluate_automation_conditions( + defs=defs, + asset_selection=AssetSelection.assets(unpartitioned), + instance=instance, + ) + assert result.total_requested == 0 + instance.report_runless_asset_event(AssetMaterialization("upstream")) result = evaluate_automation_conditions( defs=defs, asset_selection=AssetSelection.assets(unpartitioned), instance=instance, + cursor=result.cursor, ) assert result.total_requested == 1 @@ -102,6 +108,12 @@ def test_basic_assets_defs() -> None: instance = DagsterInstance.ephemeral() result = evaluate_automation_conditions(defs=[unpartitioned], instance=instance) + assert result.total_requested == 0 + + instance.report_runless_asset_event(AssetMaterialization(asset_key=AssetKey("upstream"))) + result = evaluate_automation_conditions( + defs=[unpartitioned], instance=instance, cursor=result.cursor + ) assert result.total_requested == 1 result = evaluate_automation_conditions( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py index b3c5bbb7ee333..6a32bda5c5dfa 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py @@ -31,18 +31,18 @@ [ # cron condition returns a unique value hash if parents change, if schedule changes, if the # partitions def changes, or if an asset is materialized - ("9a5947c1196f3311a1039aecb90e04f5", SC.on_cron("0 * * * *"), one_parent, False), - ("bdf32a2d014be85c68843f55776bd70c", SC.on_cron("0 * * * *"), one_parent, True), - ("be9c8c64a822d0dbc49e462a3aabf4d8", SC.on_cron("0 0 * * *"), one_parent, False), - ("dfed65a9a20ff1f10e60ff7f0397ffb1", SC.on_cron("0 * * * *"), one_parent_daily, False), - ("44305b43d3719344819f9bda178f4588", SC.on_cron("0 * * * *"), two_parents, False), - ("1d902e0c59648e5022ae84bd6a1b1c49", SC.on_cron("0 * * * *"), two_parents_daily, False), + ("ad228f0044da1efba407e794c845e858", SC.on_cron("0 * * * *"), one_parent, False), + ("f34de3cd3e1ab283a95a892192437076", SC.on_cron("0 * * * *"), one_parent, True), + ("d9533b4eb0aad1798d5da85520b9852c", SC.on_cron("0 0 * * *"), one_parent, False), + ("8a233d38e569faba1470b0717c28fbee", SC.on_cron("0 * * * *"), one_parent_daily, False), + ("e8fa53c550e99edc1346a1f80979cddd", SC.on_cron("0 * * * *"), two_parents, False), + ("5c58fb8fc117d69b32e734c45af219ea", SC.on_cron("0 * * * *"), two_parents_daily, False), # same as above - ("0e5ef287a89ed9e6c08a25e4920ee4f3", SC.eager(), one_parent, False), - ("42a1a04bf1fa76fd723fb60294bff6ae", SC.eager(), one_parent, True), - ("bfb3170339eea63e425d375d2c187c34", SC.eager(), one_parent_daily, False), - ("0e9b7774d09132a19b8d8674fdca500d", SC.eager(), two_parents, False), - ("3cf82286c216f9535842fb2852b88838", SC.eager(), two_parents_daily, False), + ("678e0a2be6bba89bd2d37fb432d8fb51", SC.eager(), one_parent, False), + ("72bd068363441e02d67b3407fe3e9cae", SC.eager(), one_parent, True), + ("4f0e2e38131ae91b1b9408e3cd549dd0", SC.eager(), one_parent_daily, False), + ("00ecedd77a8d887940856950c556c7d1", SC.eager(), two_parents, False), + ("6ad1fd331c63c75e17572ec60b9c27b5", SC.eager(), two_parents_daily, False), # missing condition is invariant to changes other than partitions def changes ("5c24ffc21af9983a4917b91290de8f5d", SC.missing(), one_parent, False), ("5c24ffc21af9983a4917b91290de8f5d", SC.missing(), one_parent, True), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py index e3735016b1f85..be9cf31cfb5a4 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/daemon_tests/definitions/default_condition.py @@ -1,7 +1,7 @@ import dagster as dg -@dg.asset(automation_condition=dg.AutomationCondition.eager()) +@dg.asset(automation_condition=dg.AutomationCondition.missing().newly_true()) def eager_asset() -> None: ...