Skip to content

Commit

Permalink
move RemoteAssetGraph.from_workspace to cached property on IWorkspace (
Browse files Browse the repository at this point in the history
…#20469)

same logic as #20468

## How I Tested These Changes

existing coverage
  • Loading branch information
alangenfeld authored and PedramNavid committed Mar 28, 2024
1 parent 27c0ef2 commit 7882f14
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.selector import RepositorySelector
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.code_location import CodeLocation
Expand Down Expand Up @@ -125,7 +124,7 @@ def _fetch_checks(
self._context.instance, check_keys=all_check_keys
)

asset_graph = RemoteAssetGraph.from_workspace(self._context)
asset_graph = self._context.asset_graph
graphene_checks: Mapping[AssetKey, AssetChecksOrErrorUnion] = {}
for asset_key in self._asset_keys:
if asset_key in errors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import dagster._check as check
import pendulum
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.selector import PartitionsByAssetSelector, RepositorySelector
from dagster._core.errors import (
DagsterError,
Expand Down Expand Up @@ -46,7 +45,7 @@ def get_asset_backfill_preview(
) -> Sequence["GrapheneAssetPartitions"]:
from ...schema.backfill import GrapheneAssetPartitions

asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context)
asset_graph = graphene_info.context.asset_graph

check.invariant(backfill_preview_params.get("assetSelection") is not None)
check.invariant(backfill_preview_params.get("partitionNames") is not None)
Expand Down Expand Up @@ -198,7 +197,7 @@ def create_and_launch_partition_backfill(
if backfill_params.get("fromFailure"):
raise DagsterError("fromFailure is not supported for pure asset backfills")

asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context)
asset_graph = graphene_info.context.asset_graph

assert_permission_for_asset_graph(
graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL
Expand Down Expand Up @@ -227,7 +226,7 @@ def create_and_launch_partition_backfill(
if backfill_params.get("fromFailure"):
raise DagsterError("fromFailure is not supported for pure asset backfills")

asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context)
asset_graph = graphene_info.context.asset_graph
assert_permission_for_asset_graph(
graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL
)
Expand Down Expand Up @@ -265,7 +264,7 @@ def cancel_partition_backfill(
check.failed(f"No backfill found for id: {backfill_id}")

if backfill.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context)
asset_graph = graphene_info.context.asset_graph
assert_permission_for_asset_graph(
graphene_info,
asset_graph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
PartitionsDefinition,
PartitionsSubset,
)
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.time_window_partitions import (
BaseTimeWindowPartitionsSubset,
PartitionRangeStatus,
Expand Down Expand Up @@ -211,7 +210,7 @@ def get_asset_nodes_by_asset_key(

stale_status_loader = StaleStatusLoader(
instance=graphene_info.context.instance,
asset_graph=lambda: RemoteAssetGraph.from_workspace(graphene_info.context),
asset_graph=lambda: graphene_info.context.asset_graph,
)

dynamic_partitions_loader = CachingDynamicPartitionsLoader(graphene_info.context.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dagster._check as check
import graphene
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.nux import get_has_seen_nux, set_nux_seen
from dagster._core.workspace.permissions import Permissions
Expand Down Expand Up @@ -727,7 +726,7 @@ def mutate(

reporting_user_tags = {**graphene_info.context.get_reporting_user_tags()}

asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context)
asset_graph = graphene_info.context.asset_graph

assert_permission_for_asset_graph(
graphene_info, asset_graph, [asset_key], Permissions.REPORT_RUNLESS_ASSET_EVENTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ def load_asset_graph() -> RemoteAssetGraph:
if repo is not None:
return repo.asset_graph
else:
return RemoteAssetGraph.from_workspace(graphene_info.context)
return graphene_info.context.asset_graph

stale_status_loader = StaleStatusLoader(
instance=graphene_info.context.instance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME
from dagster._core.remote_representation.external import ExternalRepository
from dagster._core.remote_representation.handle import RepositoryHandle
from dagster._core.workspace.workspace import IWorkspace

from .backfill_policy import BackfillPolicy
from .base_asset_graph import AssetKeyOrCheckKey, BaseAssetGraph, BaseAssetNode
Expand Down Expand Up @@ -209,34 +208,6 @@ def __init__(
self._asset_checks_by_key = asset_checks_by_key
self._asset_check_execution_sets_by_key = asset_check_execution_sets_by_key

@classmethod
def from_workspace(cls, context: IWorkspace) -> "RemoteAssetGraph":
code_locations = (
location_entry.code_location
for location_entry in context.get_workspace_snapshot().values()
if location_entry.code_location
)
repos = (
repo
for code_location in code_locations
for repo in code_location.get_repositories().values()
)
repo_handle_external_asset_nodes: Sequence[
Tuple[RepositoryHandle, "ExternalAssetNode"]
] = []
asset_checks: Sequence["ExternalAssetCheck"] = []

for repo in repos:
for external_asset_node in repo.get_external_asset_nodes():
repo_handle_external_asset_nodes.append((repo.handle, external_asset_node))

asset_checks.extend(repo.get_external_asset_checks())

return cls.from_repository_handles_and_external_asset_nodes(
repo_handle_external_asset_nodes=repo_handle_external_asset_nodes,
external_asset_checks=asset_checks,
)

@classmethod
def from_repository_handles_and_external_asset_nodes(
cls,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,7 @@ def execute_asset_backfill_iteration(
logger.info(f"Evaluating asset backfill {backfill.backfill_id}")

workspace_context = workspace_process_context.create_request_context()

asset_graph = RemoteAssetGraph.from_workspace(workspace_context)
asset_graph = workspace_context.asset_graph

if not backfill.is_asset_backfill:
check.failed("Backfill must be an asset backfill")
Expand Down
13 changes: 6 additions & 7 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from dagster._core.definitions import AssetKey
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.errors import DagsterDefinitionChangedDeserializationError
from dagster._core.execution.bulk_actions import BulkActionType
from dagster._core.instance import DynamicPartitionsStore
Expand Down Expand Up @@ -154,7 +153,7 @@ def is_valid_serialization(self, workspace: IWorkspace) -> bool:
if self.serialized_asset_backfill_data:
return AssetBackfillData.is_valid_serialization(
self.serialized_asset_backfill_data,
RemoteAssetGraph.from_workspace(workspace),
workspace.asset_graph,
)
else:
return True
Expand All @@ -171,7 +170,7 @@ def get_backfill_status_per_asset_key(
return []

if self.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
Expand All @@ -188,7 +187,7 @@ def get_target_partitions_subset(
return None

if self.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
Expand All @@ -205,7 +204,7 @@ def get_target_root_partitions_subset(
return None

if self.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
Expand All @@ -220,7 +219,7 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]:
return 0

if self.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
Expand All @@ -238,7 +237,7 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]:
return []

if self.is_asset_backfill:
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
try:
asset_backfill_data = self.get_asset_backfill_data(asset_graph)
except DagsterDefinitionChangedDeserializationError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _create_asset_run(
# likely is outdated and targeting the wrong job, refetch the asset
# graph from the workspace
workspace = workspace_process_context.create_request_context()
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph

check.failed(
f"Failed to target asset selection {run_request.asset_selection} in run after retrying."
Expand Down
40 changes: 39 additions & 1 deletion python_modules/dagster/dagster/_core/workspace/workspace.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional, Sequence, Tuple

from dagster._utils.error import SerializableErrorInfo

if TYPE_CHECKING:
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.remote_representation import CodeLocation, CodeLocationOrigin
from dagster._core.remote_representation.external_data import (
ExternalAssetCheck,
ExternalAssetNode,
)
from dagster._core.remote_representation.handle import RepositoryHandle


# For locations that are loaded asynchronously
Expand Down Expand Up @@ -48,6 +55,37 @@ def get_workspace_snapshot(self) -> Mapping[str, CodeLocationEntry]:
def get_code_location_statuses(self) -> Sequence[CodeLocationStatusEntry]:
pass

@cached_property
def asset_graph(self) -> "RemoteAssetGraph":
"""Returns a workspace scoped RemoteAssetGraph."""
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph

code_locations = (
location_entry.code_location
for location_entry in self.get_workspace_snapshot().values()
if location_entry.code_location
)
repos = (
repo
for code_location in code_locations
for repo in code_location.get_repositories().values()
)
repo_handle_external_asset_nodes: Sequence[
Tuple["RepositoryHandle", "ExternalAssetNode"]
] = []
asset_checks: Sequence["ExternalAssetCheck"] = []

for repo in repos:
for external_asset_node in repo.get_external_asset_nodes():
repo_handle_external_asset_nodes.append((repo.handle, external_asset_node))

asset_checks.extend(repo.get_external_asset_checks())

return RemoteAssetGraph.from_repository_handles_and_external_asset_nodes(
repo_handle_external_asset_nodes=repo_handle_external_asset_nodes,
external_asset_checks=asset_checks,
)


def location_status_from_location_entry(
entry: CodeLocationEntry,
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_daemon/asset_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def _run_iteration_impl(
if not get_has_migrated_to_sensors(instance):
# Do a one-time migration to create the cursors for each sensor, based on the
# existing cursor for the legacy AMP tick
asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph
pre_sensor_cursor = _get_pre_sensor_auto_materialize_cursor(
instance, asset_graph
)
Expand Down Expand Up @@ -648,7 +648,7 @@ def _process_auto_materialize_tick_generator(

workspace = workspace_process_context.create_request_context()

asset_graph = RemoteAssetGraph.from_workspace(workspace)
asset_graph = workspace.asset_graph

instance: DagsterInstance = workspace_process_context.instance
error_info = None
Expand Down
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/_scheduler/stale.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
StaleStatus,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.run_request import RunRequest
from dagster._core.remote_representation.external import (
ExternalSchedule,
Expand All @@ -20,7 +19,7 @@ def resolve_stale_or_missing_assets(
run_request: RunRequest,
instigator: Union[ExternalSensor, ExternalSchedule],
) -> Sequence[AssetKey]:
asset_graph = RemoteAssetGraph.from_workspace(context.create_request_context())
asset_graph = context.create_request_context().asset_graph
asset_selection = (
run_request.asset_selection
if run_request.asset_selection is not None
Expand Down
Loading

0 comments on commit 7882f14

Please sign in to comment.