Skip to content

Commit

Permalink
memoize inputs and outputs in op/graph snapshots (dagster-io#26482)
Browse files Browse the repository at this point in the history
Summary:
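Iterating through big lists is slow; indexing into a mapping is fast. Memoize name-keyed mappings of the input and output definition snapshots so lookups by name no longer scan the full list.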

Test Plan: BK (Buildkite CI), plus a speedscope profile of a large multi-asset with many outputs

## Summary & Motivation

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan authored and pskinnerthyme committed Dec 16, 2024
1 parent c252b84 commit a8ad2fe
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions python_modules/dagster/dagster/_core/snap/node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import cached_property
from typing import Mapping, NamedTuple, Optional, Sequence, Union

import dagster._check as check
Expand Down Expand Up @@ -233,6 +234,14 @@ def __new__(
),
)

@cached_property
def input_def_map(self) -> Mapping[str, InputDefSnap]:
    """Map each input definition snapshot's name to the snapshot itself.

    Built from ``self.input_def_snaps`` on first access and then cached on
    the instance by ``cached_property``. If two snapshots share a name, the
    one appearing later in the sequence is the one kept.
    """
    by_name = {}
    for snap in self.input_def_snaps:
        by_name[snap.name] = snap
    return by_name

@cached_property
def output_def_map(self) -> Mapping[str, OutputDefSnap]:
    """Map each output definition snapshot's name to the snapshot itself.

    Built from ``self.output_def_snaps`` on first access and then cached on
    the instance by ``cached_property``. If two snapshots share a name, the
    one appearing later in the sequence is the one kept.
    """
    by_name = {}
    for snap in self.output_def_snaps:
        by_name[snap.name] = snap
    return by_name

def get_input_snap(self, name: str) -> InputDefSnap:
    """Return the input definition snapshot called ``name``.

    Thin wrapper that hands this snapshot and ``name`` to the module-level
    ``_get_input_snap`` helper and returns its result unchanged.
    """
    found = _get_input_snap(self, name)
    return found

Expand Down Expand Up @@ -282,6 +291,14 @@ def __new__(
),
)

@cached_property
def input_def_map(self) -> Mapping[str, InputDefSnap]:
    """Name-keyed lookup table over ``self.input_def_snaps``.

    Evaluated once per instance (``cached_property``); later accesses reuse
    the same dict. For duplicate names the last snapshot in the sequence
    wins.
    """
    lookup = {}
    for snap in self.input_def_snaps:
        lookup[snap.name] = snap
    return lookup

@cached_property
def output_def_map(self) -> Mapping[str, OutputDefSnap]:
    """Name-keyed lookup table over ``self.output_def_snaps``.

    Evaluated once per instance (``cached_property``); later accesses reuse
    the same dict. For duplicate names the last snapshot in the sequence
    wins.
    """
    lookup = {}
    for snap in self.output_def_snaps:
        lookup[snap.name] = snap
    return lookup

def get_input_snap(self, name: str) -> InputDefSnap:
    """Look up one of this node's input definition snapshots by name.

    Forwards to the shared module-level ``_get_input_snap`` helper, passing
    this snapshot as the node definition.
    """
    input_snap = _get_input_snap(self, name)
    return input_snap

Expand Down Expand Up @@ -387,18 +404,18 @@ def build_op_def_snap(op_def: OpDefinition) -> OpDefSnap:
# shared impl for GraphDefSnap and OpDefSnap
def _get_input_snap(node_def: Union[GraphDefSnap, OpDefSnap], name: str) -> InputDefSnap:
    """Return the input definition snapshot named ``name`` from ``node_def``.

    The lookup goes through ``node_def.input_def_map`` only; there is no
    linear scan of ``node_def.input_def_snaps``, so repeated lookups on the
    same snapshot reuse the memoized mapping.

    Args:
        node_def: The graph or op definition snapshot to search.
        name: Name of the input to find; validated with ``check.str_param``.

    Returns:
        The ``InputDefSnap`` stored under ``name``.

    Raises:
        Whatever ``check.failed`` raises when no input has that name.
    """
    check.str_param(name, "name")

    input_snap = node_def.input_def_map.get(name)
    # Explicit None test: a hit must never be rejected because of truthiness.
    if input_snap is not None:
        return input_snap

    # NOTE(review): the message says "op def" although node_def may also be a
    # graph snapshot; kept verbatim in case callers match on the text.
    check.failed(f"Could not find input {name} in op def {node_def.name}")


# shared impl for GraphDefSnap and OpDefSnap
def _get_output_snap(node_def: Union[GraphDefSnap, OpDefSnap], name: str) -> OutputDefSnap:
    """Return the output definition snapshot named ``name`` from ``node_def``.

    The lookup goes through ``node_def.output_def_map`` only; there is no
    linear scan of ``node_def.output_def_snaps``, so repeated lookups on the
    same snapshot reuse the memoized mapping.

    Args:
        node_def: The graph or op definition snapshot to search.
        name: Name of the output to find; validated with ``check.str_param``.

    Returns:
        The ``OutputDefSnap`` stored under ``name``.

    Raises:
        Whatever ``check.failed`` raises when no output has that name.
    """
    check.str_param(name, "name")

    output_snap = node_def.output_def_map.get(name)
    # Explicit None test: a hit must never be rejected because of truthiness.
    if output_snap is not None:
        return output_snap

    check.failed(f"Could not find output {name} in node def {node_def.name}")

0 comments on commit a8ad2fe

Please sign in to comment.