Skip to content

Commit

Permalink
Use DualStateContextResourcesContainer and DualStateInstanceContainer…
Browse files Browse the repository at this point in the history
… in UnboundInitResourceContext
  • Loading branch information
schrockn committed Aug 22, 2023
1 parent 8549e46 commit 0d23186
Showing 1 changed file with 17 additions and 40 deletions.
57 changes: 17 additions & 40 deletions python_modules/dagster/dagster/_core/execution/context/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.resource_definition import (
IContainsGenerator,
ResourceDefinition,
Resources,
)
from dagster._core.definitions.scoped_resources_builder import ScopedResourcesBuilder
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.instance import DagsterInstance
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.dagster_run import DagsterRun

from .dual_state_context import DualStateContextResourcesContainer, DualStateInstanceContainer


class InitResourceContext:
"""The context object available as the argument to the initialization function of a :py:class:`dagster.ResourceDefinition`.
Expand Down Expand Up @@ -124,57 +126,38 @@ def __init__(
resources: Optional[Union[Resources, Mapping[str, Any]]],
instance: Optional[DagsterInstance],
):
from dagster._core.execution.api import ephemeral_instance_if_missing
from dagster._core.execution.build_resources import (
build_resources,
wrap_resources_for_execution,
)
from dagster._core.execution.context_creation_job import initialize_console_manager

self._instance_provided = (
check.opt_inst_param(instance, "instance", DagsterInstance) is not None
)
# Construct ephemeral instance if missing
self._instance_cm = ephemeral_instance_if_missing(instance)
# Pylint can't infer that the ephemeral_instance context manager has an __enter__ method,
# so ignore lint error
instance = self._instance_cm.__enter__()

if isinstance(resources, Resources):
check.failed("Should not have a Resources object directly from this initialization")

self._resource_defs = wrap_resources_for_execution(
check.opt_mapping_param(resources, "resources")
)

self._resources_cm = build_resources(self._resource_defs, instance=instance)
resources = self._resources_cm.__enter__()
self._resources_contain_cm = isinstance(resources, IContainsGenerator)
self._resources_container = DualStateContextResourcesContainer(resources)
self._instance_container = DualStateInstanceContainer(instance)

self._cm_scope_entered = False
super(UnboundInitResourceContext, self).__init__(
resource_config=resource_config,
resources=resources,
resources=ScopedResourcesBuilder.build_empty(), # This is untouched in the parent since we override resources property here
resource_def=None,
instance=instance,
instance=self._instance_container.instance,
dagster_run=None,
log_manager=initialize_console_manager(None),
)

def __enter__(self):
self._cm_scope_entered = True
def __enter__(self) -> "UnboundInitResourceContext":
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc):
self._resources_cm.__exit__(*exc)
if self._instance_provided:
self._instance_cm.__exit__(*exc)
def __exit__(self, *exc) -> None:
self._resources_container.call_on_exit()
self._instance_container.call_on_exit()

def __del__(self):
if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
self._resources_cm.__exit__(None, None, None)
if self._instance_provided and not self._cm_scope_entered:
self._instance_cm.__exit__(None, None, None)
def __del__(self) -> None:
self._resources_container.call_on_del()
self._instance_container.call_on_del()

@property
def resource_config(self) -> Any:
Expand All @@ -189,17 +172,11 @@ def resource_def(self) -> Optional[ResourceDefinition]:
@property
def resources(self) -> Resources:
"""The resources that are available to the resource that we are initalizing."""
if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
"open a context manager: `with build_init_resource_context(...) as context:`"
)
return self._resources
return self._resources_container.get_resources("build_init_resource_context")

@property
def instance(self) -> Optional[DagsterInstance]:
return self._instance
return self._instance_container.instance

@property
def log(self) -> Optional[DagsterLogManager]:
Expand Down

0 comments on commit 0d23186

Please sign in to comment.