Skip to content

Commit

Permalink
Add require_typed_event_stream to compute contexts (#16706)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Per conversation with @schrockn and @rexledesma, add a
`require_typed_event_stream` switch on `OpExecutionContext`. This is off
by default and has to be explicitly turned on, which we do upstack in
`ext_protocol`. The switch has two effects:

- Skips special validation done on returned values from ops, instead
wrapping the returned result without alteration in a generator and
treating them as if they had been yielded
- Causes an error to be thrown when an expected result is missing.

The implementation here is moderately greasy and should be considered
temporary. A better solution will take time since the code paths
governing op results are complex.

## How I Tested These Changes

New unit tests.
  • Loading branch information
smackesey authored Sep 22, 2023
1 parent 3f068a1 commit 547a624
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ def to_asset_check_evaluation(
severity=self.severity,
)

def get_spec_python_identifier(self, asset_key: Optional[AssetKey]) -> str:
def get_spec_python_identifier(
self, *, asset_key: Optional[AssetKey] = None, check_name: Optional[str] = None
) -> str:
"""Returns a string uniquely identifying the asset check spec associated with this result.
This is used for the output name associated with an `AssetCheckResult`.
"""
asset_key = asset_key or self.asset_key
check_name = check_name or self.check_name
assert asset_key is not None, "Asset key must be provided if not set on spec"
assert asset_key is not None, "Asset key must be provided if not set on spec"
return f"{asset_key.to_python_identifier()}_{self.check_name}"
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
)
from dagster._core.selector.subset_selector import AssetSelectionData

from ..errors import DagsterInvalidSubsetError
from ..errors import (
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from .config import ConfigMapping
from .dependency import NodeHandle, NodeInputHandle, NodeOutput, NodeOutputHandle
from .events import AssetKey
Expand Down Expand Up @@ -606,6 +609,15 @@ def node_output_handle_for_asset(self, asset_key: AssetKey) -> NodeOutputHandle:
def assets_def_for_node(self, node_handle: NodeHandle) -> Optional["AssetsDefinition"]:
return self.assets_defs_by_node_handle.get(node_handle)

def asset_key_for_node(self, node_handle: NodeHandle) -> AssetKey:
assets_def = self.assets_def_for_node(node_handle)
if not assets_def or len(assets_def.keys_by_output_name.keys()) > 1:
raise DagsterInvariantViolationError(
"Cannot call `asset_key_for_node` in a multi_asset with more than one asset."
" Multiple asset keys defined."
)
return next(iter(assets_def.keys_by_output_name.values()))

def asset_check_specs_for_node(self, node_handle: NodeHandle) -> Sequence[AssetCheckSpec]:
assets_def_for_node = self.assets_def_for_node(node_handle)
checks_def_for_node = self.asset_checks_def_for_node(node_handle)
Expand Down
13 changes: 13 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,19 @@ def asset_check_spec(self) -> AssetCheckSpec:
)
return asset_checks_def.spec

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._step_execution_context.requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._step_execution_context.typed_event_stream_error_message

