Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change schedule context to use DualStateContextResourcesContainer #16001

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from dagster._annotations import deprecated, deprecated_param, public
from dagster._core.definitions.instigation_logger import InstigationLogger
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.scoped_resources_builder import Resources, ScopedResourcesBuilder
from dagster._core.definitions.scoped_resources_builder import Resources
from dagster._core.execution.context.dual_state_context import DualStateContextResourcesContainer
from dagster._serdes import whitelist_for_serdes
from dagster._utils import IHasInternalInit, ensure_gen
from dagster._utils.merger import merge_dicts
Expand Down Expand Up @@ -170,11 +171,8 @@ def the_schedule(context: ScheduleEvaluationContext):
"_log_key",
"_logger",
"_repository_name",
"_resource_defs",
"_schedule_name",
"_resources_cm",
"_resources",
"_cm_scope_entered",
"_resources_container",
"_repository_def",
]

Expand Down Expand Up @@ -210,63 +208,37 @@ def __init__(
self._schedule_name = schedule_name

# Wait to set resources unless they're accessed
self._resource_defs = resources
self._resources = None
self._cm_scope_entered = False
self._resources_container = DualStateContextResourcesContainer(resources)
self._repository_def = check.opt_inst_param(
repository_def, "repository_def", RepositoryDefinition
)

def __enter__(self) -> "ScheduleEvaluationContext":
self._cm_scope_entered = True
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc) -> None:
self._exit_stack.close()
self._resources_container.call_on_exit()
self._logger = None

def __del__(self) -> None:
self._resources_container.call_on_del()

@property
def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]:
return self._resource_defs
return self._resources_container.resource_defs

@public
@property
def resources(self) -> Resources:
"""Mapping of resource key to resource definition to be made available
during schedule execution.
"""
from dagster._core.definitions.scoped_resources_builder import (
IContainsGenerator,
)
from dagster._core.execution.build_resources import build_resources

if not self._resources:
# Early exit if no resources are defined. This skips unnecessary initialization
# entirely. This allows users to run user code servers in cases where they
# do not have access to the instance if they use a subset of features do
# that do not require instance access. In this case, if they do not use
# resources on schedules they do not require the instance, so we do not
# instantiate it
#
# Tracking at https://github.com/dagster-io/dagster/issues/14345
if not self._resource_defs:
self._resources = ScopedResourcesBuilder.build_empty()
return self._resources

instance = self.instance if self._instance or self._instance_ref else None

resources_cm = build_resources(resources=self._resource_defs, instance=instance)
self._resources = self._exit_stack.enter_context(resources_cm)

if isinstance(self._resources, IContainsGenerator) and not self._cm_scope_entered:
self._exit_stack.close()
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access"
" resources outside of context manager scope. You can use the following syntax"
" to open a context manager: `with build_sensor_context(...) as context:`"
)
if self._resources_container.has_been_accessed:
return self._resources_container.get_already_accessed_resources()

return self._resources
instance = self.instance if self._instance or self._instance_ref else None
return self._resources_container.make_resources("build_schedule_context", instance=instance)

def merge_resources(self, resources_dict: Mapping[str, Any]) -> "ScheduleEvaluationContext":
"""Merge the specified resources into this context.
Expand All @@ -276,7 +248,8 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "ScheduleEvaluat
resources_dict (Mapping[str, Any]): The resources to replace in the context.
"""
check.invariant(
self._resources is None, "Cannot merge resources in context that has been initialized."
not self._resources_container.cm_scope_entered,
"Cannot merge resources in context that has been initialized.",
)
from dagster._core.execution.build_resources import wrap_resources_for_execution

Expand All @@ -286,7 +259,7 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "ScheduleEvaluat
repository_name=self._repository_name,
schedule_name=self._schedule_name,
resources={
**(self._resource_defs or {}),
**(self.resource_defs or {}),
**wrap_resources_for_execution(resources_dict),
},
repository_def=self._repository_def,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from contextlib import ExitStack
from typing import Any, Mapping, Optional, Union

from dagster._core.definitions.scoped_resources_builder import IContainsGenerator, Resources
from typing import TYPE_CHECKING, Any, Mapping, Optional, Union

