Skip to content

Commit

Permalink
Renamed build_asset_ins to build_named_ins (#22381)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Rename build_asset_ins to build_named_ins

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored Jun 9, 2024
1 parent 9c36bcd commit b0f0f9b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from ..input import In
from .asset_decorator import make_asset_deps
from .decorator_assets_definition_builder import (
build_asset_ins,
build_named_ins,
compute_required_resource_keys,
get_function_params_without_context_or_config_or_resources,
)
Expand Down Expand Up @@ -91,7 +91,7 @@ def _build_asset_check_input(
" the target asset or be specified in 'additional_ins'."
)

return build_asset_ins(
return build_named_ins(
fn=fn,
asset_ins=all_ins,
deps=all_deps,
Expand Down Expand Up @@ -331,7 +331,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
outs = {
spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs
}
input_tuples_by_asset_key = build_asset_ins(
named_ins_by_asset_key = build_named_ins(
fn=fn,
asset_ins={},
deps={spec.asset_key for spec in specs}
Expand All @@ -342,7 +342,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
op_def = _Op(
name=op_name,
description=description,
ins=dict(input_tuples_by_asset_key.values()),
ins=dict(named_ins_by_asset_key.values()),
out=outs,
required_resource_keys=op_required_resource_keys,
tags={
Expand All @@ -358,7 +358,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
resource_defs=wrap_resources_for_execution(resource_defs),
keys_by_input_name={
input_tuple[0]: asset_key
for asset_key, input_tuple in input_tuples_by_asset_key.items()
for asset_key, input_tuple in named_ins_by_asset_key.items()
},
check_specs_by_output_name={spec.get_python_identifier(): spec for spec in specs},
can_subset=can_subset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
from ..utils import DEFAULT_IO_MANAGER_KEY, DEFAULT_OUTPUT, NoValueSentinel, validate_tags_strict
from .decorator_assets_definition_builder import (
DecoratorAssetsDefinitionBuilderArgs,
build_asset_ins,
build_asset_outs,
build_named_ins,
)


Expand Down Expand Up @@ -797,7 +797,7 @@ def graph_asset_no_defaults(
key: Optional[CoercibleToAssetKey],
) -> AssetsDefinition:
ins = ins or {}
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
named_ins = build_named_ins(compose_fn, ins or {}, set())
out_asset_key, _asset_name = resolve_asset_key_and_name_for_decorator(
key=key,
key_prefix=key_prefix,
Expand All @@ -806,7 +806,7 @@ def graph_asset_no_defaults(
fn=compose_fn,
)

keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()}
keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in named_ins.items()}
partition_mappings = {
input_name: asset_in.partition_mapping
for input_name, asset_in in ins.items()
Expand All @@ -829,7 +829,7 @@ def graph_asset_no_defaults(
name=out_asset_key.to_python_identifier(),
description=description,
config=config,
ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()},
ins={input_name: GraphIn() for _, (input_name, _) in named_ins.items()},
out=combined_outs_by_output_name,
)(compose_fn)
return AssetsDefinition.from_graph(
Expand Down Expand Up @@ -909,9 +909,9 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
if asset_in.partition_mapping
}

asset_ins = build_asset_ins(fn, ins or {}, set())
named_ins = build_named_ins(fn, ins or {}, set())
keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
input_name: asset_key for asset_key, (input_name, _) in named_ins.items()
}
asset_outs = build_asset_outs(outs)

Expand All @@ -931,7 +931,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
name=name or fn.__name__,
out=combined_outs_by_output_name,
config=config,
ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()},
ins={input_name: GraphIn() for _, (input_name, _) in named_ins.items()},
)(fn)

# source metadata from the AssetOuts (if any)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get_function_params_without_context_or_config_or_resources(
return new_input_args


def build_asset_ins(
def build_named_ins(
fn: Callable[..., Any],
asset_ins: Mapping[str, AssetIn],
deps: Optional[AbstractSet[AssetKey]],
Expand Down Expand Up @@ -99,7 +99,7 @@ def build_asset_ins(
"of the arguments to the decorated function"
)

ins_by_asset_key: Dict[AssetKey, NamedIn] = {}
named_ins_by_asset_key: Dict[AssetKey, NamedIn] = {}
for input_name in all_input_names:
asset_key = None

Expand All @@ -117,23 +117,23 @@ def build_asset_ins(

asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name])))

ins_by_asset_key[asset_key] = NamedIn(
named_ins_by_asset_key[asset_key] = NamedIn(
input_name.replace("-", "_"),
In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type),
)

for asset_key in deps:
if asset_key in ins_by_asset_key:
if asset_key in named_ins_by_asset_key:
raise DagsterInvalidDefinitionError(
f"deps value {asset_key} also declared as input/AssetIn"
)
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = NamedIn(
named_ins_by_asset_key[asset_key] = NamedIn(
stringify_asset_key_to_input_name(asset_key),
In(cast(type, Nothing)),
)

return ins_by_asset_key
return named_ins_by_asset_key


def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "NamedOut"]:
Expand All @@ -150,7 +150,7 @@ def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "N
return named_outs_by_asset_key


def build_subsettable_asset_ins(
def build_subsettable_named_ins(
asset_ins: Mapping[AssetKey, Tuple[str, In]],
asset_outs: Mapping[AssetKey, Tuple[str, Out]],
internal_upstream_deps: Iterable[AbstractSet[AssetKey]],
Expand Down Expand Up @@ -252,7 +252,7 @@ def __init__(
(
{
**named_ins_by_asset_key,
**build_subsettable_asset_ins(
**build_subsettable_named_ins(
named_ins_by_asset_key,
named_outs_by_asset_key,
self.internal_deps.values(),
Expand Down Expand Up @@ -324,15 +324,15 @@ def from_specs(

explicit_ins = args.asset_in_map
# get which asset keys have inputs set
loaded_upstreams = build_asset_ins(fn, explicit_ins, deps=set())
loaded_upstreams = build_named_ins(fn, explicit_ins, deps=set())
unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys}
if unexpected_upstreams:
raise DagsterInvalidDefinitionError(
f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed"
" AssetSpec(s). Set the deps on the appropriate AssetSpec(s)."
)
remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams}
input_tuples_by_asset_key = build_asset_ins(fn, explicit_ins, deps=remaining_upstream_keys)
named_ins_by_asset_key = build_named_ins(fn, explicit_ins, deps=remaining_upstream_keys)

internal_deps = {
spec.key: {dep.asset_key for dep in spec.deps}
Expand All @@ -341,7 +341,7 @@ def from_specs(
}

return DecoratorAssetsDefinitionBuilder(
named_ins_by_asset_key=input_tuples_by_asset_key,
named_ins_by_asset_key=named_ins_by_asset_key,
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
Expand All @@ -358,7 +358,7 @@ def from_asset_outs(
):
check.param_invariant(not args.specs, "args", "This codepath for non-spec based create")
asset_out_map = args.asset_out_map
inputs_tuples_by_asset_key = build_asset_ins(
named_ins_by_asset_key = build_named_ins(
fn,
args.asset_in_map,
deps=(
Expand All @@ -373,7 +373,7 @@ def from_asset_outs(

# validate that the asset_ins are a subset of the upstream asset_deps.
upstream_internal_asset_keys = set().union(*asset_deps.values())
asset_in_keys = set(inputs_tuples_by_asset_key.keys())
asset_in_keys = set(named_ins_by_asset_key.keys())
if asset_deps and not asset_in_keys.issubset(upstream_internal_asset_keys):
invalid_asset_in_keys = asset_in_keys - upstream_internal_asset_keys
check.failed(
Expand Down Expand Up @@ -405,7 +405,7 @@ def from_asset_outs(
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}

return DecoratorAssetsDefinitionBuilder(
named_ins_by_asset_key=inputs_tuples_by_asset_key,
named_ins_by_asset_key=named_ins_by_asset_key,
named_outs_by_asset_key=output_tuples_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
Expand Down

0 comments on commit b0f0f9b

Please sign in to comment.