def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
self._step_execution_context.set_requires_typed_event_stream(error_message=error_message)


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ def __init__(
self._partition_key = partition_key
self._partition_key_range = partition_key_range
self._assets_def = assets_def
self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None

@property
def op_config(self) -> Any:
Expand Down Expand Up @@ -714,6 +716,20 @@ def add_metadata_two_outputs(context) -> Tuple[str, int]:
else:
self._output_metadata[output_name] = metadata

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._typed_event_stream_error_message

def set_requires_typed_event_stream(self, *, error_message: Optional[str]) -> None:
self._requires_typed_event_stream = True
self._typed_event_stream_error_message = error_message


def build_op_context(
resources: Optional[Mapping[str, Any]] = None,
Expand Down
18 changes: 18 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,24 @@ def __init__(
self._is_external_input_asset_version_info_loaded = False
self._data_version_cache: Dict[AssetKey, "DataVersion"] = {}

self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None

# In this mode no conversion is done on returned values and missing but expected outputs are not
# allowed.
@property
def requires_typed_event_stream(self) -> bool:
return self._requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._typed_event_stream_error_message

# Error message will be appended to the default error message.
def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None):
self._requires_typed_event_stream = True
self._typed_event_stream_error_message = error_message

@property
def step(self) -> ExecutionStep:
return self._step
Expand Down
46 changes: 42 additions & 4 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
NodeHandle,
Output,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_layer import AssetLayer
from dagster._core.definitions.op_definition import OpComputeFunction
from dagster._core.definitions.result import MaterializeResult
Expand Down Expand Up @@ -204,13 +205,50 @@ def execute_core_compute(
yield step_output
if isinstance(step_output, (DynamicOutput, Output)):
emitted_result_names.add(step_output.output_name)
elif isinstance(step_output, MaterializeResult):
asset_key = (
step_output.asset_key
or step_context.job_def.asset_layer.asset_key_for_node(step_context.node_handle)
)
emitted_result_names.add(
step_context.job_def.asset_layer.node_output_handle_for_asset(asset_key).output_name
)
# Check results embedded in MaterializeResult are counted
for check_result in step_output.check_results or []:
handle = check_result.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
handle
)
emitted_result_names.add(output_name)
elif isinstance(step_output, AssetCheckEvaluation):
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(
step_output.asset_check_handle
)
emitted_result_names.add(output_name)
elif isinstance(step_output, AssetCheckResult):
if step_output.asset_key and step_output.check_name:
handle = AssetCheckHandle(step_output.asset_key, step_output.check_name)
else:
handle = step_output.to_asset_check_evaluation(step_context).asset_check_handle
output_name = step_context.job_def.asset_layer.get_output_name_for_asset_check(handle)
emitted_result_names.add(output_name)

expected_op_output_names = {
output.name for output in step.step_outputs if not output.properties.asset_check_handle
output.name
for output in step.step_outputs
# checks are required if we're in requires_typed_event_stream mode
if step_context.requires_typed_event_stream or output.properties.asset_check_handle
}
omitted_outputs = expected_op_output_names.difference(emitted_result_names)
if omitted_outputs:
step_context.log.info(
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not fire "
f"outputs {omitted_outputs!r}"
message = (
f"{step_context.op_def.node_type_str} '{step.node_handle}' did not yield or return "
f"expected outputs {omitted_outputs!r}."
)

if step_context.requires_typed_event_stream:
if step_context.typed_event_stream_error_message:
message += " " + step_context.typed_event_stream_error_message
raise DagsterInvariantViolationError(message)
else:
step_context.log.info(message)
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _filter_expected_output_defs(
)
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)]
remove_outputs = [
r.get_spec_python_identifier(x.asset_key or context.asset_key)
r.get_spec_python_identifier(asset_key=x.asset_key or context.asset_key)
for x in materialize_results
for r in x.check_results or []
]
Expand Down Expand Up @@ -267,6 +267,20 @@ def validate_and_coerce_op_result_to_iterator(
f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to"
" return no results."
)
# `requires_typed_event_stream` is a mode where we require users to return/yield exactly the
# results that will be registered in the instance, without additional fancy inference (like
# wrapping `None` in an `Output`). We therefore skip any return-specific validation for this
# mode and treat returned values as if they were yielded.
elif output_defs and context.requires_typed_event_stream:
# If nothing was returned, treat it as an empty tuple instead of a `(None,)`.
# This is important for delivering the correct error message when an output is missing.
if result is None:
result_tuple = tuple()
elif not isinstance(result, tuple) or is_named_tuple_instance(result):
result_tuple = (result,)
else:
result_tuple = result
yield from result_tuple
elif output_defs:
for position, output_def, element in _zip_and_iterate_op_result(
result, context, output_defs
Expand Down
Loading

0 comments on commit 547a624

Please sign in to comment.