diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index e78c36cae9391..3d1508df73598 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -192,7 +192,7 @@ def direct_invocation_result( resources_by_param_name = extracted.resources_by_param_name config_input = extracted.config_arg - bound_context = (context or build_op_context()).bind( + with (context or build_op_context()).bind_and_scope( op_def=op_def, pending_invocation=pending_invocation, assets_def=assets_def, @@ -202,22 +202,21 @@ def direct_invocation_result( if isinstance(config_input, Config) else config_input ), - ) - - input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context) - - result = invoke_compute_fn( - fn=compute_fn.decorated_fn, - context=bound_context, - kwargs=input_dict, - context_arg_provided=compute_fn.has_context_arg(), - config_arg_cls=( - compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None - ), - resource_args=resource_arg_mapping, - ) + ) as bound_context: + input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context) + + result = invoke_compute_fn( + fn=compute_fn.decorated_fn, + context=bound_context, + kwargs=input_dict, + context_arg_provided=compute_fn.has_context_arg(), + config_arg_cls=( + compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None + ), + resource_args=resource_arg_mapping, + ) - return _type_check_output_wrapper(op_def, result, bound_context) + return _type_check_output_wrapper(op_def, result, bound_context) def _resolve_inputs( diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index c53d673e3200e..1c72e99bea668 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -1,14 +1,16 @@ -from contextlib import ExitStack +from contextlib import ExitStack, contextmanager from typing import ( AbstractSet, Any, Dict, + Iterator, List, Mapping, NamedTuple, Optional, Sequence, Set, + Tuple, Union, cast, ) @@ -254,51 +256,72 @@ def get_tag(self, key: str) -> str: def get_step_execution_context(self) -> StepExecutionContext: raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "methods")) - def bind( + @contextmanager + def bind_and_scope( self, op_def: OpDefinition, pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], assets_def: Optional[AssetsDefinition], config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], - ) -> "BoundOpExecutionContext": + ) -> Iterator["BoundOpExecutionContext"]: from dagster._core.definitions.resource_invocation import resolve_bound_config - if resources_from_args: - if self._resource_defs: - raise DagsterInvalidInvocationError( - "Cannot provide resources in both context and kwargs" - ) - resource_defs = wrap_resources_for_execution(resources_from_args) - # add new resources context to the stack to be cleared on exit - resources = self._exit_stack.enter_context( - build_resources(resource_defs, self.instance) + resources_on_assets_def = bool(assets_def and assets_def.resource_defs) + resources_on_context = not resources_on_assets_def and not resources_from_args + + if self.op_config and config_from_args: + raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") + + op_config = resolve_bound_config(config_from_args or self.op_config, op_def) + + if resources_on_context: + yield self.create_bound_context( + op_def, + pending_invocation, + assets_def, + self.resources, + op_config, + self._resource_defs, ) - elif assets_def and assets_def.resource_defs: - for key in sorted(list(assets_def.resource_defs.keys())): - if key in self._resource_defs: + return + + def get_resource_defs_config() -> ( + Tuple[Mapping[str, ResourceDefinition], Optional[Mapping[str, Any]]] + ): + if resources_from_args: + if self._resource_defs: raise DagsterInvalidInvocationError( - f"Error when invoking {assets_def!s} resource '{key}' " - "provided on both the definition and invocation context. Please " - "provide on only one or the other." + "Cannot provide resources in both context and kwargs" ) - resource_defs = wrap_resources_for_execution( - {**self._resource_defs, **assets_def.resource_defs} - ) - # add new resources context to the stack to be cleared on exit - resources = self._exit_stack.enter_context( - build_resources(resource_defs, self.instance, self._resources_config) + return wrap_resources_for_execution(resources_from_args), None + else: + check.invariant(resources_on_assets_def) + assert assets_def # for typing + for key in sorted(list(assets_def.resource_defs.keys())): + if key in self._resource_defs: + raise DagsterInvalidInvocationError( + f"Error when invoking {assets_def!s} resource '{key}' " + "provided on both the definition and invocation context. Please " + "provide on only one or the other." + ) + resources = wrap_resources_for_execution( + {**self._resource_defs, **assets_def.resource_defs} + ) + return resources, self._resources_config + + resource_defs, resources_config = get_resource_defs_config() + + with build_resources(resource_defs, self.instance, resources_config) as resources: + yield self.create_bound_context( + op_def, pending_invocation, assets_def, resources, op_config, resource_defs ) - else: - resources = self.resources - resource_defs = self._resource_defs + def create_bound_context( + self, op_def, pending_invocation, assets_def, resources, op_config, resource_defs + ) -> "BoundOpExecutionContext": _validate_resource_requirements(resource_defs, op_def) - if self.op_config and config_from_args: - raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") - op_config = resolve_bound_config(config_from_args or self.op_config, op_def) - return BoundOpExecutionContext( op_def=op_def, op_config=op_config,