Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor UnboundOpExecutionContext.bind to use contextmanager #15990

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/op_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def direct_invocation_result(
resources_by_param_name = extracted.resources_by_param_name
config_input = extracted.config_arg

bound_context = (context or build_op_context()).bind(
with (context or build_op_context()).bind_and_scope(
op_def=op_def,
pending_invocation=pending_invocation,
assets_def=assets_def,
Expand All @@ -202,22 +202,21 @@ def direct_invocation_result(
if isinstance(config_input, Config)
else config_input
),
)

input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context)

result = invoke_compute_fn(
fn=compute_fn.decorated_fn,
context=bound_context,
kwargs=input_dict,
context_arg_provided=compute_fn.has_context_arg(),
config_arg_cls=(
compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None
),
resource_args=resource_arg_mapping,
)
) as bound_context:
input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context)

result = invoke_compute_fn(
fn=compute_fn.decorated_fn,
context=bound_context,
kwargs=input_dict,
context_arg_provided=compute_fn.has_context_arg(),
config_arg_cls=(
compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None
),
resource_args=resource_arg_mapping,
)

return _type_check_output_wrapper(op_def, result, bound_context)
return _type_check_output_wrapper(op_def, result, bound_context)


def _resolve_inputs(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from contextlib import ExitStack
from contextlib import ExitStack, contextmanager
from typing import (
AbstractSet,
Any,
Dict,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -254,51 +256,72 @@ def get_tag(self, key: str) -> str:
def get_step_execution_context(self) -> StepExecutionContext:
raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "methods"))

def bind(
@contextmanager
def bind_and_scope(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "BoundOpExecutionContext":
) -> Iterator["BoundOpExecutionContext"]:
from dagster._core.definitions.resource_invocation import resolve_bound_config

if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance)
resources_on_assets_def = bool(assets_def and assets_def.resource_defs)
resources_on_context = not resources_on_assets_def and not resources_from_args

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")

op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

if resources_on_context:
yield self.create_bound_context(
op_def,
pending_invocation,
assets_def,
self.resources,
op_config,
self._resource_defs,
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
return

def get_resource_defs_config() -> (
Tuple[Mapping[str, ResourceDefinition], Optional[Mapping[str, Any]]]
):
if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance, self._resources_config)
return wrap_resources_for_execution(resources_from_args), None
else:
check.invariant(resources_on_assets_def)
assert assets_def # for typing
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
resources = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
return resources, self._resources_config

resource_defs, resources_config = get_resource_defs_config()

with build_resources(resource_defs, self.instance, resources_config) as resources:
yield self.create_bound_context(
op_def, pending_invocation, assets_def, resources, op_config, resource_defs
)
else:
resources = self.resources
resource_defs = self._resource_defs

def create_bound_context(
self, op_def, pending_invocation, assets_def, resources, op_config, resource_defs
) -> "BoundOpExecutionContext":
_validate_resource_requirements(resource_defs, op_def)

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

return BoundOpExecutionContext(
op_def=op_def,
op_config=op_config,
Expand Down