Skip to content

Commit

Permalink
Add explicit_mode to compute contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Sep 21, 2023
1 parent c402db0 commit eafa0ea
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 69 deletions.
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from dagster._core.selector.subset_selector import AssetSelectionData

from ..errors import DagsterInvalidSubsetError
from ..errors import (
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from .config import ConfigMapping
from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle
from .events import AssetKey
Expand Down Expand Up @@ -606,6 +609,15 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle:
def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]:
return self.assets_defs_by_node_handle.get(node_handle)

def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey:
assets_def = self.assets_def_for_node(node_handle)
if not assets_def or len(assets_def.keys_by_output_name.keys()) > 1:
raise DagsterInvariantViolationError(
"Cannot call `asset_key_for_node` in a multi_asset with more than one asset."
" Multiple asset keys defined."
)
return next(iter(assets_def.keys_by_output_name.values()))

def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]:
assets_def_for_node = self.assets_def_for_node(node_handle)
checks_def_for_node = self.asset_checks_def_for_node(node_handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,14 @@ def asset_check_spec(self) -> AssetCheckSpec:
)
return asset_checks_def.spec

# In explicit mode, no conversion is done on returned values and missing but expected outputs are not
# allowed.
def is_explicit_mode(self) -> bool:
return self._step_execution_context.is_explicit_mode()

def set_explicit_mode(self):
self._step_execution_context.set_explicit_mode()


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ def __init__(
self._partition_key = partition_key
self._partition_key_range = partition_key_range
self._assets_def = assets_def
self._is_explicit_mode = False

@property
def op_config(self) -> Any:
Expand Down Expand Up @@ -714,6 +715,14 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]:
else:
self._output_metadata[output_name] = metadata

# In explicit mode, no conversion is done on returned values and missing but expected outputs are not
# allowed.
def is_explicit_mode(self) -> bool:
return self._is_explicit_mode

def set_explicit_mode(self):
self._is_explicit_mode = True


def build_op_context(
resources: Optional[Mapping[str, Any]] = None,
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,16 @@ def __init__(
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}

self._is_explicit_mode = False

# In explicit mode, no conversion is done on returned values and missing but expected outputs are not
# allowed.
def is_explicit_mode(self) -> bool:
return self._is_explicit_mode

def set_explicit_mode(self) -> None:
self._is_explicit_mode = True

@property
def step(self) -> ExecutionStep:
return self._step
Expand Down
39 changes: 34 additions & 5 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
NodeHandle,
Output,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
Expand Down Expand Up @@ -203,14 +204,42 @@ def execute_core_compute(
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
output_name = step_output.output_name
elif isinstance(step_output, MaterializeResult):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
)
output_name = step_context.job_def.asset_layer.node_output_handle_for_asset(
asset_key
).output_name
elif isinstance(step_output, AssetCheckEvaluation):
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
step_output.asset_check_handle
)
elif isinstance(step_output, AssetCheckResult):
if step_output.asset_key and step_output.check_name:
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name)
else:
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle)
else:
output_name = None
if output_name:
emitted_result_names.add(output_name)

expected_op_output_names = {
output.name for output in step.step_outputs if not output.properties.asset_check_handle
}
omitted_outputs = expected_op_output_names.difference(emitted_result_names)
if omitted_outputs:
step_context.log.info(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
f"outputs {omitted_outputs!r}"
)
if step_context.is_explicit_mode():
raise DagsterInvariantViolationError(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return "
f"expected outputs {omitted_outputs!r}."
)
else:
step_context.log.info(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
f"outputs {omitted_outputs!r}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -268,73 +268,87 @@ def validate_and_coerce_op_result_to_iterator(
" return no results."
)
elif output_defs:
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
):
annotation = _get_annotation_for_output_position(position, context.op_def, output_defs)
if output_def.is_dynamic:
if not isinstance(element, list):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: "
f"dynamic output '{output_def.name}' expected a list of "
"DynamicOutput objects, but instead received instead an "
f"object of type {type(element)}."
)
for item in element:
if not isinstance(item, DynamicOutput):
# Skip any return-specific validation and treat it like a generator op
if context.is_explicit_mode():
# If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
# This is important for delivering the correct error message when an output is missing.
if result is None:
result_tuple = tuple()
elif not isinstance(result, tuple) or is_named_tuple_instance(result):
result_tuple = (result,)
else:
result_tuple = result
yield from result_tuple
else:
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
):
annotation = _get_annotation_for_output_position(
position, context.op_def, output_defs
)
if output_def.is_dynamic:
if not isinstance(element, list):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: "
f"dynamic output '{output_def.name}' at position {position} expected a "
"list of DynamicOutput objects, but received an "
f"item with type {type(item)}."
f"dynamic output '{output_def.name}' expected a list of "
"DynamicOutput objects, but instead received instead an "
f"object of type {type(element)}."
)
for item in element:
if not isinstance(item, DynamicOutput):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: dynamic output"
f" '{output_def.name}' at position {position} expected a list of"
" DynamicOutput objects, but received an item with type"
f" {type(item)}."
)
dynamic_output = cast(DynamicOutput, item)
_check_output_object_name(dynamic_output, output_def, position)

