diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d76c79afe3a8d..6acae1f22ce98 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1222,6 +1222,14 @@ def asset_check_spec(self) -> AssetCheckSpec: ) return asset_checks_def.spec + # In explicit mode, no conversion is done on returned values and missing but expected outputs are not + # allowed. + def is_explicit_mode(self) -> bool: + return self._step_execution_context.is_explicit_mode() + + def set_explicit_mode(self): + self._step_execution_context.set_explicit_mode() + # actually forking the object type for assets is tricky for users in the cases of: # * manually constructing ops to make AssetsDefinitions diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 46e0363f31b9d..d9c8b643d32d6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -447,6 +447,7 @@ def __init__( self._partition_key = partition_key self._partition_key_range = partition_key_range self._assets_def = assets_def + self._is_explicit_mode = False @property def op_config(self) -> Any: @@ -714,6 +715,14 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]: else: self._output_metadata[output_name] = metadata + # In explicit mode, no conversion is done on returned values and missing but expected outputs are not + # allowed. + def is_explicit_mode(self) -> bool: + return self._is_explicit_mode + + def set_explicit_mode(self): + self._is_explicit_mode = True + def build_op_context( resources: Optional[Mapping[str, Any]] = None, diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index e36f176ef9b27..cba11e4c0fc09 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -559,6 +559,16 @@ def __init__( self._is_external_input_asset_version_info_loaded = False self._data_version_cache: Dict[AssetKey, "DataVersion"] = {} + self._is_explicit_mode = False + + # In explicit mode, no conversion is done on returned values and missing but expected outputs are not + # allowed. + def is_explicit_mode(self) -> bool: + return self._is_explicit_mode + + def set_explicit_mode(self) -> None: + self._is_explicit_mode = True + @property def step(self) -> ExecutionStep: return self._step diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 549c091f655a2..c168b3c91bed4 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -27,6 +27,7 @@ NodeHandle, Output, ) +from dagster._core.definitions.asset_check_spec import AssetCheckHandle from dagster._core.definitions.asset_layer import AssetLayer from dagster._core.definitions.op_definition import OpComputeFunction from dagster._core.definitions.result import MaterializeResult @@ -203,14 +204,42 @@ def execute_core_compute( for step_output in _yield_compute_results(step_context, inputs, compute_fn): yield step_output if isinstance(step_output, (DynamicOutput, Output)): - emitted_result_names.add(step_output.output_name) + output_name = step_output.output_name + elif isinstance(step_output, MaterializeResult): + asset_key = ( + step_output.asset_key + or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle) + ) + output_name = step_context.job_def.asset_layer.node_output_handle_for_asset( + asset_key + ).output_name + elif isinstance(step_output, AssetCheckEvaluation): + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check( + step_output.asset_check_handle + ) + elif isinstance(step_output, AssetCheckResult): + if step_output.asset_key and step_output.check_name: + handle = AssetCheckHandle(step_output.asset_key, step_output.check_name) + else: + handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle + output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle) + else: + output_name = None + if output_name: + emitted_result_names.add(output_name) expected_op_output_names = { output.name for output in step.step_outputs if not output.properties.asset_check_handle } omitted_outputs = expected_op_output_names.difference(emitted_result_names) if omitted_outputs: - step_context.log.info( - f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire " - f"outputs {omitted_outputs!r}" - ) + if step_context.is_explicit_mode(): + raise DagsterInvariantViolationError( + f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return " + f"expected outputs {omitted_outputs!r}." + ) + else: + step_context.log.info( + f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire " + f"outputs {omitted_outputs!r}" + ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 360f50f62b851..8c6274ea7ccc8 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -268,73 +268,87 @@ def validate_and_coerce_op_result_to_iterator( " return no results." ) elif output_defs: - for position, output_def, element in _zip_and_iterate_op_result( - result, context, output_defs - ): - annotation = _get_annotation_for_output_position(position, context.op_def, output_defs) - if output_def.is_dynamic: - if not isinstance(element, list): - raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " - f"dynamic output '{output_def.name}' expected a list of " - "DynamicOutput objects, but instead received instead an " - f"object of type {type(element)}." - ) - for item in element: - if not isinstance(item, DynamicOutput): + # Skip any return-specific validation and treat it like a generator op + if context.is_explicit_mode(): + # If nothing was returned, treat it as an empty tuple instead of a `(None,)`. + # This is important for delivering the correct error message when an output is missing. + if result is None: + result_tuple = tuple() + elif not isinstance(result, tuple) or is_named_tuple_instance(result): + result_tuple = (result,) + else: + result_tuple = result + yield from result_tuple + else: + for position, output_def, element in _zip_and_iterate_op_result( + result, context, output_defs + ): + annotation = _get_annotation_for_output_position( + position, context.op_def, output_defs + ) + if output_def.is_dynamic: + if not isinstance(element, list): raise DagsterInvariantViolationError( f"Error with output for {context.describe_op()}: " - f"dynamic output '{output_def.name}' at position {position} expected a " - "list of DynamicOutput objects, but received an " - f"item with type {type(item)}." + f"dynamic output '{output_def.name}' expected a list of " + "DynamicOutput objects, but instead received instead an " + f"object of type {type(element)}." + ) + for item in element: + if not isinstance(item, DynamicOutput): + raise DagsterInvariantViolationError( + f"Error with output for {context.describe_op()}: dynamic output" + f" '{output_def.name}' at position {position} expected a list of" + " DynamicOutput objects, but received an item with type" + f" {type(item)}." + ) + dynamic_output = cast(DynamicOutput, item) + _check_output_object_name(dynamic_output, output_def, position) + + with disable_dagster_warnings(): + yield DynamicOutput( + output_name=output_def.name, + value=dynamic_output.value, + mapping_key=dynamic_output.mapping_key, + metadata=dynamic_output.metadata, + ) + elif isinstance(element, MaterializeResult): + yield element # coerced in to Output in outer iterator + elif isinstance(element, Output): + if annotation != inspect.Parameter.empty and not is_generic_output_annotation( + annotation + ): + raise DagsterInvariantViolationError( + f"Error with output for {context.describe_op()}: received Output object" + f" for output '{output_def.name}' which does not have an Output" + f" annotation. Annotation has type {annotation}." ) - dynamic_output = cast(DynamicOutput, item) - _check_output_object_name(dynamic_output, output_def, position) + _check_output_object_name(element, output_def, position) with disable_dagster_warnings(): - yield DynamicOutput( + yield Output( output_name=output_def.name, - value=dynamic_output.value, - mapping_key=dynamic_output.mapping_key, - metadata=dynamic_output.metadata, + value=element.value, + metadata=element.metadata, + data_version=element.data_version, ) - elif isinstance(element, MaterializeResult): - yield element # coerced in to Output in outer iterator - elif isinstance(element, Output): - if annotation != inspect.Parameter.empty and not is_generic_output_annotation( - annotation - ): - raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: received Output object for" - f" output '{output_def.name}' which does not have an Output annotation." - f" Annotation has type {annotation}." - ) - _check_output_object_name(element, output_def, position) - - with disable_dagster_warnings(): - yield Output( - output_name=output_def.name, - value=element.value, - metadata=element.metadata, - data_version=element.data_version, - ) - else: - # If annotation indicates a generic output annotation, and an - # output object was not received, throw an error. - if is_generic_output_annotation(annotation): - raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: output " - f"'{output_def.name}' has generic output annotation, " - "but did not receive an Output object for this output. " - f"Received instead an object of type {type(element)}." - ) - if result is None and output_def.is_required is False: - context.log.warning( - 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.describe_op()}. ' - "This value will be passed to downstream " - f"{context.op_def.node_type_str}s. For conditional " - "execution, results must be yielded: " - "https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching" - ) - yield Output(output_name=output_def.name, value=element) + else: + # If annotation indicates a generic output annotation, and an + # output object was not received, throw an error. + if is_generic_output_annotation(annotation): + raise DagsterInvariantViolationError( + f"Error with output for {context.describe_op()}: output " + f"'{output_def.name}' has generic output annotation, " + "but did not receive an Output object for this output. " + f"Received instead an object of type {type(element)}." + ) + if result is None and output_def.is_required is False: + context.log.warning( + 'Value "None" returned for non-required output ' + f'"{output_def.name}" of {context.describe_op()}. ' + "This value will be passed to downstream " + f"{context.op_def.node_type_str}s. For conditional " + "execution, results must be yielded: " + "https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching" + ) + yield Output(output_name=output_def.name, value=element) diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_explicit_mode.py b/python_modules/dagster/dagster_tests/execution_tests/test_explicit_mode.py new file mode 100644 index 0000000000000..10065c5797dc6 --- /dev/null +++ b/python_modules/dagster/dagster_tests/execution_tests/test_explicit_mode.py @@ -0,0 +1,48 @@ +from contextlib import contextmanager +from typing import Iterator + +import pytest +from dagster import OpExecutionContext, Out, asset, multi_asset, op +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.events import Output +from dagster._core.definitions.materialize import materialize +from dagster._core.errors import DagsterInvariantViolationError +from dagster._utils.test import wrap_op_in_graph_and_execute + + +@contextmanager +def raises_missing_output_error() -> Iterator[None]: + with pytest.raises( + DagsterInvariantViolationError, match="did not yield or return expected outputs" + ): + yield + + +def test_explicit_mode_op(): + @op(out={"a": Out(int), "b": Out(int)}) + def explicit_mode_op(context: OpExecutionContext): + context.set_explicit_mode() + + with raises_missing_output_error(): + wrap_op_in_graph_and_execute(explicit_mode_op) + + +def test_explicit_mode_asset(): + @asset + def explicit_mode_asset(context: OpExecutionContext): + context.set_explicit_mode() + pass + + with raises_missing_output_error(): + materialize([explicit_mode_asset]) + + +def test_explicit_mode_multi_asset(): + @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) + def explicit_mode_multi_asset(context: OpExecutionContext): + context.set_explicit_mode() + yield Output(None, output_name="foo") + pass + + with raises_missing_output_error(): + materialize([explicit_mode_multi_asset])