From e1b34f715ac220dbd72a7f609f7664935ab73c55 Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Wed, 13 Mar 2024 21:17:00 -0500 Subject: [PATCH] move RemoteAssetGraph.from_external_repository to cached property on ExternalRepository --- .../dagster_graphql/schema/asset_graph.py | 8 ++++---- .../dagster_graphql/schema/asset_selections.py | 5 ++--- .../dagster_graphql/schema/external.py | 3 +-- .../dagster_graphql/schema/roots/query.py | 2 +- .../graphql/test_partition_backfill.py | 6 +++--- .../_core/definitions/asset_graph_differ.py | 4 ++-- .../_core/remote_representation/external.py | 17 ++++++++++++++--- .../dagster/dagster/_daemon/asset_daemon.py | 2 +- .../test_default_auto_materialize_sensors.py | 5 ++--- 9 files changed, 30 insertions(+), 22 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index b10e6d245c7ae..fe41f59afabb5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -567,7 +567,7 @@ def resolve_assetMaterializationUsedData( return [] instance = graphene_info.context.instance - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph asset_key = self._external_asset_node.asset_key # in the future, we can share this same CachingInstanceQueryer across all @@ -885,7 +885,7 @@ def resolve_freshnessInfo( self, graphene_info: ResolveInfo ) -> Optional[GrapheneAssetFreshnessInfo]: if self._external_asset_node.freshness_policy: - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph return get_freshness_info( asset_key=self._external_asset_node.asset_key, # in the future, we can share this same CachingInstanceQueryer across all @@ -929,7 +929,7 @@ def resolve_targetingInstigators(self, graphene_info) -> Sequence[GrapheneSensor external_sensors = self._external_repository.get_external_sensors() external_schedules = self._external_repository.get_external_schedules() - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph job_names = { job_name @@ -959,7 +959,7 @@ def resolve_targetingInstigators(self, graphene_info) -> Sequence[GrapheneSensor return results def _get_auto_materialize_external_sensor(self) -> Optional[ExternalSensor]: - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph asset_key = self._external_asset_node.asset_key matching_sensors = [ diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py index 749f89c7554c6..134e24b92e20a 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py @@ -1,6 +1,5 @@ import graphene from dagster._core.definitions.asset_selection import AssetSelection -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.remote_representation.external import ExternalRepository from ..implementation.fetch_assets import get_asset_nodes_by_asset_key @@ -21,7 +20,7 @@ def resolve_assetSelectionString(self, _graphene_info): return str(self._asset_selection) def resolve_assetKeys(self, _graphene_info): - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph return [ GrapheneAssetKey(path=asset_key.path) for asset_key in self._asset_selection.resolve(asset_graph) @@ -30,7 +29,7 @@ def resolve_assetKeys(self, _graphene_info): def resolve_assets(self, graphene_info): from dagster_graphql.schema.pipelines.pipeline import GrapheneAsset - asset_graph = RemoteAssetGraph.from_external_repository(self._external_repository) + asset_graph = self._external_repository.remote_asset_graph asset_nodes_by_asset_key = get_asset_nodes_by_asset_key(graphene_info) return [ diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/external.py b/python_modules/dagster-graphql/dagster_graphql/schema/external.py index 158cf89b72294..36945fe567456 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/external.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/external.py @@ -8,7 +8,6 @@ ) from dagster._core.definitions.asset_graph_differ import AssetGraphDiffer from dagster._core.definitions.partition import CachingDynamicPartitionsLoader -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.sensor_definition import ( SensorType, ) @@ -275,7 +274,7 @@ def __init__( self._batch_loader = RepositoryScopedBatchLoader(instance, repository) self._stale_status_loader = StaleStatusLoader( instance=instance, - asset_graph=lambda: RemoteAssetGraph.from_external_repository(repository), + asset_graph=lambda: repository.remote_asset_graph, ) self._dynamic_partitions_loader = CachingDynamicPartitionsLoader(instance) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 16b796c177e2b..8698ada8d24d0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -952,7 +952,7 @@ def resolve_assetNodes( def load_asset_graph() -> RemoteAssetGraph: if repo is not None: - return RemoteAssetGraph.from_external_repository(repo) + return repo.remote_asset_graph else: return RemoteAssetGraph.from_workspace(graphene_info.context) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 8fc251c330397..0c28cd98ac2ec 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -555,7 +555,7 @@ def test_cancel_asset_backfill(self, graphql_context): # since launching the run will cause test process will hang forever. code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.remote_asset_graph _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) # Launch the run that runs forever @@ -793,7 +793,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.remote_asset_graph _execute_asset_backfill_iteration_no_side_effects(graphql_context, backfill_id, asset_graph) @@ -836,7 +836,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): def test_asset_backfill_status_with_upstream_failure(self, graphql_context): code_location = graphql_context.get_code_location("test") repository = code_location.get_repository("test_repo") - asset_graph = RemoteAssetGraph.from_external_repository(repository) + asset_graph = repository.remote_asset_graph asset_keys = [ AssetKey("unpartitioned_upstream_of_partitioned"), diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py index 6c8f7bf89612d..f008928cb4d24 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_differ.py @@ -103,8 +103,8 @@ def from_external_repositories( base_workspace, code_location_name, repository_name ) return AssetGraphDiffer( - branch_asset_graph=lambda: RemoteAssetGraph.from_external_repository(branch_repo), - base_asset_graph=(lambda: RemoteAssetGraph.from_external_repository(base_repo)) + branch_asset_graph=lambda: branch_repo.remote_asset_graph, + base_asset_graph=(lambda: base_repo.remote_asset_graph) if base_repo is not None else None, ) diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py index 244a82d2cee92..df3c4f24ba291 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external.py @@ -1,4 +1,5 @@ from datetime import datetime +from functools import cached_property from threading import RLock from typing import ( TYPE_CHECKING, @@ -75,6 +76,7 @@ from .represented import RepresentedJob if TYPE_CHECKING: + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.scheduler.instigation import InstigatorState from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap @@ -186,15 +188,13 @@ def get_default_auto_materialize_sensor_name(self): @property @cached_method def _external_sensors(self) -> Dict[str, "ExternalSensor"]: - from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph - sensor_datas = { external_sensor_data.name: ExternalSensor(external_sensor_data, self._handle) for external_sensor_data in self.external_repository_data.external_sensor_datas } if self._instance.auto_materialize_use_sensors: - asset_graph = RemoteAssetGraph.from_external_repository(self) + asset_graph = self.remote_asset_graph has_any_auto_observe_source_assets = False @@ -373,6 +373,17 @@ def get_external_asset_checks( def get_display_metadata(self) -> Mapping[str, str]: return self.handle.display_metadata + @cached_property + def remote_asset_graph(self) -> "RemoteAssetGraph": + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph + + return RemoteAssetGraph.from_repository_handles_and_external_asset_nodes( + repo_handle_external_asset_nodes=[ + (self.handle, asset_node) for asset_node in self.get_external_asset_nodes() + ], + external_asset_checks=self.get_external_asset_checks(), + ) + class ExternalJob(RepresentedJob): """ExternalJob is a object that represents a loaded job definition that diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index a27e48e59b2c2..485cf893a4095 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -672,7 +672,7 @@ def _process_auto_materialize_tick_generator( if sensor: eligible_keys = check.not_none(sensor.asset_selection).resolve( - RemoteAssetGraph.from_external_repository(check.not_none(repository)) + check.not_none(repository).remote_asset_graph ) else: eligible_keys = { diff --git a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py index 151e58bd34c01..9d776b6246e21 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/asset_policy_sensors_tests/test_default_auto_materialize_sensors.py @@ -14,7 +14,6 @@ from dagster._core.definitions.auto_materialize_sensor_definition import ( AutoMaterializeSensorDefinition, ) -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.sensor_definition import ( SensorType, ) @@ -223,7 +222,7 @@ def test_combine_default_sensors_with_non_default_sensors(instance_with_auto_mat assert external_repo.has_external_sensor("default_auto_materialize_sensor") assert external_repo.has_external_sensor("my_custom_policy_sensor") - asset_graph = RemoteAssetGraph.from_external_repository(external_repo) + asset_graph = external_repo.remote_asset_graph # default sensor includes all assets that weren't covered by the custom one @@ -293,7 +292,7 @@ def test_custom_sensors_cover_all(instance_with_auto_materialize_sensors): assert external_repo.has_external_sensor("normal_sensor") assert external_repo.has_external_sensor("my_custom_policy_sensor") - asset_graph = RemoteAssetGraph.from_external_repository(external_repo) + asset_graph = external_repo.remote_asset_graph # Custom sensor covered all the valid assets custom_sensor = external_repo.get_external_sensor("my_custom_policy_sensor")