From c7b6e266ad83bb184f4ae6e09dc1bb1dd3ef1f07 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Tue, 26 Sep 2023 11:50:03 -0400 Subject: [PATCH] rebase on asset decs --- .../dagster/_core/definitions/asset_checks.py | 5 +- .../definitions/decorators/asset_decorator.py | 48 ++++--------------- 2 files changed, 13 insertions(+), 40 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 21fbddd85eb0f..20d28ad947b71 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -201,8 +201,9 @@ def blocking_asset(**kwargs): return graph_asset_no_defaults( compose_fn=blocking_asset, - name=asset_def.key.path[-1], - key_prefix=asset_def.key.path[:-1] if len(asset_def.key.path) > 1 else None, + name=None, + key_prefix=None, + key=asset_def.key, group_name=asset_def.group_names_by_key.get(asset_def.key), partitions_def=asset_def.partitions_def, check_specs=check_specs, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 3dd3b072a0a02..43bc02203e6f4 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -962,7 +962,6 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, - _graph_name: Optional[str] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -983,7 +982,6 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, - _graph_name: Optional[str] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. @@ -1061,41 +1059,9 @@ def slack_files_table(): resource_defs=resource_defs, check_specs=check_specs, key=key, - _graph_name=_graph_name, ) else: - ins = ins or {} - asset_ins = build_asset_ins(compose_fn, ins or {}, set()) - out_asset_key, _asset_name = _resolve_key_and_name( - key=key, - key_prefix=key_prefix, - name=name, - decorator="@graph_asset", - fn=compose_fn, - ) - - keys_by_input_name = { - input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() - } - partition_mappings = { - input_name: asset_in.partition_mapping - for input_name, asset_in in ins.items() - if asset_in.partition_mapping - } - - check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( - check_specs, [out_asset_key] - ) - check_outs_by_output_name: Mapping[str, GraphOut] = { - output_name: GraphOut() for output_name in check_specs_by_output_name.keys() - } - - combined_outs_by_output_name: Mapping = { - "result": GraphOut(), - **check_outs_by_output_name, - } - - return graph_asset_no_defaults( + graph_asset_no_defaults( compose_fn=compose_fn, name=name, description=description, @@ -1110,6 +1076,7 @@ def slack_files_table(): backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, + key=key, ) @@ -1129,12 +1096,17 @@ def graph_asset_no_defaults( backfill_policy: Optional[BackfillPolicy], resource_defs: Optional[Mapping[str, ResourceDefinition]], check_specs: Optional[Sequence[AssetCheckSpec]], + key: Optional[CoercibleToAssetKey], ) -> AssetsDefinition: - key_prefix = [key_prefix] if isinstance(key_prefix, str) else key_prefix ins = ins or {} - asset_name = name or compose_fn.__name__ asset_ins = build_asset_ins(compose_fn, ins or {}, set()) - out_asset_key = AssetKey(list(filter(None, [*(key_prefix or []), asset_name]))) + out_asset_key, _asset_name = _resolve_key_and_name( + key=key, + key_prefix=key_prefix, + name=name, + decorator="@graph_asset", + fn=compose_fn, + ) keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()} partition_mappings = {