Skip to content

Commit

Permalink
Refactor asset key resolution in @asset and @graph_asset
Browse files Browse the repository at this point in the history
names

condense

comment
  • Loading branch information
schrockn committed Sep 25, 2023
1 parent 091d5f5 commit 8dbabb2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,35 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return inner


def _resolve_key_and_name(
*,
key: Optional[CoercibleToAssetKey],
key_prefix: Optional[CoercibleToAssetKeyPrefix],
name: Optional[str],
decorator: str,
fn: Callable[..., Any],
) -> Tuple[AssetKey, str]:
if (name or key_prefix) and key:
raise DagsterInvalidDefinitionError(
f"Cannot specify a name or key prefix for {decorator} when the key"
" argument is provided."
)
key_prefix_list = [key_prefix] if isinstance(key_prefix, str) else key_prefix
key = AssetKey.from_coercible(key) if key else None
assigned_name = name or fn.__name__
return (
(
# the filter here appears unnecessary per typing, but this exists
# historically so keeping it here to be conservative in case users
# can get Nones into the key_prefix_list somehow
AssetKey(list(filter(None, [*(key_prefix_list or []), assigned_name])))
if not key
else key
),
assigned_name,
)


class _Asset:
def __init__(
self,
Expand Down Expand Up @@ -284,9 +313,6 @@ def __init__(
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
):
self.name = name

if isinstance(key_prefix, str):
key_prefix = [key_prefix]
self.key_prefix = key_prefix
self.ins = ins or {}
self.deps = deps or []
Expand All @@ -311,14 +337,7 @@ def __init__(
self.backfill_policy = backfill_policy
self.code_version = code_version
self.check_specs = check_specs

if (name or key_prefix) and key:
raise DagsterInvalidDefinitionError(
"Cannot specify a name or key prefix for an asset when the key argument is"
" provided."
)

self.key = AssetKey.from_coercible(key) if key is not None else None
self.key = key

def __call__(self, fn: Callable) -> AssetsDefinition:
from dagster._config.pythonic_config import (
Expand All @@ -327,15 +346,17 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
from dagster._core.execution.build_resources import wrap_resources_for_execution

validate_resource_annotated_function(fn)
asset_name = self.name or fn.__name__

asset_ins = build_asset_ins(fn, self.ins or {}, {dep.asset_key for dep in self.deps})

out_asset_key = (
AssetKey(list(filter(None, [*(self.key_prefix or []), asset_name])))
if not self.key
else self.key
out_asset_key, asset_name = _resolve_key_and_name(
key=self.key,
key_prefix=self.key_prefix,
name=self.name,
fn=fn,
decorator="@asset",
)

with disable_dagster_warnings():
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}

Expand Down Expand Up @@ -1040,20 +1061,14 @@ def slack_files_table():
key=key,
)
else:
if (name or key_prefix) and key:
raise DagsterInvalidDefinitionError(
"Cannot specify a name or key prefix for graph_asset when the key argument is"
" provided."
)

key = AssetKey.from_coercible(key) if key is not None else None

key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix
ins = ins or {}
asset_name = name or compose_fn.__name__
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
out_asset_key = (
key if key else AssetKey(list(filter(None, [*(key_prefix or []), asset_name])))
out_asset_key, _asset_name = _resolve_key_and_name(
key=key,
key_prefix=key_prefix,
name=name,
decorator="@graph_asset",
fn=compose_fn,
)

keys_by_input_name = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ def my_op() -> int:
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
"Cannot specify a name or key prefix for @graph_asset when the key argument is provided"
),
):

Expand All @@ -2119,7 +2119,7 @@ def _specified_elsewhere() -> int:
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
"Cannot specify a name or key prefix for @graph_asset when the key argument is provided"
),
):

Expand All @@ -2132,7 +2132,7 @@ def _specified_elsewhere() -> int:
with pytest.raises(
DagsterInvalidDefinitionError,
match=(
"Cannot specify a name or key prefix for graph_asset when the key argument is provided"
"Cannot specify a name or key prefix for @graph_asset when the key argument is provided"
),
):

Expand Down

0 comments on commit 8dbabb2

Please sign in to comment.