diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index b10e6d245c7ae..01eb641ebd8ad 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -567,7 +567,7 @@ def resolve_assetMaterializationUsedData( return [] instance = graphene_info.context.instance - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph asset_key = self._external_asset_node.asset_key # in the future, we can share this same CachingInstanceQueryer across all @@ -885,7 +885,7 @@ def resolve_freshnessInfo( self, graphene_info: ResolveInfo ) -> Optional[GrapheneAssetFreshnessInfo]: if self._external_asset_node.freshness_policy: - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph return get_freshness_info( asset_key=self._external_asset_node.asset_key, # in the future, we can share this same CachingInstanceQueryer across all @@ -929,7 +929,7 @@ def resolve_targetingInstigators(self, graphene_info) -> Sequence[GrapheneSensor external_sensors = self._external_repository.get_external_sensors() external_schedules = self._external_repository.get_external_schedules() - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph job_names = { job_name @@ -959,7 +959,7 @@ def resolve_targetingInstigators(self, graphene_info) -> Sequence[GrapheneSensor return results def _get_auto_materialize_external_sensor(self) -> Optional[ExternalSensor]: - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph asset_key = self._external_asset_node.asset_key matching_sensors = [ diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py index 749f89c7554c6..f8f691cad40b3 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py @@ -1,6 +1,5 @@ import graphene from dagster._core.definitions.asset_selection import AssetSelection -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.remote_representation.external import ExternalRepository from ..implementation.fetch_assets import get_asset_nodes_by_asset_key @@ -21,7 +20,7 @@ def resolve_assetSelectionString(self, _graphene_info): return str(self._asset_selection) def resolve_assetKeys(self, _graphene_info): - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph return [ GrapheneAssetKey(path=asset_key.path) for asset_key in self._asset_selection.resolve(asset_graph) @@ -30,7 +29,7 @@ def resolve_assetKeys(self, _graphene_info): def resolve_assets(self, graphene_info): from dagster_graphql.schema.pipelines.pipeline import GrapheneAsset - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.asset_graph asset_nodes_by_asset_key = get_asset_nodes_by_asset_key(graphene_info) return [ diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/external.py b/python_modules/dagster-graphql/dagster_graphql/schema/external.py index 158cf89b72294..cb7e82772c2f7 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/external.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/external.py @@ -8,7 +8,6 @@ ) from dagster._core.definitions.asset_graph_differ import AssetGraphDiffer from dagster._core.definitions.partition import CachingDynamicPartitionsLoader -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.sensor_definition import ( SensorType, ) @@ -275,7 +274,7 @@ def __init__( self._batch_loader = RepositoryScopedBatchLoader(instance, repository) self._stale_status_loader = StaleStatusLoader( instance=instance, - asset_graph=lambda: RemoteAssetGraph.from_external_repository(repository), + asset_graph=lambda: repository.asset_graph, ) self._dynamic_partitions_loader = CachingDynamicPartitionsLoader(instance) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 16b796c177e2b..71dcb977866c0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -952,7 +952,7 @@ def resolve_assetNodes( def load_asset_graph() -> RemoteAssetGraph: if repo is not None: - return RemoteAssetGraph.from_external_repository(repo) + return repo.asset_graph else: return RemoteAssetGraph.from_workspace(graphene_info.context) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 8fc251c330397..41de1c3b1724b 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -555,7 +555,7 @@ def test_cancel_asset_backfill(self, graphql_context): # since launching the run will cause test process will hang forever. code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.asset_graph _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) # Launch the run that runs forever @@ -793,7 +793,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.asset_graph _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) @@ -836,7 +836,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): def test_asset_backfill_status_with_upstream_failure(self, graphql_context): code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.asset_graph asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py index 6c8f7bf89612d..6c40c881a7c8b 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py @@ -103,10 +103,8 @@ def from_external_repositories( base_workspace, code_location_name, repository_name ) return AssetGraphDiffer( - branch_asset_graph=lambda: RemoteAssetGraph.from_external_repository(branch_repo), - base_asset_graph=(lambda: RemoteAssetGraph.from_external_repository(base_repo)) - if base_repo is not None - else None, + branch_asset_graph=lambda: branch_repo.asset_graph, + base_asset_graph=(lambda: base_repo.asset_graph) if base_repo is not None else None, ) def _compare_base_and_branch_assets(self, asset_key: "AssetKey") -> Sequence[ChangeReason]: diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py index d0e2cfffa441c..3e4271a64e894 100644 --- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py @@ -237,18 +237,6 @@ def from_workspace(cls, context: IWorkspace) -> "RemoteAssetGraph": external_asset_checks=asset_checks, ) - @classmethod - def from_external_repository( - cls, external_repository: ExternalRepository - ) -> "RemoteAssetGraph": - return cls.from_repository_handles_and_external_asset_nodes( - repo_handle_external_asset_nodes=[ - (external_repository.handle, asset_node) - for asset_node in external_repository.get_external_asset_nodes() - ], - external_asset_checks=external_repository.get_external_asset_checks(), - ) - @classmethod def from_repository_handles_and_external_asset_nodes( cls, diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index 244a82d2cee92..131ac76ab82e6 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -1,4 +1,5 @@ from datetime import datetime +from functools import cached_property from threading import RLock from typing import ( TYPE_CHECKING, @@ -75,6 +76,7 @@ from .represented import RepresentedJob if TYPE_CHECKING: + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap @@ -186,15 +188,13 @@ def get_default_auto_materialize_sensor_name(self): @property @cached_method def _external_sensors(self) -> Dict[str, "ExternalSensor"]: - from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph - sensor_datas = { external_sensor_data.name: ExternalSensor(external_sensor_data, self._handle) for external_sensor_data in self.external_repository_data.external_sensor_datas } if self._instance.auto_materialize_use_sensors: - asset_graph = RemoteAssetGraph.from_external_repository(self) + asset_graph = self.asset_graph has_any_auto_observe_source_assets = False @@ -373,6 +373,18 @@ def get_external_asset_checks( def get_display_metadata(self) -> Mapping[str, str]: return self.handle.display_metadata + @cached_property + def asset_graph(self) -> "RemoteAssetGraph": + """Returns a repository scoped RemoteAssetGraph.""" + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph + + return RemoteAssetGraph.from_repository_handles_and_external_asset_nodes( + repo_handle_external_asset_nodes=[ + (self.handle, asset_node) for asset_node in self.get_external_asset_nodes() + ], + external_asset_checks=self.get_external_asset_checks(), + ) + class ExternalJob(RepresentedJob): """ExternalJob is a object that represents a loaded job definition that diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index a27e48e59b2c2..800d35369384e 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -672,7 +672,7 @@ def _process_auto_materialize_tick_generator( if sensor: eligible_keys = check.not_none(sensor.asset_selection).resolve( - RemoteAssetGraph.from_external_repository(check.not_none(repository)) + check.not_none(repository).asset_graph ) else: eligible_keys = { diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py index 151e58bd34c01..d4cb26ff20d19 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py @@ -14,7 +14,6 @@ from dagster._core.definitions.auto_materialize_sensor_definition import ( AutoMaterializeSensorDefinition, ) -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.sensor_definition import ( SensorType, ) @@ -223,7 +222,7 @@ def test_combine_default_sensors_with_non_default_sensors(instance_with_auto_mat assert external_repo.has_external_sensor("default_auto_materialize_sensor") assert external_repo.has_external_sensor("my_custom_policy_sensor") - asset_graph = RemoteAssetGraph.from_external_repository(external_repo) + asset_graph = external_repo.asset_graph # default sensor includes all assets that weren't covered by the custom one @@ -293,7 +292,7 @@ def test_custom_sensors_cover_all(instance_with_auto_materialize_sensors): assert external_repo.has_external_sensor("normal_sensor") assert external_repo.has_external_sensor("my_custom_policy_sensor") - asset_graph = RemoteAssetGraph.from_external_repository(external_repo) + asset_graph = external_repo.asset_graph # Custom sensor covered all the valid assets custom_sensor = external_repo.get_external_sensor("my_custom_policy_sensor")