From d7c8769f5f6d3ce0e4b0ee1646d9fcf049b9d63a Mon Sep 17 00:00:00 2001 From: Alex Langenfeld Date: Tue, 15 Oct 2024 15:58:21 -0500 Subject: [PATCH] [global asset graph] precompute targeting instigators (#25202) To facilitate being able to resolve all required information for an asset node from just the global scope remote asset node, track which sensors and schedules target each repository scoped asset. ## How I Tested These Changes existing coverage, additional coverage in https://github.com/dagster-io/internal/pull/11840 --- .../implementation/fetch_schedules.py | 10 ++- .../implementation/fetch_sensors.py | 17 ++-- .../dagster_graphql/schema/asset_graph.py | 74 ++++++++-------- .../schema/asset_selections.py | 43 ++++++---- .../dagster_graphql/schema/external.py | 4 +- .../schema/schedules/schedules.py | 7 +- .../dagster_graphql/schema/sensors.py | 8 +- .../_core/definitions/base_asset_graph.py | 14 +-- .../_core/definitions/remote_asset_graph.py | 39 ++++++++- .../_core/remote_representation/external.py | 86 +++++++++++++++++-- .../dagster/_core/workspace/context.py | 25 +++++- 11 files changed, 233 insertions(+), 94 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py index 67defc54e5c14..7746339b91c40 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py @@ -144,7 +144,10 @@ def get_schedules_or_error( results = [ GrapheneSchedule( - schedule, repository, schedule_states_by_name.get(schedule.name), batch_loader + schedule, + repository.handle, + schedule_states_by_name.get(schedule.name), + batch_loader, ) for schedule in filtered ] @@ -172,7 +175,8 @@ def get_schedules_for_pipeline( schedule.get_remote_origin_id(), schedule.selector_id, ) - results.append(GrapheneSchedule(schedule, repository, schedule_state)) + + results.append(GrapheneSchedule(schedule, repository.handle, schedule_state)) return results @@ -197,7 +201,7 @@ def get_schedule_or_error( schedule_state = graphene_info.context.instance.get_instigator_state( schedule.get_remote_origin_id(), schedule.selector_id ) - return GrapheneSchedule(schedule, repository, schedule_state) + return GrapheneSchedule(schedule, repository.handle, schedule_state) def get_schedule_next_tick( diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py index c65458165e48c..6f8c465b0d7d9 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py @@ -61,7 +61,12 @@ def get_sensors_or_error( return GrapheneSensors( results=[ - GrapheneSensor(sensor, repository, sensor_states_by_name.get(sensor.name), batch_loader) + GrapheneSensor( + sensor, + repository.handle, + sensor_states_by_name.get(sensor.name), + batch_loader, + ) for sensor in filtered ] ) @@ -83,7 +88,7 @@ def get_sensor_or_error(graphene_info: ResolveInfo, selector: SensorSelector) -> sensor.selector_id, ) - return GrapheneSensor(sensor, repository, sensor_state) + return GrapheneSensor(sensor, repository.handle, sensor_state) def start_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) -> "GrapheneSensor": @@ -98,7 +103,7 @@ def start_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) -> raise UserFacingGraphQLError(GrapheneSensorNotFoundError(sensor_selector.sensor_name)) sensor = repository.get_sensor(sensor_selector.sensor_name) sensor_state = graphene_info.context.instance.start_sensor(sensor) - return GrapheneSensor(sensor, repository, sensor_state) + return GrapheneSensor(sensor, repository.handle, sensor_state) def stop_sensor( @@ -151,7 +156,7 @@ def reset_sensor(graphene_info: ResolveInfo, sensor_selector: SensorSelector) -> sensor = repository.get_sensor(sensor_selector.sensor_name) sensor_state = graphene_info.context.instance.reset_sensor(sensor) - return GrapheneSensor(sensor, repository, sensor_state) + return GrapheneSensor(sensor, repository.handle, sensor_state) def get_sensors_for_pipeline( @@ -174,7 +179,7 @@ def get_sensors_for_pipeline( sensor.get_remote_origin_id(), sensor.selector_id, ) - results.append(GrapheneSensor(sensor, repository, sensor_state)) + results.append(GrapheneSensor(sensor, repository.handle, sensor_state)) return results @@ -259,4 +264,4 @@ def set_sensor_cursor( else: instance.update_instigator_state(updated_state) - return GrapheneSensor(sensor, repository, updated_state) + return GrapheneSensor(sensor, repository.handle, updated_state) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 0b192bde05c2c..fdefd4d288873 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -1,13 +1,11 @@ -from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Set, Union, cast +from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union, cast import graphene from dagster import ( AssetKey, - DagsterError, _check as check, ) from dagster._core.definitions.asset_graph_differ import AssetDefinitionChangeType, AssetGraphDiffer -from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.data_version import ( NULL_DATA_VERSION, @@ -16,7 +14,7 @@ ) from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition from dagster._core.definitions.partition_mapping import PartitionMapping -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph, RemoteAssetNode +from dagster._core.definitions.remote_asset_graph import RemoteAssetNode, RemoteWorkspaceAssetNode from dagster._core.definitions.selector import JobSelector from dagster._core.definitions.sensor_definition import SensorType from dagster._core.errors import DagsterInvariantViolationError @@ -847,53 +845,49 @@ def resolve_automationCondition( return GrapheneAutomationCondition(self._asset_node_snap.automation_condition) return None - def _sensor_targets_asset( - self, sensor: RemoteSensor, asset_graph: RemoteAssetGraph, job_names: Set[str] - ) -> bool: - asset_key = self._asset_node_snap.asset_key - - if sensor.asset_selection is not None: - try: - asset_selection = sensor.asset_selection.resolve(asset_graph) - except DagsterError: - return False - - if asset_key in asset_selection: - return True - - return any(target.job_name in job_names for target in sensor.get_targets()) - def resolve_targetingInstigators(self, graphene_info: ResolveInfo) -> Sequence[GrapheneSensor]: - repo = graphene_info.context.get_repository(self._repository_selector) - sensors = repo.get_sensors() - schedules = repo.get_schedules() - - asset_graph = repo.asset_graph - - job_names = { - job_name - for job_name in self._asset_node_snap.job_names - if not job_name == IMPLICIT_ASSET_JOB_NAME - } + if isinstance(self._remote_node, RemoteWorkspaceAssetNode): + # global nodes have saved references to their targeting instigators + schedules = [ + graphene_info.context.get_schedule(schedule_handle) + for schedule_handle in self._remote_node.get_targeting_schedule_handles() + ] + sensors = [ + graphene_info.context.get_sensor(sensor_handle) + for sensor_handle in self._remote_node.get_targeting_sensor_handles() + ] + else: + # fallback to using the repository + repo = graphene_info.context.get_repository(self._repository_selector) + schedules = repo.get_schedules_targeting(self._asset_node_snap.asset_key) + sensors = repo.get_sensors_targeting(self._asset_node_snap.asset_key) results = [] for sensor in sensors: - if not self._sensor_targets_asset(sensor, asset_graph, job_names): - continue - sensor_state = graphene_info.context.instance.get_instigator_state( sensor.get_remote_origin_id(), sensor.selector_id, ) - results.append(GrapheneSensor(sensor, repo, sensor_state)) + results.append( + GrapheneSensor( + sensor, + sensor.handle.repository_handle, + sensor_state, + ) + ) for schedule in schedules: - if schedule.job_name in job_names: - schedule_state = graphene_info.context.instance.get_instigator_state( - schedule.get_remote_origin_id(), - schedule.selector_id, + schedule_state = graphene_info.context.instance.get_instigator_state( + schedule.get_remote_origin_id(), + schedule.selector_id, + ) + results.append( + GrapheneSchedule( + schedule, + schedule.handle.repository_handle, + schedule_state, ) - results.append(GrapheneSchedule(schedule, repo, schedule_state)) + ) return results diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py index d4cb97611ef2b..09c872a8d879d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py @@ -1,15 +1,14 @@ -from functools import cached_property from typing import TYPE_CHECKING, Sequence import graphene from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_selection import AssetSelection -from dagster._core.remote_representation.external import RemoteRepository +from dagster._core.remote_representation.handle import RepositoryHandle -from dagster_graphql.implementation.fetch_assets import get_asset_nodes_by_asset_key +from dagster_graphql.implementation.fetch_assets import get_asset from dagster_graphql.implementation.utils import capture_error from dagster_graphql.schema.asset_key import GrapheneAssetKey -from dagster_graphql.schema.util import non_null_list +from dagster_graphql.schema.util import ResolveInfo, non_null_list if TYPE_CHECKING: from dagster_graphql.schema.roots.assets import GrapheneAssetConnection @@ -21,34 +20,40 @@ class GrapheneAssetSelection(graphene.ObjectType): assets = non_null_list("dagster_graphql.schema.pipelines.pipeline.GrapheneAsset") assetsOrError = graphene.NonNull("dagster_graphql.schema.roots.assets.GrapheneAssetsOrError") - def __init__(self, asset_selection: AssetSelection, remote_repository: RemoteRepository): + def __init__( + self, + asset_selection: AssetSelection, + repository_handle: RepositoryHandle, + ): self._asset_selection = asset_selection - self._remote_repository = remote_repository + self._repository_handle = repository_handle + self._resolved_keys = None - def resolve_assetSelectionString(self, _graphene_info): + def resolve_assetSelectionString(self, _graphene_info) -> str: return str(self._asset_selection) - def resolve_assetKeys(self, _graphene_info): + def resolve_assetKeys(self, graphene_info: ResolveInfo): return [ - GrapheneAssetKey(path=asset_key.path) for asset_key in self._resolved_and_sorted_keys + GrapheneAssetKey(path=asset_key.path) + for asset_key in self._get_resolved_and_sorted_keys(graphene_info) ] - def _get_assets(self, graphene_info): - from dagster_graphql.schema.pipelines.pipeline import GrapheneAsset - - asset_nodes_by_asset_key = get_asset_nodes_by_asset_key(graphene_info) + def _get_assets(self, graphene_info: ResolveInfo): return [ - GrapheneAsset(key=asset_key, definition=asset_nodes_by_asset_key.get(asset_key)) - for asset_key in self._resolved_and_sorted_keys + get_asset(graphene_info, asset_key) + for asset_key in self._get_resolved_and_sorted_keys(graphene_info) ] - def resolve_assets(self, graphene_info): + def resolve_assets(self, graphene_info: ResolveInfo): return self._get_assets(graphene_info) - @cached_property - def _resolved_and_sorted_keys(self) -> Sequence[AssetKey]: + def _get_resolved_and_sorted_keys(self, graphene_info: ResolveInfo) -> Sequence[AssetKey]: """Use this to maintain stability in ordering.""" - return sorted(self._asset_selection.resolve(self._remote_repository.asset_graph), key=str) + if self._resolved_keys is None: + repo = graphene_info.context.get_repository(self._repository_handle) + self._resolved_keys = sorted(self._asset_selection.resolve(repo.asset_graph), key=str) + + return self._resolved_keys @capture_error def resolve_assetsOrError(self, graphene_info) -> "GrapheneAssetConnection": diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/external.py b/python_modules/dagster-graphql/dagster_graphql/schema/external.py index de241e12ee8a3..640ae9fdcaed1 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/external.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/external.py @@ -311,7 +311,7 @@ def resolve_schedules(self, graphene_info: ResolveInfo): [ GrapheneSchedule( schedule, - repository, + repository.handle, batch_loader.get_schedule_state(schedule.name), batch_loader, ) @@ -326,7 +326,7 @@ def resolve_sensors(self, graphene_info: ResolveInfo, sensorType: Optional[Senso return [ GrapheneSensor( sensor, - repository, + repository.handle, batch_loader.get_sensor_state(sensor.name), batch_loader, ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py b/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py index ba7e0666808f2..3015017d7d1ef 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/schedules/schedules.py @@ -5,7 +5,7 @@ import graphene from dagster import DefaultScheduleStatus from dagster._core.remote_representation import RemoteSchedule -from dagster._core.remote_representation.external import RemoteRepository +from dagster._core.remote_representation.handle import RepositoryHandle from dagster._core.scheduler.instigation import InstigatorState, InstigatorStatus from dagster._time import get_current_timestamp @@ -66,12 +66,11 @@ class Meta: def __init__( self, remote_schedule: RemoteSchedule, - remote_repository: RemoteRepository, + repository_handle: RepositoryHandle, schedule_state: Optional[InstigatorState], batch_loader: Optional[RepositoryScopedBatchLoader] = None, ): self._remote_schedule = check.inst_param(remote_schedule, "remote_schedule", RemoteSchedule) - self._remote_repository = remote_repository # optional run loader, provided by a parent graphene object (e.g. GrapheneRepository) # that instantiates multiple schedules @@ -98,7 +97,7 @@ def __init__( description=remote_schedule.description, assetSelection=GrapheneAssetSelection( asset_selection=remote_schedule.asset_selection, - remote_repository=self._remote_repository, + repository_handle=repository_handle, ) if remote_schedule.asset_selection else None, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py b/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py index d5c112c989792..76ec746e84351 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/sensors.py @@ -7,7 +7,8 @@ from dagster._core.definitions.sensor_definition import SensorType from dagster._core.errors import DagsterInvariantViolationError from dagster._core.remote_representation import RemoteSensor, TargetSnap -from dagster._core.remote_representation.external import CompoundID, RemoteRepository +from dagster._core.remote_representation.external import CompoundID +from dagster._core.remote_representation.handle import RepositoryHandle from dagster._core.scheduler.instigation import InstigatorState, InstigatorStatus from dagster._core.workspace.permissions import Permissions @@ -94,12 +95,11 @@ class Meta: def __init__( self, remote_sensor: RemoteSensor, - remote_repo: RemoteRepository, + repository_handle: RepositoryHandle, sensor_state: Optional[InstigatorState], batch_loader: Optional[RepositoryScopedBatchLoader] = None, ): self._remote_sensor = check.inst_param(remote_sensor, "remote_sensor", RemoteSensor) - self._remote_repository = remote_repo # optional run loader, provided by a parent GrapheneRepository object that instantiates # multiple sensors @@ -122,7 +122,7 @@ def __init__( sensorType=remote_sensor.sensor_type.value, assetSelection=GrapheneAssetSelection( asset_selection=remote_sensor.asset_selection, - remote_repository=self._remote_repository, + repository_handle=repository_handle, ) if remote_sensor.asset_selection else None, diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 314690f4a53af..dbd003c0ee0d5 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -265,29 +265,29 @@ def entity_dep_graph(self) -> DependencyGraph[EntityKey]: "downstream": {node.key: node.child_entity_keys for node in self.nodes}, } - @cached_property + @property def all_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes} + return set(self._asset_nodes_by_key) @cached_property def materializable_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes if node.is_materializable} + return {key for key, node in self._asset_nodes_by_key.items() if node.is_materializable} @cached_property def observable_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes if node.is_observable} + return {key for key, node in self._asset_nodes_by_key.items() if node.is_observable} @cached_property def external_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes if node.is_external} + return {key for key, node in self._asset_nodes_by_key.items() if node.is_external} @cached_property def executable_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes if node.is_executable} + return {key for key, node in self._asset_nodes_by_key.items() if node.is_executable} @cached_property def unexecutable_asset_keys(self) -> AbstractSet[AssetKey]: - return {node.key for node in self.asset_nodes if not node.is_executable} + return {key for key, node in self._asset_nodes_by_key.items() if not node.is_executable} @cached_property def toposorted_asset_keys(self) -> Sequence[AssetKey]: diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py index 11f5b0214b168..25db24e036d37 100644 --- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py @@ -42,7 +42,7 @@ from dagster._core.definitions.partition_mapping import PartitionMapping from dagster._core.definitions.utils import DEFAULT_GROUP_NAME from dagster._core.remote_representation.external import RemoteRepository -from dagster._core.remote_representation.handle import RepositoryHandle +from dagster._core.remote_representation.handle import InstigatorHandle, RepositoryHandle from dagster._core.workspace.workspace import WorkspaceSnapshot from dagster._record import ImportFrom, record from dagster._utils.cached_method import cached_method @@ -199,6 +199,8 @@ class RepositoryScopedAssetInfo: """ asset_node: RemoteRepositoryAssetNode + targeting_sensor_names: Sequence[str] + targeting_schedule_names: Sequence[str] @property def handle(self) -> RepositoryHandle: @@ -332,6 +334,35 @@ def resolve_to_singular_repo_scoped_node(self) -> "RemoteRepositoryAssetNode": ) ) + def get_targeting_schedule_handles( + self, + ) -> Sequence[InstigatorHandle]: + selectors = [] + for node in self.repo_scoped_asset_infos: + for schedule_name in node.targeting_schedule_names: + selectors.append( + InstigatorHandle( + repository_handle=node.handle, + instigator_name=schedule_name, + ) + ) + + return selectors + + def get_targeting_sensor_handles( + self, + ) -> Sequence[InstigatorHandle]: + selectors = [] + for node in self.repo_scoped_asset_infos: + for sensor_name in node.targeting_sensor_names: + selectors.append( + InstigatorHandle( + repository_handle=node.handle, + instigator_name=sensor_name, + ) + ) + return selectors + ##### HELPERS @cached_property @@ -573,6 +604,12 @@ def build(cls, workspace: WorkspaceSnapshot): asset_infos_by_key[key].append( RepositoryScopedAssetInfo( asset_node=asset_node, + targeting_sensor_names=[ + s.name for s in repo.get_sensors_targeting(asset_node.key) + ], + targeting_schedule_names=[ + s.name for s in repo.get_schedules_targeting(asset_node.key) + ], ) ) # NOTE: matches previous behavior of completely ignoring asset check collisions diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index 9d2a5e9b44c07..e1b271153c1ed 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -1,3 +1,4 @@ +from collections import defaultdict from datetime import datetime from functools import cached_property from threading import RLock @@ -13,6 +14,7 @@ Optional, Sequence, Set, + Tuple, Union, ) @@ -20,6 +22,7 @@ from dagster import AssetSelection from dagster._config.snap import ConfigFieldSnap, ConfigSchemaSnapshot from dagster._core.definitions.asset_check_spec import AssetCheckKey +from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataValue @@ -37,6 +40,7 @@ DefaultSensorStatus, SensorType, ) +from dagster._core.errors import DagsterError from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle from dagster._core.instance import DagsterInstance from dagster._core.origin import JobPythonOrigin, RepositoryPythonOrigin @@ -87,6 +91,8 @@ from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap +_empty_set = frozenset() + class RemoteRepository: """RemoteRepository is a object that represents a loaded repository definition that @@ -372,13 +378,15 @@ def get_asset_node_snaps(self, job_name: Optional[str] = None) -> Sequence[Asset else self._asset_jobs.get(job_name, []) ) + @cached_property + def _asset_snaps_by_key(self) -> Mapping[AssetKey, AssetNodeSnap]: + mapping = {} + for asset_snap in self.repository_snap.asset_nodes: + mapping[asset_snap.asset_key] = asset_snap + return mapping + def get_asset_node_snap(self, asset_key: AssetKey) -> Optional[AssetNodeSnap]: - matching = [ - asset_node - for asset_node in self.repository_snap.asset_nodes - if asset_node.asset_key == asset_key - ] - return matching[0] if matching else None + return self._asset_snaps_by_key.get(asset_key) def get_asset_check_node_snaps( self, job_name: Optional[str] = None @@ -441,6 +449,72 @@ def _get_partitions_def_for_job( f" {len(unique_partitions_defs)} unique PartitionsDefinitions." ) + @cached_property + def _sensor_mappings( + self, + ) -> Tuple[ + Mapping[str, Sequence["RemoteSensor"]], + Mapping[AssetKey, Sequence["RemoteSensor"]], + ]: + asset_key_mapping = defaultdict(list) + job_name_mapping = defaultdict(list) + for sensor in self.get_sensors(): + for target in sensor.get_targets(): + job_name_mapping[target.job_name].append(sensor) + + if sensor and sensor.asset_selection: + try: + keys = sensor.asset_selection.resolve(self.asset_graph) + for key in keys: + asset_key_mapping[key].append(sensor) + except DagsterError: + pass + + return job_name_mapping, asset_key_mapping + + @property + def _sensors_by_job_name(self) -> Mapping[str, Sequence["RemoteSensor"]]: + return self._sensor_mappings[0] + + @property + def _sensors_by_asset_key(self) -> Mapping[AssetKey, Sequence["RemoteSensor"]]: + return self._sensor_mappings[1] + + @cached_property + def _schedules_by_job_name(self) -> Mapping[str, Sequence["RemoteSchedule"]]: + mapping = defaultdict(list) + for schedule in self.get_schedules(): + mapping[schedule.job_name].append(schedule) + + return mapping + + def get_sensors_targeting(self, asset_key: AssetKey) -> AbstractSet["RemoteSensor"]: + asset_snap = self.get_asset_node_snap(asset_key) + if not asset_snap: + return _empty_set + + sensors = set() + if asset_key in self._sensors_by_asset_key: + sensors.update(self._sensors_by_asset_key[asset_key]) + + for job_name in asset_snap.job_names: + if job_name != IMPLICIT_ASSET_JOB_NAME and job_name in self._sensors_by_job_name: + sensors.update(self._sensors_by_job_name[job_name]) + + return sensors + + def get_schedules_targeting(self, asset_key: AssetKey) -> AbstractSet["RemoteSchedule"]: + asset_snap = self.get_asset_node_snap(asset_key) + if not asset_snap: + return _empty_set + + schedules = set() + for job_name in asset_snap.job_names: + if job_name != IMPLICIT_ASSET_JOB_NAME and job_name in self._schedules_by_job_name: + schedules.update(self._schedules_by_job_name[job_name]) + + return schedules + class RemoteJob(RepresentedJob): """RemoteJob is a object that represents a loaded job definition that diff --git a/python_modules/dagster/dagster/_core/workspace/context.py b/python_modules/dagster/dagster/_core/workspace/context.py index e13e3ee60cfbf..d72fd89591a12 100644 --- a/python_modules/dagster/dagster/_core/workspace/context.py +++ b/python_modules/dagster/dagster/_core/workspace/context.py @@ -36,13 +36,18 @@ RemoteJob, RepositoryHandle, ) -from dagster._core.remote_representation.external import RemoteRepository +from dagster._core.remote_representation.external import ( + RemoteRepository, + RemoteSchedule, + RemoteSensor, +) from dagster._core.remote_representation.grpc_server_registry import GrpcServerRegistry from dagster._core.remote_representation.grpc_server_state_subscriber import ( LocationStateChangeEvent, LocationStateChangeEventType, LocationStateSubscriber, ) +from dagster._core.remote_representation.handle import InstigatorHandle from dagster._core.remote_representation.origin import ( GrpcServerCodeLocationOrigin, ManagedGrpcPythonEnvCodeLocationOrigin, @@ -341,11 +346,27 @@ def get_asset_node(self, asset_key: AssetKey) -> Optional["RemoteWorkspaceAssetN return self.get_workspace_snapshot().asset_graph.get(asset_key) - def get_repository(self, selector: RepositorySelector) -> RemoteRepository: + def get_repository( + self, selector: Union[RepositorySelector, RepositoryHandle] + ) -> RemoteRepository: return self.get_code_location(selector.location_name).get_repository( selector.repository_name ) + def get_sensor(self, selector: InstigatorHandle) -> RemoteSensor: + return ( + self.get_code_location(selector.location_name) + .get_repository(selector.repository_name) + .get_sensor(selector.instigator_name) + ) + + def get_schedule(self, selector: InstigatorHandle) -> RemoteSchedule: + return ( + self.get_code_location(selector.location_name) + .get_repository(selector.repository_name) + .get_schedule(selector.instigator_name) + ) + class WorkspaceRequestContext(BaseWorkspaceRequestContext): def __init__(