Skip to content

Commit

Permalink
Use DualStateContextResourcesContainer in InputContext
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Sep 6, 2023
1 parent fe67979 commit e95a582
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from contextlib import ExitStack
from typing import Any, Mapping, Optional
from typing import Any, Mapping, Optional, Union

from dagster._core.definitions.scoped_resources_builder import IContainsGenerator, Resources
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import build_resources, wrap_resources_for_execution
from dagster._core.instance import DagsterInstance


Expand All @@ -22,20 +21,30 @@ class DualStateContextResourcesContainer:

def __init__(
self,
resources_dict: Mapping[str, Any],
resources_dict_or_resources_obj: Optional[Union[Mapping[str, Any], Resources]],
resources_config: Optional[Mapping[str, Any]] = None,
):
self._cm_scope_entered = False
self._exit_stack = ExitStack()
self.resource_defs = wrap_resources_for_execution(resources_dict)
self._resources = self._exit_stack.enter_context(
build_resources(resources=self.resource_defs, resource_config=resources_config)
)
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)

if isinstance(resources_dict_or_resources_obj, Resources):
self.resource_defs = {}
self._resources = resources_dict_or_resources_obj
self._resources_contain_cm = False
else:
from dagster._core.execution.build_resources import (
build_resources,
wrap_resources_for_execution,
)

self.resource_defs = wrap_resources_for_execution(resources_dict_or_resources_obj)
self._resources = self._exit_stack.enter_context(
build_resources(self.resource_defs, resource_config=resources_config)
)
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)

def call_on_enter(self) -> None:
self._cm_scope_entered = True
pass

def call_on_exit(self) -> None:
self._exit_stack.close()
Expand Down
42 changes: 12 additions & 30 deletions python_modules/dagster/dagster/_core/execution/context/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.instance import DagsterInstance, DynamicPartitionsStore

from .dual_state_context import DualStateContextResourcesContainer

if TYPE_CHECKING:
from dagster._core.definitions import PartitionsDefinition
from dagster._core.definitions.op_definition import OpDefinition
Expand Down Expand Up @@ -73,9 +75,6 @@ def __init__(
asset_partitions_def: Optional["PartitionsDefinition"] = None,
instance: Optional[DagsterInstance] = None,
):
from dagster._core.definitions.resource_definition import IContainsGenerator, Resources
from dagster._core.execution.build_resources import build_resources

self._name = name
self._job_name = job_name
self._op_def = op_def
Expand All @@ -95,33 +94,22 @@ def __init__(
self._asset_partitions_subset = asset_partitions_subset
self._asset_partitions_def = asset_partitions_def

if isinstance(resources, Resources):
self._resources_cm = None
self._resources = resources
else:
self._resources_cm = build_resources(
check.opt_mapping_param(resources, "resources", key_type=str)
)
self._resources = self._resources_cm.__enter__()
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
self._cm_scope_entered = False
self._explicit_resources_provided = bool(resources)
self._resources_container = DualStateContextResourcesContainer(resources)

self._events: List["DagsterEvent"] = []
self._observations: List[AssetObservation] = []
self._instance = instance

def __enter__(self):
if self._resources_cm:
self._cm_scope_entered = True
def __enter__(self) -> "InputContext":
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc):
if self._resources_cm:
self._resources_cm.__exit__(*exc)
def __exit__(self, *exc) -> None:
self._resources_container.call_on_exit()

def __del__(self):
if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
self._resources_cm.__exit__(None, None, None)
def __del__(self) -> None:
self._resources_container.call_on_del()

@property
def instance(self) -> DagsterInstance:
Expand Down Expand Up @@ -238,19 +226,13 @@ def resources(self) -> Any:
input manager. If using the :py:func:`@input_manager` decorator, these resources
correspond to those requested with the `required_resource_keys` parameter.
"""
if self._resources is None:
if not self._explicit_resources_provided:
raise DagsterInvariantViolationError(
"Attempting to access resources, "
"but it was not provided when constructing the InputContext"
)

if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
"open a context manager: `with build_input_context(...) as context:`"
)
return self._resources
return self._resources_container.get_resources("build_input_context")

@public
@property
Expand Down

0 comments on commit e95a582

Please sign in to comment.