Skip to content

Commit

Permalink
Allow attaching IO managers to specs (#23674)
Browse files Browse the repository at this point in the history
## Summary

For purposes of migrating older `@multi_assets` without dropping custom
IO manager capability, it's useful to be able to specify an IO manager
to use for each asset output. Currently, an invariant will error if the
special `SYSTEM_METADATA_KEY_IO_MANAGER_KEY` key is set on a
materializable multi-asset.

This PR drops that requirement and threads through this IO manager
expectation into the `Out`, also allowing any data type to be returned
if an IO manager override is specified.


Before:
```python
@multi_asset(
  outs={
    "foo": AssetOut(key=AssetKey("foo_asset"), io_manager_key="foo_io_manager", dagster_type=int)
    "bar": AssetOut(key=AssetKey("bar_asset"), io_manager_key="bar_io_manager", dagster_type=int)
  }
)
def my_multi_asset():
  ...
  yield Output(output_name="foo", value=5)
  yield Output(output_name="bar", value=10)

```

After:
```python
@multi_asset(
  specs=[
    AssetSpec(key=AssetKey("foo_asset"), metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "foo_io_manager"}),
    AssetSpec(key=AssetKey("bar_asset"), metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "bar_io_manager"}),
  ]
)
def my_multi_asset():
  ...
  yield Output(output_name=AssetKey("foo_asset").to_python_identifier(), value=5)
  yield Output(output_name=AssetKey("bar_asset").to_python_identifier(), value=10)
```

This is not particularly elegant, but lets us update existing
multi-assets with IO-manager-processed outputs in-place to use specs,
without breaking downstream assets which might depend on them. For a
concrete example, see #23676. We wouldn't expect users to replicate this
pattern, & shouldn't do so on any greenfield code.

## Test Plan

New unit tests.
  • Loading branch information
benpankow authored Aug 21, 2024
1 parent 588e8eb commit eb005be
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 4 deletions.
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,8 @@ def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str:
SYSTEM_METADATA_KEY_IO_MANAGER_KEY, DEFAULT_IO_MANAGER_KEY
)
else:
if SYSTEM_METADATA_KEY_IO_MANAGER_KEY in self._specs_by_key[key].metadata:
return self._specs_by_key[key].metadata[SYSTEM_METADATA_KEY_IO_MANAGER_KEY]
check.invariant(
SYSTEM_METADATA_KEY_IO_MANAGER_KEY not in self._specs_by_key[key].metadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
from dagster._core.definitions.asset_in import AssetIn
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_out import AssetOut
from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_IO_MANAGER_KEY,
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.assets import (
ASSET_SUBSET_INPUT_PREFIX,
AssetsDefinition,
Expand All @@ -41,7 +45,11 @@
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._core.types.dagster_type import DagsterType, Nothing
from dagster._core.types.dagster_type import (
Any as DagsterAny,
DagsterType,
Nothing,
)

from ..asset_check_spec import AssetCheckSpec
from ..utils import NoValueSentinel
Expand Down Expand Up @@ -328,11 +336,14 @@ def from_multi_asset_specs(
named_outs_by_asset_key[asset_spec.key] = NamedOut(
output_name,
Out(
Nothing,
DagsterAny
if asset_spec.metadata.get(SYSTEM_METADATA_KEY_IO_MANAGER_KEY)
else Nothing,
is_required=not (can_subset or asset_spec.skippable),
description=asset_spec.description,
code_version=asset_spec.code_version,
metadata=asset_spec.metadata,
io_manager_key=asset_spec.metadata.get(SYSTEM_METADATA_KEY_IO_MANAGER_KEY),
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from dagster._core.definitions import AssetIn, SourceAsset, asset, multi_asset
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import SYSTEM_METADATA_KEY_IO_MANAGER_KEY, AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.decorators.asset_decorator import graph_asset
from dagster._core.definitions.events import AssetMaterialization
Expand Down Expand Up @@ -711,6 +711,55 @@ def my_asset(context):
assert bar_list == [2]


def test_multi_asset_io_manager_execution_specs() -> None:
class MyIOManager(IOManager):
def __init__(self, the_list):
self._the_list = the_list

def handle_output(self, _context, obj):
self._the_list.append(obj)

def load_input(self, _context):
pass

foo_list = []

@resource
def baz_resource():
return "baz"

@io_manager(required_resource_keys={"baz"})
def foo_manager(context):
assert context.resources.baz == "baz"
return MyIOManager(foo_list)

bar_list = []

@io_manager
def bar_manager():
return MyIOManager(bar_list)

@multi_asset(
specs=[
AssetSpec(key=AssetKey("key1"), metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "foo"}),
AssetSpec(key=AssetKey("key2"), metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "bar"}),
],
resource_defs={"foo": foo_manager, "bar": bar_manager, "baz": baz_resource},
)
def my_asset(context):
# Required io manager keys are available on the context, same behavoir as ops
assert hasattr(context.resources, "foo")
assert hasattr(context.resources, "bar")
yield Output(1, "key1")
yield Output(2, "key2")

with instance_for_test() as instance:
materialize([my_asset], instance=instance)

assert foo_list == [1]
assert bar_list == [2]


def test_graph_backed_asset_resources():
@op(required_resource_keys={"foo"})
def the_op(context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from dagster._check import CheckError
from dagster._config.pythonic_config import Config
from dagster._core.definitions import AssetIn, AssetsDefinition, asset, multi_asset
from dagster._core.definitions.asset_spec import SYSTEM_METADATA_KEY_IO_MANAGER_KEY, AssetSpec
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -683,6 +684,37 @@ def my_asset():
)


@ignore_warning("Parameter `resource_defs` .* is experimental")
def test_multi_asset_resource_defs_specs() -> None:
@resource
def baz_resource():
pass

@io_manager(required_resource_keys={"baz"})
def foo_manager():
pass

@io_manager
def bar_manager():
pass

@multi_asset(
specs=[
AssetSpec("key1", metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "foo"}),
AssetSpec("key2", metadata={SYSTEM_METADATA_KEY_IO_MANAGER_KEY: "bar"}),
],
resource_defs={"foo": foo_manager, "bar": bar_manager, "baz": baz_resource},
)
def my_asset():
pass

assert my_asset.required_resource_keys == {"foo", "bar", "baz"}

ensure_requirements_satisfied(
my_asset.resource_defs, list(my_asset.get_resource_requirements())
)


def test_multi_asset_code_versions():
@multi_asset(
outs={
Expand Down

0 comments on commit eb005be

Please sign in to comment.