diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d4de7b644909e..cc411f8b834f6 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -159,6 +159,9 @@ def __init__(self, step_execution_context: StepExecutionContext): self._events: List[DagsterEvent] = [] self._output_metadata: Dict[str, Any] = {} + self._requires_typed_event_stream = False + self._typed_event_stream_error_message = None + @public @property def op_config(self) -> Any: @@ -1337,14 +1340,16 @@ def asset_check_spec(self) -> AssetCheckSpec: # allowed. @property def requires_typed_event_stream(self) -> bool: - return self._step_execution_context.requires_typed_event_stream + return self._requires_typed_event_stream @property def typed_event_stream_error_message(self) -> Optional[str]: - return self._step_execution_context.typed_event_stream_error_message + return self._typed_event_stream_error_message - def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None: - self._step_execution_context.set_requires_typed_event_stream(error_message=error_message) + # Error message will be appended to the default error message. + def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None): + self._requires_typed_event_stream = True + self._typed_event_stream_error_message = error_message @staticmethod def get() -> "OpExecutionContext": diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 60235f365eeb2..dc2db13c74409 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -575,24 +575,6 @@ def __init__( self._is_external_input_asset_version_info_loaded = False self._data_version_cache: Dict[AssetKey, "DataVersion"] = {} - self._requires_typed_event_stream = False - self._typed_event_stream_error_message = None - - # In this mode no conversion is done on returned values and missing but expected outputs are not - # allowed. - @property - def requires_typed_event_stream(self) -> bool: - return self._requires_typed_event_stream - - @property - def typed_event_stream_error_message(self) -> Optional[str]: - return self._typed_event_stream_error_message - - # Error message will be appended to the default error message. - def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None): - self._requires_typed_event_stream = True - self._typed_event_stream_error_message = error_message - @property def step(self) -> ExecutionStep: return self._step diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 2d67fb38bab2e..3a55902de60fe 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -245,7 +245,7 @@ def execute_core_compute( output.name for output in step.step_outputs # checks are required if we're in requires_typed_event_stream mode - if step_context.requires_typed_event_stream or output.properties.asset_check_key + if compute_context.requires_typed_event_stream or output.properties.asset_check_key } omitted_outputs = expected_op_output_names.difference(emitted_result_names) if omitted_outputs: @@ -254,9 +254,9 @@ def execute_core_compute( f"expected outputs {omitted_outputs!r}." ) - if step_context.requires_typed_event_stream: - if step_context.typed_event_stream_error_message: - message += " " + step_context.typed_event_stream_error_message + if compute_context.requires_typed_event_stream: + if compute_context.typed_event_stream_error_message: + message += " " + compute_context.typed_event_stream_error_message raise DagsterInvariantViolationError(message) else: step_context.log.info(message)