diff --git a/python_modules/dagster/dagster/_core/definitions/hook_invocation.py b/python_modules/dagster/dagster/_core/definitions/hook_invocation.py index 429a8edd94fd6..dd7c98a81e101 100644 --- a/python_modules/dagster/dagster/_core/definitions/hook_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/hook_invocation.py @@ -23,7 +23,8 @@ def hook_invocation_result( # Validate that all required resources are provided in the context ensure_requirements_satisfied( - hook_context._resource_defs, list(hook_def.get_resource_requirements()) # noqa: SLF001 + hook_context._resources_container.resource_defs, # noqa: SLF001 + list(hook_def.get_resource_requirements()), ) bound_context = BoundHookContext( diff --git a/python_modules/dagster/dagster/_core/execution/context/hook.py b/python_modules/dagster/dagster/_core/execution/context/hook.py index 7ce3ec869e71b..2845937d037b9 100644 --- a/python_modules/dagster/dagster/_core/execution/context/hook.py +++ b/python_modules/dagster/dagster/_core/execution/context/hook.py @@ -9,11 +9,12 @@ from ...definitions.dependency import Node from ...definitions.hook_definition import HookDefinition from ...definitions.op_definition import OpDefinition -from ...definitions.resource_definition import IContainsGenerator, Resources +from ...definitions.resource_definition import Resources from ...errors import DagsterInvalidPropertyError, DagsterInvariantViolationError from ...log_manager import DagsterLogManager from ..plan.step import ExecutionStep from ..plan.utils import RetryRequestedFromPolicy +from .dual_state_context import DualStateContextResourcesContainer from .system import StepExecutionContext if TYPE_CHECKING: @@ -202,7 +203,6 @@ def __init__( op_exception: Optional[Exception], instance: Optional["DagsterInstance"], ): - from ..build_resources import build_resources, wrap_resources_for_execution from ..context_creation_job import initialize_console_manager self._op = None @@ -214,11 +214,7 @@ def temp_graph(): self._op = temp_graph.nodes[0] - # Open resource context manager - self._resource_defs = wrap_resources_for_execution(resources) - self._resources_cm = build_resources(self._resource_defs) - self._resources = self._resources_cm.__enter__() - self._resources_contain_cm = isinstance(self._resources, IContainsGenerator) + self._resources_container = DualStateContextResourcesContainer(resources) self._run_id = run_id self._job_name = job_name @@ -227,18 +223,15 @@ def temp_graph(): self._log = initialize_console_manager(None) - self._cm_scope_entered = False - - def __enter__(self): - self._cm_scope_entered = True + def __enter__(self) -> "UnboundHookContext": + self._resources_container.call_on_enter() return self - def __exit__(self, *exc: Any): - self._resources_cm.__exit__(*exc) + def __exit__(self, *exc: Any) -> None: + self._resources_container.call_on_exit() - def __del__(self): - if self._resources_contain_cm and not self._cm_scope_entered: - self._resources_cm.__exit__(None, None, None) + def __del__(self) -> None: + self._resources_container.call_on_del() @property def job_name(self) -> str: @@ -274,13 +267,7 @@ def required_resource_keys(self) -> Set[str]: @property def resources(self) -> "Resources": - if self._resources_contain_cm and not self._cm_scope_entered: - raise DagsterInvariantViolationError( - "At least one provided resource is a generator, but attempting to access " - "resources outside of context manager scope. You can use the following syntax to " - "open a context manager: `with build_hook_context(...) as context:`" - ) - return self._resources + return self._resources_container.get_resources("build_hook_context") @property def solid_config(self) -> Any: