From 3fb19898c3257b14b9f261b8bd22710e9d1a943a Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Tue, 24 Oct 2023 16:40:39 -0700 Subject: [PATCH] [auto-materialize] Go back to using recursion in get_outdated_parents (#17361) ## Summary & Motivation This ends up being significantly more performant than the iterative solution, and we've resolved the underlying recursion depth limit issue by filtering out self dependencies ## How I Tested These Changes --- .../dagster/_core/definitions/asset_graph.py | 6 +- .../definitions/auto_materialize_rule.py | 4 + .../_utils/caching_instance_queryer.py | 113 +++++++----------- .../auto_materialize_policy_scenarios.py | 23 ++++ 4 files changed, 74 insertions(+), 72 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 8dfb889be017d..572285e188142 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -41,7 +41,11 @@ from .freshness_policy import FreshnessPolicy from .partition import PartitionsDefinition, PartitionsSubset from .partition_key_range import PartitionKeyRange -from .partition_mapping import PartitionMapping, UpstreamPartitionsResult, infer_partition_mapping +from .partition_mapping import ( + PartitionMapping, + UpstreamPartitionsResult, + infer_partition_mapping, +) from .source_asset import SourceAsset from .time_window_partitions import ( get_time_partition_key, diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index bb3157757e619..9801f6dd2c5e4 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -476,6 +476,10 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe for parent in context.get_parents_that_will_not_be_materialized_on_current_tick( asset_partition=candidate ): + if context.instance_queryer.have_ignorable_partition_mapping_for_outdated( + candidate.asset_key, parent.asset_key + ): + continue outdated_ancestors.update( context.instance_queryer.get_outdated_ancestors(asset_partition=parent) ) diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 197cd6cb7ce53..9d8e6549a8fcb 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -1,5 +1,5 @@ import logging -from collections import defaultdict, deque +from collections import defaultdict from datetime import datetime from typing import ( TYPE_CHECKING, @@ -20,7 +20,7 @@ import pendulum import dagster._check as check -from dagster._core.definitions.asset_graph import AssetGraph, ToposortedPriorityQueue +from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.data_version import ( DATA_VERSION_TAG, @@ -29,6 +29,7 @@ ) from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionsSubset +from dagster._core.definitions.partition_mapping import AllPartitionMapping from dagster._core.definitions.time_window_partitions import ( TimeWindowPartitionsDefinition, get_time_partition_key, @@ -81,7 +82,6 @@ def __init__( self._evaluation_time = evaluation_time if evaluation_time else pendulum.now("UTC") - self._outdated_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {} self._respect_materialization_data_versions = ( self._instance.auto_materialize_respect_materialization_data_versions ) @@ -854,82 +854,53 @@ def get_parent_asset_partitions_updated_after_child( ) return updated_parents - def get_outdated_ancestors( - self, *, asset_partition: AssetKeyPartitionKey - ) -> AbstractSet[AssetKey]: - """Return the set of assets that are ancestors of the given asset partition and have parents - that have been updated more recently than they have. + def have_ignorable_partition_mapping_for_outdated( + self, asset_key: AssetKey, upstream_asset_key: AssetKey + ) -> bool: + """Returns whether the given assets have a partition mapping between them which can be + ignored in the context of calculating if an asset is outdated or not. - If two ancestors would be returned, but one of them is an ancestor of the other one, then - only the most upstream ancestor is included. + These mappings are ignored in cases where respecting them would require an unrealistic + number of upstream partitions to be in a 'good' state before allowing a downstream asset + to be considered up to date. """ - if asset_partition in self._outdated_ancestors_cache: - return self._outdated_ancestors_cache[asset_partition] + # Self partition mappings impose constraints on all historical partitions + return asset_key == upstream_asset_key + @cached_method + def get_outdated_ancestors( + self, *, asset_partition: AssetKeyPartitionKey + ) -> AbstractSet[AssetKey]: if self.asset_graph.is_source(asset_partition.asset_key): return set() - # First traverse upwards and gather any candidates that have not been previously added - # to the cache - visited: set[AssetKeyPartitionKey] = set() - - queue: deque[AssetKeyPartitionKey] = deque() - queue.append(asset_partition) - - while queue: - current_partition = queue.popleft() - visited.add(current_partition) - - if self.asset_graph.is_source(current_partition.asset_key): - continue - - parent_asset_partitions = self.asset_graph.get_parents_partitions( - dynamic_partitions_store=self, - current_time=self._evaluation_time, - asset_key=current_partition.asset_key, - partition_key=current_partition.partition_key, - ).parent_partitions - - for parent in parent_asset_partitions: - if ( - parent not in visited - and parent not in self._outdated_ancestors_cache - # do not evaluate self-dependency asset partitions - and parent.asset_key != current_partition.asset_key - ): - queue.append(parent) + parent_asset_partitions = self.asset_graph.get_parents_partitions( + dynamic_partitions_store=self, + current_time=self._evaluation_time, + asset_key=asset_partition.asset_key, + partition_key=asset_partition.partition_key, + ).parent_partitions + + # the set of parent keys which we don't need to check + ignored_parent_keys = { + parent + for parent in self.asset_graph.get_parents(asset_partition.asset_key) + if self.have_ignorable_partition_mapping_for_outdated(asset_partition.asset_key, parent) + } - # Toposort them so that at each iteration we can count on the cache being full for - # all of your parents, then update the cache for each node based on the parent's results - toposort_queue = ToposortedPriorityQueue( - self.asset_graph, visited, include_required_multi_assets=False + updated_parents = self.get_parent_asset_partitions_updated_after_child( + asset_partition=asset_partition, + parent_asset_partitions=parent_asset_partitions, + respect_materialization_data_versions=self._respect_materialization_data_versions, + ignored_parent_keys=ignored_parent_keys, ) - while len(toposort_queue) > 0: - candidates_unit = toposort_queue.dequeue() - for current_partition in candidates_unit: - parent_asset_partitions = self.asset_graph.get_parents_partitions( - dynamic_partitions_store=self, - current_time=self._evaluation_time, - asset_key=current_partition.asset_key, - partition_key=current_partition.partition_key, - ).parent_partitions - - updated_parents: AbstractSet[ - AssetKeyPartitionKey - ] = self.get_parent_asset_partitions_updated_after_child( - asset_partition=current_partition, - parent_asset_partitions=parent_asset_partitions, - respect_materialization_data_versions=self._respect_materialization_data_versions, - # ignore self-dependency asset partitions - ignored_parent_keys={current_partition.asset_key}, - ) - - outdated_ancestors = {current_partition.asset_key} if updated_parents else set() + root_unreconciled_ancestors = {asset_partition.asset_key} if updated_parents else set() - for parent in set(parent_asset_partitions) - updated_parents: - outdated_ancestors.update(self._outdated_ancestors_cache.get(parent, set())) - - self._outdated_ancestors_cache[current_partition] = outdated_ancestors + # recurse over parents + for parent in set(parent_asset_partitions) - updated_parents: + if parent.asset_key in ignored_parent_keys: + continue + root_unreconciled_ancestors.update(self.get_outdated_ancestors(asset_partition=parent)) - return self._outdated_ancestors_cache[asset_partition] + return root_unreconciled_ancestors diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/auto_materialize_policy_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/auto_materialize_policy_scenarios.py index 309891ba32f00..c8fa0a98eec80 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/auto_materialize_policy_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/auto_materialize_policy_scenarios.py @@ -127,6 +127,14 @@ ), ] +two_partitioned_to_three_unpartitioned = [ + asset_def("partitioned1", partitions_def=two_partitions_partitions_def), + asset_def("partitioned2", ["partitioned1"], partitions_def=two_partitions_partitions_def), + asset_def("unpartitioned1", ["partitioned2"]), + asset_def("unpartitioned2", ["unpartitioned1"]), + asset_def("unpartitioned3", ["unpartitioned2"]), +] + # Asset that triggers an error within the daemon when you try to generate # a plan to materialize it error_asset = [ @@ -683,6 +691,21 @@ ], expected_run_requests=[run_request(["unpartitioned2"])], ), + "test_dont_allow_outdated_unpartitioned_parent": AssetReconciliationScenario( + assets=two_partitioned_to_three_unpartitioned, + asset_selection=AssetSelection.keys("unpartitioned3"), + unevaluated_runs=[ + # fully backfill + run(["partitioned1", "partitioned2"], partition_key="a"), + run(["partitioned1", "partitioned2"], partition_key="b"), + run(["unpartitioned1", "unpartitioned2", "unpartitioned3"]), + # now unpartitioned2 is outdated... + run(["unpartitioned1"]), + ], + # unpartioned3 cannot run even though unpartitioned2 updated because unpartitioned2 is + # outdated + expected_run_requests=[], + ), "test_wait_for_all_parents_updated": AssetReconciliationScenario( assets=with_auto_materialize_policy( vee,