Skip to content

Commit

Permalink
Make @graph_asset support explicit key (#16751)
Browse files Browse the repository at this point in the history
## Summary & Motivation

`graph_asset`'s parameters are out-of-sync with `asset`'s on a structural level, and we do not have the discipline or the tests to keep them in sync. This PR adds an explicit `key` which is nice for users and would have made a recent PR of @johannkm's nicer, who had to do some contortions in #16612 around `key_prefix`. 

This diff also adds documentation for `key` on `@asset`.

An upstack PR will consolidate the logic in `@asset` and `@graph_asset` around this stuff into a single helper function, since it is pretty gross, tricky code.

## How I Tested These Changes
  • Loading branch information
schrockn authored Sep 26, 2023
1 parent 4657783 commit 30b817d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def asset(
execute in the decorated function after materializing the asset.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the asset.
key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -939,6 +940,7 @@ def graph_asset(
backfill_policy: Optional[BackfillPolicy] = ...,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


Expand All @@ -958,6 +960,7 @@ def graph_asset(
backfill_policy: Optional[BackfillPolicy] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Creates a software-defined asset that's computed using a graph of ops.
Expand Down Expand Up @@ -1001,6 +1004,7 @@ def graph_asset(
auto_materialize_policy (Optional[AutoMaterializePolicy]): The AutoMaterializePolicy to use
for this asset.
backfill_policy (Optional[BackfillPolicy]): The BackfillPolicy to use for this asset.
key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -1033,13 +1037,24 @@ def slack_files_table():
backfill_policy=backfill_policy,
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
)
else:
if (name or key_prefix) and key:
raise DagsterInvalidDefinitionError(
"Cannot specify a name or key prefix for graph_asset when the key argument is"
" provided."
)

key = AssetKey.from_coercible(key) if key is not None else None

key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix
ins = ins or {}
asset_name = name or compose_fn.__name__
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
out_asset_key = AssetKey(list(filter(None, [*(key_prefix or []), asset_name])))
out_asset_key = (
key if key else AssetKey(list(filter(None, [*(key_prefix or []), asset_name])))
)

keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.decorators.asset_decorator import graph_asset
from dagster._core.definitions.events import AssetMaterialization
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import (
Expand Down Expand Up @@ -2070,3 +2071,73 @@ def asset_key_context_with_two_specs(context: AssetExecutionContext):
match="Cannot call `context.asset_key` in a multi_asset with more than one asset",
):
materialize([asset_key_context_with_two_specs])


def test_graph_asset_explicit_coercible_key() -> None:
@op
def my_op() -> int:
return 1

# conversion
@graph_asset(key="my_graph_asset")
def _specified_elsewhere() -> int:
return my_op()

assert _specified_elsewhere.key == AssetKey(["my_graph_asset"])


def test_graph_asset_explicit_fully_key() -> None:
@op
def my_op() -> int:
return 1

@graph_asset(key=AssetKey("my_graph_asset"))
def _specified_elsewhere() -> int:
return my_op()

assert _specified_elsewhere.key == AssetKey(["my_graph_asset"])


def test_graph_asset_cannot_use_key_prefix_name_and_key() -> None:
@op
def my_op() -> int:
return 1

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
),
):

@graph_asset(key_prefix="a_prefix", key=AssetKey("my_graph_asset"))
def _specified_elsewhere() -> int:
return my_op()

assert _specified_elsewhere # appease linter

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
),
):

@graph_asset(name="a_name", key=AssetKey("my_graph_asset"))
def _specified_elsewhere() -> int:
return my_op()

assert _specified_elsewhere # appease linter

with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
),
):

@graph_asset(name="a_name", key_prefix="a_prefix", key=AssetKey("my_graph_asset"))
def _specified_elsewhere() -> int:
return my_op()

assert _specified_elsewhere # appease linter

1 comment on commit 30b817d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-i6d3zm5xm-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 30b817d.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.