Skip to content

Commit

Permalink
rebase on asset decs
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 26, 2023
1 parent 2fec27e commit c7b6e26
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,9 @@ def blocking_asset(**kwargs):

return graph_asset_no_defaults(
compose_fn=blocking_asset,
name=asset_def.key.path[-1],
key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None,
name=None,
key_prefix=None,
key=asset_def.key,
group_name=asset_def.group_names_by_key.get(asset_def.key),
partitions_def=asset_def.partitions_def,
check_specs=check_specs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,6 @@ def graph_asset(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = ...,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
_graph_name: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


Expand All @@ -983,7 +982,6 @@ def graph_asset(
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
key: Optional[CoercibleToAssetKey] = None,
_graph_name: Optional[str] = None,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Creates a software-defined asset that's computed using a graph of ops.
Expand Down Expand Up @@ -1061,41 +1059,9 @@ def slack_files_table():
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
_graph_name=_graph_name,
)
else:
ins = ins or {}
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
out_asset_key, _asset_name = _resolve_key_and_name(
key=key,
key_prefix=key_prefix,
name=name,
decorator="@graph_asset",
fn=compose_fn,
)

keys_by_input_name = {
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
}
partition_mappings = {
input_name: asset_in.partition_mapping
for input_name, asset_in in ins.items()
if asset_in.partition_mapping
}

check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs(
check_specs, [out_asset_key]
)
check_outs_by_output_name: Mapping[str, GraphOut] = {
output_name: GraphOut() for output_name in check_specs_by_output_name.keys()
}

combined_outs_by_output_name: Mapping = {
"result": GraphOut(),
**check_outs_by_output_name,
}

return graph_asset_no_defaults(
graph_asset_no_defaults(
compose_fn=compose_fn,
name=name,
description=description,
Expand All @@ -1110,6 +1076,7 @@ def slack_files_table():
backfill_policy=backfill_policy,
resource_defs=resource_defs,
check_specs=check_specs,
key=key,
)


Expand All @@ -1129,12 +1096,17 @@ def graph_asset_no_defaults(
backfill_policy: Optional[BackfillPolicy],
resource_defs: Optional[Mapping[str, ResourceDefinition]],
check_specs: Optional[Sequence[AssetCheckSpec]],
key: Optional[CoercibleToAssetKey],
) -> AssetsDefinition:
key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix
ins = ins or {}
asset_name = name or compose_fn.__name__
asset_ins = build_asset_ins(compose_fn, ins or {}, set())
out_asset_key = AssetKey(list(filter(None, [*(key_prefix or []), asset_name])))
out_asset_key, _asset_name = _resolve_key_and_name(
key=key,
key_prefix=key_prefix,
name=name,
decorator="@graph_asset",
fn=compose_fn,
)

keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()}
partition_mappings = {
Expand Down

0 comments on commit c7b6e26

Please sign in to comment.