Skip to content

Commit

Permalink
Use DualStateContextResourcesContainer on UnboundHookContext
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Aug 21, 2023
1 parent c530cd4 commit ff3ee5e
Showing 1 changed file with 10 additions and 23 deletions.
33 changes: 10 additions & 23 deletions python_modules/dagster/dagster/_core/execution/context/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from ...definitions.dependency import Node
from ...definitions.hook_definition import HookDefinition
from ...definitions.op_definition import OpDefinition
from ...definitions.resource_definition import IContainsGenerator, Resources
from ...definitions.resource_definition import Resources
from ...errors import DagsterInvalidPropertyError, DagsterInvariantViolationError
from ...log_manager import DagsterLogManager
from ..plan.step import ExecutionStep
from ..plan.utils import RetryRequestedFromPolicy
from .dual_state_context import DualStateContextResourcesContainer
from .system import StepExecutionContext

if TYPE_CHECKING:
Expand Down Expand Up @@ -202,7 +203,6 @@ def __init__(
op_exception: Optional[Exception],
instance: Optional["DagsterInstance"],
):
from ..build_resources import build_resources, wrap_resources_for_execution
from ..context_creation_job import initialize_console_manager

self._op = None
Expand All @@ -214,11 +214,7 @@ def temp_graph():

self._op = temp_graph.nodes[0]

# Open resource context manager
self._resource_defs = wrap_resources_for_execution(resources)
self._resources_cm = build_resources(self._resource_defs)
self._resources = self._resources_cm.__enter__()
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
self._resources_container = DualStateContextResourcesContainer(resources)

self._run_id = run_id
self._job_name = job_name
Expand All @@ -227,18 +223,15 @@ def temp_graph():

self._log = initialize_console_manager(None)

self._cm_scope_entered = False

def __enter__(self):
self._cm_scope_entered = True
def __enter__(self) -> "UnboundHookContext":
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc: Any):
self._resources_cm.__exit__(*exc)
def __exit__(self, *exc: Any) -> None:
self._resources_container.call_on_exit()

def __del__(self):
if self._resources_contain_cm and not self._cm_scope_entered:
self._resources_cm.__exit__(None, None, None)
def __del__(self) -> None:
self._resources_container.call_on_del()

@property
def job_name(self) -> str:
Expand Down Expand Up @@ -274,13 +267,7 @@ def required_resource_keys(self) -> Set[str]:

@property
def resources(self) -> "Resources":
if self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
"open a context manager: `with build_hook_context(...) as context:`"
)
return self._resources
return self._resources_container.get_resources("build_hook_context")

@property
def solid_config(self) -> Any:
Expand Down

0 comments on commit ff3ee5e

Please sign in to comment.