Skip to content

Commit

Permalink
more reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 11, 2023
1 parent 9b04729 commit fe91874
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
22 changes: 20 additions & 2 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ class ExecutionProperties:
You should not need to access these attributes directly.
"""

def __init__(self, step_description: str, node_type: str, op_def: "OpDefinition"):
def __init__(
self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any
):
self._step_description = step_description
self._node_type = node_type
self._op_def = op_def
self._events: List[DagsterEvent] = []
self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None
self._op_config = op_config

@property
def step_description(self) -> str:
Expand All @@ -91,6 +94,10 @@ def node_type(self) -> str:
def op_def(self) -> "OpDefinition":
return self._op_def

@property
def op_config(self) -> Any:
return self._op_config

def consume_events(self) -> Iterator[DagsterEvent]:
events = self._events
self._events = []
Expand Down Expand Up @@ -168,11 +175,20 @@ def op_config(self) -> Any:


class ContextHasExecutionProperties(ABC):
"""Base class that any context that can be used for execution or invocation of an op or asset
must implement.
"""

@property
@abstractmethod
def execution_properties(self) -> ExecutionProperties:
"""Context classes must contain an instance of ExecutionProperties."""

@property
@abstractmethod
def resources(self) -> Any:
"""Context classes must be able to provide currently available resources."""


class OpExecutionContextMetaClass(AbstractComputeMetaclass):
def __instancecheck__(cls, instance) -> bool:
Expand Down Expand Up @@ -233,6 +249,7 @@ def __init__(self, step_execution_context: StepExecutionContext):
OpDefinition,
self._step_execution_context.job_def.get_node(self.node_handle).definition,
),
op_config=self._step_execution_context.op_config,
)

@property
Expand All @@ -243,7 +260,7 @@ def execution_properties(self) -> ExecutionProperties:
@property
def op_config(self) -> Any:
"""Any: The parsed config specific to this op."""
return self._step_execution_context.op_config
return self.execution_properties.op_config

@property
def dagster_run(self) -> DagsterRun:
Expand Down Expand Up @@ -1517,6 +1534,7 @@ def execution_properties(self) -> ExecutionProperties:
step_description=f"asset {self.op_execution_context.node_handle}",
node_type="asset",
op_def=self.op_execution_context.op_def,
op_config=self.op_execution_context.op_config,
)
return self._execution_props

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import deprecation_warning

from .compute import OpExecutionContext
from .compute import ExecutionProperties, OpExecutionContext
from .system import StepExecutionContext, TypeCheckContext


Expand Down Expand Up @@ -106,18 +106,26 @@ def __new__(
)


class RunlessExecutionProperties:
"""Maintains information about the invocation that is updated during execution time. This information
needs to be available to the user once invocation is complete, so that they can assert on events and
outputs. It needs to be cleared before the context is used for another invocation.
class RunlessExecutionProperties(ExecutionProperties):
"""Maintains properties that need to be available to the execution code. To support runless execution
(direct invocation) this class also maintains information about the invocation that is updated
during execution time. This information needs to be available to the user once invocation is
complete, so that they can assert on events and outputs. It needs to be cleared before the
context is used for another invocation.
"""

def __init__(self):
def __init__(
self, step_description: str, node_type: str, op_def: "OpDefinition", op_config: Any
):
self._step_description = step_description
self._node_type = node_type
self._op_def = op_def
self._events: List[UserEvent] = []
self._seen_outputs = {}
self._output_metadata = {}
self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None
self._op_config = op_config

@property
def user_events(self):
Expand All @@ -131,14 +139,6 @@ def seen_outputs(self):
def output_metadata(self):
return self._output_metadata

@property
def requires_typed_event_stream(self) -> bool:
return self._requires_typed_event_stream

@property
def typed_event_stream_error_message(self) -> Optional[str]:
return self._typed_event_stream_error_message

def log_event(self, event: UserEvent) -> None:
check.inst_param(
event,
Expand Down Expand Up @@ -292,7 +292,7 @@ def __init__(
# my_op(ctx)
# ctx._execution_properties.output_metadata # information is retained after invocation
# my_op(ctx) # ctx._execution_properties is cleared at the beginning of the next invocation
self._execution_properties = RunlessExecutionProperties()
self._execution_properties = None

def __enter__(self):
self._cm_scope_entered = True
Expand Down Expand Up @@ -326,9 +326,6 @@ def bind(
f"This context is currently being used to execute {self.alias}. The context cannot be used to execute another op until {self.alias} has finished executing."
)

# reset execution_properties
self._execution_properties = RunlessExecutionProperties()

# update the bound context with properties relevant to the execution of the op

invocation_tags = (
Expand Down Expand Up @@ -403,6 +400,11 @@ def bind(
step_description=step_description,
)

# reset execution_properties
self._execution_properties = RunlessExecutionProperties(
step_description=step_description, node_type="op", op_def=op_def, op_config=op_config
)

return self

def unbind(self):
Expand All @@ -414,6 +416,11 @@ def is_bound(self) -> bool:

@property
def execution_properties(self) -> RunlessExecutionProperties:
if self._execution_properties is None:
raise DagsterInvalidPropertyError(
"Cannot access execution_properties until after the context has been used to"
" invoke an op"
)
return self._execution_properties

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@ def invoke_compute_fn(
if config_arg_cls:
# config_arg_cls is either a Config class or a primitive type
if issubclass(config_arg_cls, Config):
to_pass = config_arg_cls._get_non_default_public_field_values_cls(context.op_config) # noqa: SLF001
to_pass = config_arg_cls._get_non_default_public_field_values_cls( # noqa: SLF001
context.execution_properties.op_config
)
args_to_pass["config"] = config_arg_cls(**to_pass)
else:
args_to_pass["config"] = context.op_config
args_to_pass["config"] = context.execution_properties.op_config
if resource_args:
for resource_name, arg_name in resource_args.items():
args_to_pass[arg_name] = context.resources.original_resource_dict[resource_name]
Expand Down

0 comments on commit fe91874

Please sign in to comment.