Skip to content

Commit

Permalink
Refactor UnboundOpExecutionContext.bind to use contextmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Aug 21, 2023
1 parent da23cb0 commit 5101150
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 47 deletions.
31 changes: 15 additions & 16 deletions python_modules/dagster/dagster/_core/definitions/op_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def direct_invocation_result(
resources_by_param_name = extracted.resources_by_param_name
config_input = extracted.config_arg

bound_context = (context or build_op_context()).bind(
with (context or build_op_context()).bind_and_scope(
op_def=op_def,
pending_invocation=pending_invocation,
assets_def=assets_def,
Expand All @@ -202,22 +202,21 @@ def direct_invocation_result(
if isinstance(config_input, Config)
else config_input
),
)

input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context)

result = invoke_compute_fn(
fn=compute_fn.decorated_fn,
context=bound_context,
kwargs=input_dict,
context_arg_provided=compute_fn.has_context_arg(),
config_arg_cls=(
compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None
),
resource_args=resource_arg_mapping,
)
) as bound_context:
input_dict = _resolve_inputs(op_def, input_args, input_kwargs, bound_context)

result = invoke_compute_fn(
fn=compute_fn.decorated_fn,
context=bound_context,
kwargs=input_dict,
context_arg_provided=compute_fn.has_context_arg(),
config_arg_cls=(
compute_fn.get_config_arg().annotation if compute_fn.has_config_arg() else None
),
resource_args=resource_arg_mapping,
)

return _type_check_output_wrapper(op_def, result, bound_context)
return _type_check_output_wrapper(op_def, result, bound_context)


def _resolve_inputs(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from contextlib import ExitStack
from contextlib import ExitStack, contextmanager
from typing import (
AbstractSet,
Any,
Dict,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -254,51 +256,72 @@ def get_tag(self, key: str) -> str:
def get_step_execution_context(self) -> StepExecutionContext:
raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "methods"))

def bind(
@contextmanager
def bind_and_scope(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "BoundOpExecutionContext":
) -> Iterator["BoundOpExecutionContext"]:
from dagster._core.definitions.resource_invocation import resolve_bound_config

if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance)
resources_on_assets_def = bool(assets_def and assets_def.resource_defs)
resources_on_context = not resources_on_assets_def and not resources_from_args

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")

op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

if resources_on_context:
yield self.create_bound_context(
op_def,
pending_invocation,
assets_def,
self._resources,
op_config,
self._resource_defs,
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
return

def get_resource_defs_config() -> (
Tuple[Mapping[str, ResourceDefinition], Optional[Mapping[str, Any]]]
):
if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance, self._resources_config)
return wrap_resources_for_execution(resources_from_args), None
else:
check.invariant(resources_on_assets_def)
assert assets_def # for typing
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
resources = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
return resources, self._resources_config

resource_defs, resources_config = get_resource_defs_config()

with build_resources(resource_defs, self.instance, resources_config) as resources:
yield self.create_bound_context(
op_def, pending_invocation, assets_def, resources, op_config, resource_defs
)
else:
resources = self.resources
resource_defs = self._resource_defs

def create_bound_context(
self, op_def, pending_invocation, assets_def, resources, op_config, resource_defs
) -> "BoundOpExecutionContext":
_validate_resource_requirements(resource_defs, op_def)

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

return BoundOpExecutionContext(
op_def=op_def,
op_config=op_config,
Expand Down

0 comments on commit 5101150

Please sign in to comment.