Skip to content

Commit

Permalink
[auto-materialize] Go back to using recursion in get_outdated_parents (
Browse files Browse the repository at this point in the history
…#17361)

## Summary & Motivation

This ends up being significantly more performant than the iterative
solution, and we've resolved the underlying recursion depth limit issue
by filtering out self dependencies

## How I Tested These Changes
  • Loading branch information
OwenKephart authored and shalabhc committed Oct 24, 2023
1 parent 7a14e1f commit 3fb1989
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
from .freshness_policy import FreshnessPolicy
from .partition import PartitionsDefinition, PartitionsSubset
from .partition_key_range import PartitionKeyRange
from .partition_mapping import PartitionMapping, UpstreamPartitionsResult, infer_partition_mapping
from .partition_mapping import (
PartitionMapping,
UpstreamPartitionsResult,
infer_partition_mapping,
)
from .source_asset import SourceAsset
from .time_window_partitions import (
get_time_partition_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationRe
for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
asset_partition=candidate
):
if context.instance_queryer.have_ignorable_partition_mapping_for_outdated(
candidate.asset_key, parent.asset_key
):
continue
outdated_ancestors.update(
context.instance_queryer.get_outdated_ancestors(asset_partition=parent)
)
Expand Down
113 changes: 42 additions & 71 deletions python_modules/dagster/dagster/_utils/caching_instance_queryer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from collections import defaultdict, deque
from collections import defaultdict
from datetime import datetime
from typing import (
TYPE_CHECKING,
Expand All @@ -20,7 +20,7 @@
import pendulum

import dagster._check as check
from dagster._core.definitions.asset_graph import AssetGraph, ToposortedPriorityQueue
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
Expand All @@ -29,6 +29,7 @@
)
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition_mapping import AllPartitionMapping
from dagster._core.definitions.time_window_partitions import (
TimeWindowPartitionsDefinition,
get_time_partition_key,
Expand Down Expand Up @@ -81,7 +82,6 @@ def __init__(

self._evaluation_time = evaluation_time if evaluation_time else pendulum.now("UTC")

self._outdated_ancestors_cache: Dict[AssetKeyPartitionKey, Set[AssetKey]] = {}
self._respect_materialization_data_versions = (
self._instance.auto_materialize_respect_materialization_data_versions
)
Expand Down Expand Up @@ -854,82 +854,53 @@ def get_parent_asset_partitions_updated_after_child(
)
return updated_parents

def get_outdated_ancestors(
self, *, asset_partition: AssetKeyPartitionKey
) -> AbstractSet[AssetKey]:
"""Return the set of assets that are ancestors of the given asset partition and have parents
that have been updated more recently than they have.
def have_ignorable_partition_mapping_for_outdated(
self, asset_key: AssetKey, upstream_asset_key: AssetKey
) -> bool:
"""Returns whether the given assets have a partition mapping between them which can be
ignored in the context of calculating if an asset is outdated or not.
If two ancestors would be returned, but one of them is an ancestor of the other one, then
only the most upstream ancestor is included.
These mappings are ignored in cases where respecting them would require an unrealistic
number of upstream partitions to be in a 'good' state before allowing a downstream asset
to be considered up to date.
"""
if asset_partition in self._outdated_ancestors_cache:
return self._outdated_ancestors_cache[asset_partition]
# Self partition mappings impose constraints on all historical partitions
return asset_key == upstream_asset_key

@cached_method
def get_outdated_ancestors(
self, *, asset_partition: AssetKeyPartitionKey
) -> AbstractSet[AssetKey]:
if self.asset_graph.is_source(asset_partition.asset_key):
return set()

# First traverse upwards and gather any candidates that have not been previously added
# to the cache
visited: set[AssetKeyPartitionKey] = set()

queue: deque[AssetKeyPartitionKey] = deque()
queue.append(asset_partition)

while queue:
current_partition = queue.popleft()
visited.add(current_partition)

if self.asset_graph.is_source(current_partition.asset_key):
continue

parent_asset_partitions = self.asset_graph.get_parents_partitions(
dynamic_partitions_store=self,
current_time=self._evaluation_time,
asset_key=current_partition.asset_key,
partition_key=current_partition.partition_key,
).parent_partitions

for parent in parent_asset_partitions:
if (
parent not in visited
and parent not in self._outdated_ancestors_cache
# do not evaluate self-dependency asset partitions
and parent.asset_key != current_partition.asset_key
):
queue.append(parent)
parent_asset_partitions = self.asset_graph.get_parents_partitions(
dynamic_partitions_store=self,
current_time=self._evaluation_time,
asset_key=asset_partition.asset_key,
partition_key=asset_partition.partition_key,
).parent_partitions

# the set of parent keys which we don't need to check
ignored_parent_keys = {
parent
for parent in self.asset_graph.get_parents(asset_partition.asset_key)
if self.have_ignorable_partition_mapping_for_outdated(asset_partition.asset_key, parent)
}

# Toposort them so that at each iteration we can count on the cache being full for
# all of your parents, then update the cache for each node based on the parent's results
toposort_queue = ToposortedPriorityQueue(
self.asset_graph, visited, include_required_multi_assets=False
updated_parents = self.get_parent_asset_partitions_updated_after_child(
asset_partition=asset_partition,
parent_asset_partitions=parent_asset_partitions,
respect_materialization_data_versions=self._respect_materialization_data_versions,
ignored_parent_keys=ignored_parent_keys,
)

while len(toposort_queue) > 0:
candidates_unit = toposort_queue.dequeue()
for current_partition in candidates_unit:
parent_asset_partitions = self.asset_graph.get_parents_partitions(
dynamic_partitions_store=self,
current_time=self._evaluation_time,
asset_key=current_partition.asset_key,
partition_key=current_partition.partition_key,
).parent_partitions

updated_parents: AbstractSet[
AssetKeyPartitionKey
] = self.get_parent_asset_partitions_updated_after_child(
asset_partition=current_partition,
parent_asset_partitions=parent_asset_partitions,
respect_materialization_data_versions=self._respect_materialization_data_versions,
# ignore self-dependency asset partitions
ignored_parent_keys={current_partition.asset_key},
)

outdated_ancestors = {current_partition.asset_key} if updated_parents else set()
root_unreconciled_ancestors = {asset_partition.asset_key} if updated_parents else set()

for parent in set(parent_asset_partitions) - updated_parents:
outdated_ancestors.update(self._outdated_ancestors_cache.get(parent, set()))

self._outdated_ancestors_cache[current_partition] = outdated_ancestors
# recurse over parents
for parent in set(parent_asset_partitions) - updated_parents:
if parent.asset_key in ignored_parent_keys:
continue
root_unreconciled_ancestors.update(self.get_outdated_ancestors(asset_partition=parent))

return self._outdated_ancestors_cache[asset_partition]
return root_unreconciled_ancestors
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@
),
]

two_partitioned_to_three_unpartitioned = [
asset_def("partitioned1", partitions_def=two_partitions_partitions_def),
asset_def("partitioned2", ["partitioned1"], partitions_def=two_partitions_partitions_def),
asset_def("unpartitioned1", ["partitioned2"]),
asset_def("unpartitioned2", ["unpartitioned1"]),
asset_def("unpartitioned3", ["unpartitioned2"]),
]

# Asset that triggers an error within the daemon when you try to generate
# a plan to materialize it
error_asset = [
Expand Down Expand Up @@ -683,6 +691,21 @@
],
expected_run_requests=[run_request(["unpartitioned2"])],
),
"test_dont_allow_outdated_unpartitioned_parent": AssetReconciliationScenario(
assets=two_partitioned_to_three_unpartitioned,
asset_selection=AssetSelection.keys("unpartitioned3"),
unevaluated_runs=[
# fully backfill
run(["partitioned1", "partitioned2"], partition_key="a"),
run(["partitioned1", "partitioned2"], partition_key="b"),
run(["unpartitioned1", "unpartitioned2", "unpartitioned3"]),
# now unpartitioned2 is outdated...
run(["unpartitioned1"]),
],
# unpartioned3 cannot run even though unpartitioned2 updated because unpartitioned2 is
# outdated
expected_run_requests=[],
),
"test_wait_for_all_parents_updated": AssetReconciliationScenario(
assets=with_auto_materialize_policy(
vee,
Expand Down

0 comments on commit 3fb1989

Please sign in to comment.