From 550946b0a49447d9735ee42e4ad2fd89b2e56714 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Mon, 21 Aug 2023 16:31:55 -0700 Subject: [PATCH] Use DualStateContextResourcesContainer in SensorEvaluationContext --- .../_core/definitions/sensor_definition.py | 71 ++++--------------- 1 file changed, 14 insertions(+), 57 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py index 901c6ae80395c..79ed965684e6f 100644 --- a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py @@ -38,13 +38,13 @@ from dagster._core.definitions.resource_definition import ( Resources, ) -from dagster._core.definitions.scoped_resources_builder import ScopedResourcesBuilder from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidInvocationError, DagsterInvalidSubsetError, DagsterInvariantViolationError, ) +from dagster._core.execution.context.dual_state_context import DualStateContextResourcesContainer from dagster._core.instance import DagsterInstance from dagster._core.instance.ref import InstanceRef from dagster._serdes import whitelist_for_serdes @@ -162,10 +162,7 @@ def __init__( self._instance = check.opt_inst_param(instance, "instance", DagsterInstance) self._sensor_name = sensor_name - # Wait to set resources unless they're accessed - self._resource_defs = resources - self._resources = None - self._cm_scope_entered = False + self._resources_container = DualStateContextResourcesContainer(resources) self._log_key = ( [ @@ -180,16 +177,22 @@ def __init__( self._cursor_updated = False def __enter__(self) -> "SensorEvaluationContext": - self._cm_scope_entered = True + self._resources_container.call_on_enter() return self def __exit__(self, *exc) -> None: + self._resources_container.call_on_exit() + self._exit_stack.close() + self._logger = None + + def __del__(self) -> None: + self._resources_container.call_on_del() self._exit_stack.close() self._logger = None @property def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]: - return self._resource_defs + return self._resources_container.resource_defs def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluationContext": """Merge the specified resources into this context. @@ -200,7 +203,8 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio resources_dict (Mapping[str, Any]): The resources to replace in the context. """ check.invariant( - self._resources is None, "Cannot merge resources in context that has been initialized." + not self._resources_container.has_been_accessed, + "Cannot merge resources in context that has been initialized.", ) from dagster._core.execution.build_resources import wrap_resources_for_execution @@ -214,7 +218,7 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio instance=self._instance, sensor_name=self._sensor_name, resources={ - **(self._resource_defs or {}), + **(self.resource_defs or {}), **wrap_resources_for_execution(resources_dict), }, ) @@ -223,54 +227,7 @@ def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluatio @property def resources(self) -> Resources: """Resources: A mapping from resource key to instantiated resources for this sensor.""" - from dagster._core.definitions.scoped_resources_builder import ( - IContainsGenerator, - ) - from dagster._core.execution.build_resources import build_resources - - if not self._resources: - """ - This is similar to what we do in e.g. the op context - we set up a resource - building context manager, and immediately enter it. This is so that in cases - where a user is not using any context-manager based resources, they don't - need to enter this SensorEvaluationContext themselves. - - For example: - - my_sensor(build_sensor_context(resources={"my_resource": my_non_cm_resource}) - - will work ok, but for a CM resource we must do - - with build_sensor_context(resources={"my_resource": my_cm_resource}) as context: - my_sensor(context) - """ - - # Early exit if no resources are defined. This skips unnecessary initialization - # entirely. This allows users to run user code servers in cases where they - # do not have access to the instance if they use a subset of features do - # that do not require instance access. In this case, if they do not use - # resources on sensors they do not require the instance, so we do not - # instantiate it - # - # Tracking at https://github.com/dagster-io/dagster/issues/14345 - if not self._resource_defs: - self._resources = ScopedResourcesBuilder.build_empty() - return self._resources - - instance = self.instance if self._instance or self._instance_ref else None - - resources_cm = build_resources(resources=self._resource_defs or {}, instance=instance) - self._resources = self._exit_stack.enter_context(resources_cm) - - if isinstance(self._resources, IContainsGenerator) and not self._cm_scope_entered: - self._exit_stack.close() - raise DagsterInvariantViolationError( - "At least one provided resource is a generator, but attempting to access" - " resources outside of context manager scope. You can use the following syntax" - " to open a context manager: `with build_schedule_context(...) as context:`" - ) - - return self._resources + return self._resources_container.get_resources("build_sensor_context") @public @property