from dagster import _check as check
from dagster._core.definitions.scoped_resources_builder import (
IContainsGenerator,
Resources,
ScopedResourcesBuilder,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.instance import DagsterInstance

if TYPE_CHECKING:
from dagster._core.instance import DagsterInstance


class DualStateContextResourcesContainer:
Expand All @@ -24,36 +31,68 @@ def __init__(
resources_dict_or_resources_obj: Optional[Union[Mapping[str, Any], Resources]],
resources_config: Optional[Mapping[str, Any]] = None,
):
self._cm_scope_entered = False
self.cm_scope_entered = False
self._exit_stack = ExitStack()
self._resources_config = resources_config

if isinstance(resources_dict_or_resources_obj, Resources):
self.resource_defs = {}
self._resources = resources_dict_or_resources_obj
self._resources_contain_cm = False
else:
from dagster._core.execution.build_resources import (
build_resources,
wrap_resources_for_execution,
)

self._resources = None
self.resource_defs = wrap_resources_for_execution(resources_dict_or_resources_obj)
self._resources = self._exit_stack.enter_context(
build_resources(self.resource_defs, resource_config=resources_config)
)
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
if not self.resource_defs:
self._resources = ScopedResourcesBuilder.build_empty()

def call_on_enter(self) -> None:
self._cm_scope_entered = True
self.cm_scope_entered = True

def call_on_exit(self) -> None:
self._exit_stack.close()

def call_on_del(self) -> None:
self._exit_stack.close()

def get_resources(self, fn_name_for_err_msg: str) -> Resources:
if self._resources_contain_cm and not self._cm_scope_entered:
@property
def has_been_accessed(self) -> bool:
return isinstance(self._resources, Resources)

def get_already_accessed_resources(self) -> Resources:
check.invariant(self.has_been_accessed)
return check.not_none(self._resources)

def make_resources(
self, fn_name_for_err_msg: str, instance: Optional["DagsterInstance"] = None
) -> Resources:
if self._resources:
return self._resources

# Early exit if no resources are defined. This skips unnecessary initialization
# entirely. This allows users to run user code servers in cases where they
# do not have access to the instance if they use a subset of features do
# that do not require instance access. In this case, if they do not use
# resources on schedules they do not require the instance, so we do not
# instantiate it
#
# Tracking at https://github.com/dagster-io/dagster/issues/14345
if not self.resource_defs:
self._resources = ScopedResourcesBuilder.build_empty()
return self._resources

from dagster._core.execution.build_resources import build_resources

self._resources = self._exit_stack.enter_context(
build_resources(
self.resource_defs, resource_config=self._resources_config, instance=instance
)
)
resources_contain_cm = isinstance(self._resources, IContainsGenerator)

if resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
Expand All @@ -63,7 +102,7 @@ def get_resources(self, fn_name_for_err_msg: str) -> Resources:


class DualStateInstanceContainer:
def __init__(self, instance: Optional[DagsterInstance]):
def __init__(self, instance: Optional["DagsterInstance"]):
from dagster._core.execution.api import ephemeral_instance_if_missing

self._exit_stack = ExitStack()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def required_resource_keys(self) -> Set[str]:

@property
def resources(self) -> "Resources":
return self._resources_container.get_resources("build_hook_context")
return self._resources_container.make_resources("build_hook_context")

@property
def solid_config(self) -> Any:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def resource_def(self) -> Optional[ResourceDefinition]:
@property
def resources(self) -> Resources:
"""The resources that are available to the resource that we are initalizing."""
return self._resources_container.get_resources("build_init_resource_context")
return self._resources_container.make_resources("build_init_resource_context")

@property
def instance(self) -> Optional[DagsterInstance]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def resources(self) -> Any:
"but it was not provided when constructing the InputContext"
)

return self._resources_container.get_resources("build_input_context")
return self._resources_container.make_resources("build_input_context")

@public
@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def resource_keys(self) -> AbstractSet[str]:

@property
def resources(self) -> Resources:
return self._resources_container.get_resources("build_op_context")
return self._resources_container.make_resources("build_op_context")

@property
def dagster_run(self) -> DagsterRun:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def resources(self) -> Any:
"but it was not provided when constructing the OutputContext"
)

return self._resources_container.get_resources("build_output_context")
return self._resources_container.make_resources("build_output_context")

@property
def asset_info(self) -> Optional[AssetOutputInfo]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,9 +1037,12 @@ def my_op(context):
DagsterInvalidConfigError,
match='Received unexpected config entry "bad_resource" at the root.',
):
context_builder(
resources={"my_resource": my_resource},
resources_config={"bad_resource": {"config": "foo"}},
# behavior change to deferred config eval
my_op(
context_builder(
resources={"my_resource": my_resource},
resources_config={"bad_resource": {"config": "foo"}},
)
)


Expand Down