with disable_dagster_warnings():
yield DynamicOutput(
output_name=output_def.name,
value=dynamic_output.value,
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
)
elif isinstance(element, MaterializeResult):
yield element # coerced in to Output in outer iterator
elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
annotation
):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: received Output object"
f" for output '{output_def.name}' which does not have an Output"
f" annotation. Annotation has type {annotation}."
)
dynamic_output = cast(DynamicOutput, item)
_check_output_object_name(dynamic_output, output_def, position)
_check_output_object_name(element, output_def, position)

with disable_dagster_warnings():
yield DynamicOutput(
yield Output(
output_name=output_def.name,
value=dynamic_output.value,
mapping_key=dynamic_output.mapping_key,
metadata=dynamic_output.metadata,
value=element.value,
metadata=element.metadata,
data_version=element.data_version,
)
elif isinstance(element, MaterializeResult):
yield element # coerced in to Output in outer iterator
elif isinstance(element, Output):
if annotation != inspect.Parameter.empty and not is_generic_output_annotation(
annotation
):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: received Output object for"
f" output '{output_def.name}' which does not have an Output annotation."
f" Annotation has type {annotation}."
)
_check_output_object_name(element, output_def, position)

with disable_dagster_warnings():
yield Output(
output_name=output_def.name,
value=element.value,
metadata=element.metadata,
data_version=element.data_version,
)
else:
# If annotation indicates a generic output annotation, and an
# output object was not received, throw an error.
if is_generic_output_annotation(annotation):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: output "
f"'{output_def.name}' has generic output annotation, "
"but did not receive an Output object for this output. "
f"Received instead an object of type {type(element)}."
)
if result is None and output_def.is_required is False:
context.log.warning(
'Value "None" returned for non-required output '
f'"{output_def.name}" of {context.describe_op()}. '
"This value will be passed to downstream "
f"{context.op_def.node_type_str}s. For conditional "
"execution, results must be yielded: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching"
)
yield Output(output_name=output_def.name, value=element)
else:
# If annotation indicates a generic output annotation, and an
# output object was not received, throw an error.
if is_generic_output_annotation(annotation):
raise DagsterInvariantViolationError(
f"Error with output for {context.describe_op()}: output "
f"'{output_def.name}' has generic output annotation, "
"but did not receive an Output object for this output. "
f"Received instead an object of type {type(element)}."
)
if result is None and output_def.is_required is False:
context.log.warning(
'Value "None" returned for non-required output '
f'"{output_def.name}" of {context.describe_op()}. '
"This value will be passed to downstream "
f"{context.op_def.node_type_str}s. For conditional "
"execution, results must be yielded: "
"https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#with-conditional-branching"
)
yield Output(output_name=output_def.name, value=element)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from contextlib import contextmanager
from typing import Iterator

import pytest
from dagster import OpExecutionContext, Out, asset, multi_asset, op
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.events import Output
from dagster._core.definitions.materialize import materialize
from dagster._core.errors import DagsterInvariantViolationError
from dagster._utils.test import wrap_op_in_graph_and_execute


@contextmanager
def raises_missing_output_error() -> Iterator[None]:
with pytest.raises(
DagsterInvariantViolationError, match="did not yield or return expected outputs"
):
yield


def test_explicit_mode_op():
@op(out={"a": Out(int), "b": Out(int)})
def explicit_mode_op(context: OpExecutionContext):
context.set_explicit_mode()

with raises_missing_output_error():
wrap_op_in_graph_and_execute(explicit_mode_op)


def test_explicit_mode_asset():
@asset
def explicit_mode_asset(context: OpExecutionContext):
context.set_explicit_mode()
pass

with raises_missing_output_error():
materialize([explicit_mode_asset])


def test_explicit_mode_multi_asset():
@multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
def explicit_mode_multi_asset(context: OpExecutionContext):
context.set_explicit_mode()
yield Output(None, output_name="foo")
pass

with raises_missing_output_error():
materialize([explicit_mode_multi_asset])

0 comments on commit eafa0ea

Please sign in to comment.