diff --git a/python_modules/dagster/dagster/_core/execution/plan/inputs.py b/python_modules/dagster/dagster/_core/execution/plan/inputs.py index e8860c834fbf2..df981aa57887b 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/inputs.py +++ b/python_modules/dagster/dagster/_core/execution/plan/inputs.py @@ -122,10 +122,12 @@ def compute_version( raise NotImplementedError() -@whitelist_for_serdes(storage_field_names={"node_handle": "solid_handle"}) -class FromSourceAsset( +@whitelist_for_serdes( + storage_name="FromSourceAsset", storage_field_names={"node_handle": "solid_handle"} +) +class FromLoadableAsset( NamedTuple( - "_FromSourceAsset", + "_FromLoadableAsset", [ ("node_handle", NodeHandle), ("input_name", str), diff --git a/python_modules/dagster/dagster/_core/execution/plan/plan.py b/python_modules/dagster/dagster/_core/execution/plan/plan.py index f15e7219bbcd7..bdd2e46cf4501 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/plan.py +++ b/python_modules/dagster/dagster/_core/execution/plan/plan.py @@ -62,10 +62,10 @@ FromDirectInputValue, FromDynamicCollect, FromInputManager, + FromLoadableAsset, FromMultipleSources, FromMultipleSourcesLoadSingleSource, FromPendingDynamicStepOutput, - FromSourceAsset, FromStepOutput, FromUnresolvedStepOutput, StepInput, @@ -441,7 +441,7 @@ def get_step_input_source( ): # can only load from source asset if assets defs are available if asset_layer.asset_key_for_input(handle, input_handle.input_name): - return FromSourceAsset(node_handle=handle, input_name=input_name) + return FromLoadableAsset(node_handle=handle, input_name=input_name) elif input_def.input_manager_key: return FromInputManager(node_handle=handle, input_name=input_name) @@ -626,7 +626,7 @@ def _step_input_source_from_blocking_asset_checks_dep_def( if source_to_load_from is None: asset_key_for_input = asset_layer.asset_key_for_input(node_handle, input_handle.input_name) if asset_key_for_input: - source_to_load_from = FromSourceAsset(node_handle=node_handle, input_name=input_name) + source_to_load_from = FromLoadableAsset(node_handle=node_handle, input_name=input_name) sources.append(source_to_load_from) else: check.failed("Unexpected: no sources to load from and no asset key to load from")