Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent 0371e8e commit deec5a3
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 148 deletions.
30 changes: 15 additions & 15 deletions python_modules/dagster/dagster/_core/definitions/op_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from .result import MaterializeResult

if TYPE_CHECKING:
from ..execution.context.invocation import RunlessOpExecutionContext
from ..execution.context.invocation import DirectOpExecutionContext
from .assets import AssetsDefinition
from .composition import PendingNodeInvocation
from .decorators.op_decorator import DecoratedOpFunction
Expand Down Expand Up @@ -109,7 +109,7 @@ def direct_invocation_result(
) -> Any:
from dagster._config.pythonic_config import Config
from dagster._core.execution.context.invocation import (
RunlessOpExecutionContext,
DirectOpExecutionContext,
build_op_context,
)

Expand Down Expand Up @@ -149,12 +149,12 @@ def direct_invocation_result(
" no context was provided when invoking."
)
if len(args) > 0:
if args[0] is not None and not isinstance(args[0], RunlessOpExecutionContext):
if args[0] is not None and not isinstance(args[0], DirectOpExecutionContext):
raise DagsterInvalidInvocationError(
f"Decorated function '{compute_fn.name}' has context argument, "
"but no context was provided when invoking."
)
context = cast(RunlessOpExecutionContext, args[0])
context = cast(DirectOpExecutionContext, args[0])
# update args to omit context
args = args[1:]
else: # context argument is provided under kwargs
Expand All @@ -165,14 +165,14 @@ def direct_invocation_result(
f"'{context_param_name}', but no value for '{context_param_name}' was "
f"found when invoking. Provided kwargs: {kwargs}"
)
context = cast(RunlessOpExecutionContext, kwargs[context_param_name])
context = cast(DirectOpExecutionContext, kwargs[context_param_name])
# update kwargs to remove context
kwargs = {
kwarg: val for kwarg, val in kwargs.items() if not kwarg == context_param_name
}
# allow passing context, even if the function doesn't have an arg for it
elif len(args) > 0 and isinstance(args[0], RunlessOpExecutionContext):
context = cast(RunlessOpExecutionContext, args[0])
elif len(args) > 0 and isinstance(args[0], DirectOpExecutionContext):
context = cast(DirectOpExecutionContext, args[0])
args = args[1:]

resource_arg_mapping = {arg.name: arg.name for arg in compute_fn.get_resource_args()}
Expand Down Expand Up @@ -230,7 +230,7 @@ def direct_invocation_result(


def _resolve_inputs(
op_def: "OpDefinition", args, kwargs, context: "RunlessOpExecutionContext"
op_def: "OpDefinition", args, kwargs, context: "DirectOpExecutionContext"
) -> Mapping[str, Any]:
from dagster._core.execution.plan.execute_step import do_type_check

Expand Down Expand Up @@ -333,7 +333,7 @@ def _resolve_inputs(
return input_dict


def _key_for_result(result: MaterializeResult, context: "RunlessOpExecutionContext") -> AssetKey:
def _key_for_result(result: MaterializeResult, context: "DirectOpExecutionContext") -> AssetKey:
if not context.bound_properties.assets_def:
raise DagsterInvariantViolationError(
f"Op {context.bound_properties.alias} does not have an assets definition."
Expand All @@ -352,7 +352,7 @@ def _key_for_result(result: MaterializeResult, context: "RunlessOpExecutionConte

def _output_name_for_result_obj(
event: MaterializeResult,
context: "RunlessOpExecutionContext",
context: "DirectOpExecutionContext",
):
if not context.bound_properties.assets_def:
raise DagsterInvariantViolationError(
Expand All @@ -365,7 +365,7 @@ def _output_name_for_result_obj(
def _handle_gen_event(
event: T,
op_def: "OpDefinition",
context: "RunlessOpExecutionContext",
context: "DirectOpExecutionContext",
output_defs: Mapping[str, OutputDefinition],
outputs_seen: Set[str],
) -> T:
Expand Down Expand Up @@ -399,7 +399,7 @@ def _handle_gen_event(


def _type_check_output_wrapper(
op_def: "OpDefinition", result: Any, context: "RunlessOpExecutionContext"
op_def: "OpDefinition", result: Any, context: "DirectOpExecutionContext"
) -> Any:
"""Type checks and returns the result of a op.
Expand Down Expand Up @@ -493,7 +493,7 @@ def type_check_gen(gen):


def _type_check_function_output(
op_def: "OpDefinition", result: T, context: "RunlessOpExecutionContext"
op_def: "OpDefinition", result: T, context: "DirectOpExecutionContext"
) -> T:
from ..execution.plan.compute_generator import validate_and_coerce_op_result_to_iterator

Expand All @@ -512,14 +512,14 @@ def _type_check_function_output(
def _type_check_output(
output_def: "OutputDefinition",
output: Union[Output, DynamicOutput],
context: "RunlessOpExecutionContext",
context: "DirectOpExecutionContext",
) -> None:
"""Validates and performs core type check on a provided output.
Args:
output_def (OutputDefinition): The output definition to validate against.
output (Any): The output to validate.
context (RunlessOpExecutionContext): Context containing resources to be used for type
context (DirectOpExecutionContext): Context containing resources to be used for type
check.
"""
from ..execution.plan.execute_step import do_type_check
Expand Down
Loading

0 comments on commit deec5a3

Please sign in to comment.