Skip to content

Commit

Permalink
Use DualStateContextResourcesContainer in SensorEvaluationContext
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Aug 21, 2023
1 parent 6585362 commit a89f35d
Showing 1 changed file with 14 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
from dagster._core.definitions.resource_definition import (
Resources,
)
from dagster._core.definitions.scoped_resources_builder import ScopedResourcesBuilder
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidSubsetError,
DagsterInvariantViolationError,
)
from dagster._core.execution.context.dual_state_context import DualStateContextResourcesContainer
from dagster._core.instance import DagsterInstance
from dagster._core.instance.ref import InstanceRef
from dagster._serdes import whitelist_for_serdes
Expand Down Expand Up @@ -162,10 +162,7 @@ def __init__(
self._instance = check.opt_inst_param(instance, "instance", DagsterInstance)
self._sensor_name = sensor_name

# Wait to set resources unless they're accessed
self._resource_defs = resources
self._resources = None
self._cm_scope_entered = False
self._resources_container = DualStateContextResourcesContainer(resources)

self._log_key = (
[
Expand All @@ -180,16 +177,22 @@ def __init__(
self._cursor_updated = False

def __enter__(self) -> "SensorEvaluationContext":
self._cm_scope_entered = True
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc) -> None:
self._resources_container.call_on_exit()
self._exit_stack.close()
self._logger = None

def __del__(self) -> None:
self._resources_container.call_on_del()
self._exit_stack.close()
self._logger = None

@property
def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]:
return self._resource_defs
return self._resources_container.resource_defs

def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluationContext":
"""Merge the specified resources into this context.
Expand All @@ -200,7 +203,8 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio
resources_dict (Mapping[str, Any]): The resources to replace in the context.
"""
check.invariant(
self._resources is None, "Cannot merge resources in context that has been initialized."
not self._resources_container.has_been_accessed,
"Cannot merge resources in context that has been initialized.",
)
from dagster._core.execution.build_resources import wrap_resources_for_execution

Expand All @@ -214,7 +218,7 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio
instance=self._instance,
sensor_name=self._sensor_name,
resources={
**(self._resource_defs or {}),
**(self.resource_defs or {}),
**wrap_resources_for_execution(resources_dict),
},
)
Expand All @@ -223,54 +227,7 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio
@property
def resources(self) -> Resources:
"""Resources: A mapping from resource key to instantiated resources for this sensor."""
from dagster._core.definitions.scoped_resources_builder import (
IContainsGenerator,
)
from dagster._core.execution.build_resources import build_resources

if not self._resources:
"""
This is similar to what we do in e.g. the op context - we set up a resource
building context manager, and immediately enter it. This is so that in cases
where a user is not using any context-manager based resources, they don't
need to enter this SensorEvaluationContext themselves.
For example:
my_sensor(build_sensor_context(resources={"my_resource": my_non_cm_resource})
will work ok, but for a CM resource we must do
with build_sensor_context(resources={"my_resource": my_cm_resource}) as context:
my_sensor(context)
"""

# Early exit if no resources are defined. This skips unnecessary initialization
# entirely. This allows users to run user code servers in cases where they
# do not have access to the instance if they use a subset of features do
# that do not require instance access. In this case, if they do not use
# resources on sensors they do not require the instance, so we do not
# instantiate it
#
# Tracking at https://github.com/dagster-io/dagster/issues/14345
if not self._resource_defs:
self._resources = ScopedResourcesBuilder.build_empty()
return self._resources

instance = self.instance if self._instance or self._instance_ref else None

resources_cm = build_resources(resources=self._resource_defs or {}, instance=instance)
self._resources = self._exit_stack.enter_context(resources_cm)

if isinstance(self._resources, IContainsGenerator) and not self._cm_scope_entered:
self._exit_stack.close()
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access"
" resources outside of context manager scope. You can use the following syntax"
" to open a context manager: `with build_schedule_context(...) as context:`"
)

return self._resources
return self._resources_container.get_resources("build_sensor_context")

@public
@property
Expand Down

0 comments on commit a89f35d

Please sign in to comment.