Skip to content

Commit

Permalink
[external-assets] Eliminate redundant iteration over assets_defs in A…
Browse files Browse the repository at this point in the history
…ssetGraph construction (#20567)

## Summary & Motivation

Eliminate multiple iterations over assets defs in
AssetGraph.from_assets. This greatly speeds up AssetGraph construction
for large asset graphs.

## How I Tested These Changes

Timed `AssetGraph.from_assets` for the below set of defs. Time went from
~6.5s on my machine to <1ms.

```
from dagster import multi_asset, Definitions, AssetCheckSpec, AssetSpec, AssetKey

import random
import string

alphabet = string.ascii_lowercase

def asset_key():
    return AssetKey("".join(random.choice(alphabet) for _ in range(10)))

asset_keys = [asset_key() for _ in range(5000)]

def asset_check_spec():
    name = "".join(random.choice(alphabet) for _ in range(10))
    return AssetCheckSpec(name= name, asset=random.choice(asset_keys))

asset_check_specs = [asset_check_spec() for _ in range(5000)]

@multi_asset(specs=[AssetSpec(k) for k in asset_keys], check_specs=asset_check_specs)
def hoorah(): ...


defs = Definitions(assets=[hoorah])
```
  • Loading branch information
smackesey authored and PedramNavid committed Mar 28, 2024
1 parent e604e73 commit 5b9a78a
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import defaultdict
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union
from typing import AbstractSet, DefaultDict, Dict, Iterable, Mapping, Optional, Sequence, Set, Union

from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_spec import (
Expand Down Expand Up @@ -146,15 +147,15 @@ def io_manager_key(self) -> str:


class AssetGraph(BaseAssetGraph[AssetNode]):
_asset_check_defs_by_key: Mapping[AssetCheckKey, AssetsDefinition]
_assets_defs_by_check_key: Mapping[AssetCheckKey, AssetsDefinition]

def __init__(
self,
asset_nodes_by_key: Mapping[AssetKey, AssetNode],
asset_check_defs_by_key: Mapping[AssetCheckKey, AssetsDefinition],
assets_defs_by_check_key: Mapping[AssetCheckKey, AssetsDefinition],
):
self._asset_nodes_by_key = asset_nodes_by_key
self._asset_check_defs_by_key = asset_check_defs_by_key
self._assets_defs_by_check_key = assets_defs_by_check_key
self._asset_nodes_by_check_key = {
check_key: asset
for asset in asset_nodes_by_key.values()
Expand Down Expand Up @@ -223,27 +224,29 @@ def from_assets(
# Build the set of AssetNodes. Each node holds key rather than object references to parent
# and child nodes.
dep_graph = generate_asset_dep_graph(assets_defs)

assets_defs_by_check_key: Dict[AssetCheckKey, AssetsDefinition] = {}
check_keys_by_asset_key: DefaultDict[AssetKey, Set[AssetCheckKey]] = defaultdict(set)
for ad in assets_defs:
for ck in ad.check_keys:
check_keys_by_asset_key[ck.asset_key].add(ck)
assets_defs_by_check_key[ck] = ad

asset_nodes_by_key = {
key: AssetNode(
key=key,
parent_keys=dep_graph["upstream"][key],
child_keys=dep_graph["downstream"][key],
assets_def=ad,
check_keys={
*(ck for ad in assets_defs for ck in ad.check_keys if ck.asset_key == key),
},
check_keys=check_keys_by_asset_key[key],
)
for ad in assets_defs
for key in ad.keys
}

asset_check_defs_by_key = {
key: assets_def for assets_def in assets_defs for key in assets_def.check_keys
}

return AssetGraph(
asset_nodes_by_key=asset_nodes_by_key,
asset_check_defs_by_key=asset_check_defs_by_key,
assets_defs_by_check_key=assets_defs_by_check_key,
)

def get_execution_set_asset_and_check_keys(
Expand All @@ -263,7 +266,7 @@ def assets_defs(self) -> Sequence[AssetsDefinition]:
return list(
{
*(asset.assets_def for asset in self.asset_nodes),
*(ad for ad in self._asset_check_defs_by_key.values()),
*(ad for ad in self._assets_defs_by_check_key.values()),
}
)

Expand All @@ -274,9 +277,9 @@ def assets_defs_for_keys(
{
*[self.get(key).assets_def for key in keys if isinstance(key, AssetKey)],
*[
ad
for k, ad in self._asset_check_defs_by_key.items()
if k in keys and isinstance(ad, AssetsDefinition)
self._assets_defs_by_check_key[key]
for key in keys
if isinstance(key, AssetCheckKey)
],
}
)
Expand Down

0 comments on commit 5b9a78a

Please sign in to comment.