Skip to content

Commit

Permalink
split RemoteAssetGraph in to RemoteRepository and RemoteGlobal (#25199)
Browse files Browse the repository at this point in the history
More accurately model the two flavors of `RemoteAssetGraph` that we
create, a repository scoped one and a global / workspace scoped one. The
global asset graph is built by combining all of the repository scoped
asset graphs.

## How I Tested These Changes

existing coverage
  • Loading branch information
alangenfeld authored Oct 15, 2024
1 parent f724ea8 commit 92e60de
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ def get_asset_node_definition_collisions(
repos: Dict[AssetKey, List[GrapheneRepository]] = defaultdict(list)

for remote_asset_node in graphene_info.context.asset_graph.asset_nodes:
for repo_handle, asset_node_snap in remote_asset_node.repo_node_pairs:
if asset_node_snap.asset_key in asset_keys:
if remote_asset_node.key in asset_keys:
for info in remote_asset_node.repo_scoped_asset_infos:
asset_node_snap = info.asset_node.asset_node_snap
is_defined = (
asset_node_snap.node_definition_name
or asset_node_snap.graph_name
Expand All @@ -166,7 +167,7 @@ def get_asset_node_definition_collisions(
if not is_defined:
continue

repos[asset_node_snap.asset_key].append(GrapheneRepository(repo_handle))
repos[asset_node_snap.asset_key].append(GrapheneRepository(info.handle))

results: List[GrapheneAssetNodeDefinitionCollision] = []
for asset_key in repos.keys():
Expand All @@ -189,7 +190,7 @@ def _graphene_asset_node(
):
from dagster_graphql.schema.asset_graph import GrapheneAssetNode

handle = remote_node.priority_repository_handle
handle = remote_node.resolve_to_singular_repo_scoped_node().repository_handle
base_deployment_context = graphene_info.context.get_base_deployment_context()

return GrapheneAssetNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,11 @@ def get_assets_latest_info(
for asset_key in step_keys_by_asset.keys():
asset_node = asset_nodes[asset_key]
if asset_node:
handle = asset_node.resolve_to_singular_repo_scoped_node().repository_handle
node_id = get_unique_asset_id(
asset_key,
asset_node.priority_repository_handle.repository_name,
asset_node.priority_repository_handle.location_name,
handle.repository_name,
handle.location_name,
)
else:
node_id = get_unique_asset_id(asset_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
from dagster._core.definitions.selector import GraphSelector, JobSubsetSelector
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.workspace.context import BaseWorkspaceRequestContext
Expand Down Expand Up @@ -95,7 +95,7 @@ def assert_permission(graphene_info: "ResolveInfo", permission: str) -> None:

def has_permission_for_asset_graph(
graphene_info: "ResolveInfo",
asset_graph: RemoteAssetGraph,
asset_graph: RemoteWorkspaceAssetGraph,
asset_selection: Optional[Sequence[AssetKey]],
permission: str,
) -> bool:
Expand Down Expand Up @@ -125,7 +125,7 @@ def has_permission_for_asset_graph(

def assert_permission_for_asset_graph(
graphene_info: "ResolveInfo",
asset_graph: RemoteAssetGraph,
asset_graph: RemoteWorkspaceAssetGraph,
asset_selection: Optional[Sequence[AssetKey]],
permission: str,
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,9 @@ def __init__(

self._remote_node = check.inst_param(remote_node, "remote_node", RemoteAssetNode)

self._asset_node_snap = remote_node.priority_node_snap
self._repository_handle = remote_node.priority_repository_handle
repo_scoped_node = remote_node.resolve_to_singular_repo_scoped_node()
self._asset_node_snap = repo_scoped_node.asset_node_snap
self._repository_handle = repo_scoped_node.repository_handle
self._repository_selector = self._repository_handle.to_selector()

self._stale_status_loader = check.opt_inst_param(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ def load_asset_graph() -> RemoteAssetGraph:
dynamic_partitions_loader=dynamic_partitions_loader,
# base_deployment_context will be None if we are not in a branch deployment
asset_graph_differ=AssetGraphDiffer.from_remote_repositories(
code_location_name=remote_node.priority_repository_handle.location_name,
repository_name=remote_node.priority_repository_handle.repository_name,
code_location_name=remote_node.resolve_to_singular_repo_scoped_node().repository_handle.location_name,
repository_name=remote_node.resolve_to_singular_repo_scoped_node().repository_handle.repository_name,
branch_workspace=graphene_info.context,
base_workspace=base_deployment_context,
)
Expand Down Expand Up @@ -1164,7 +1164,7 @@ def resolve_assetsLatestInfo(

# Build mapping of asset key to the step keys required to generate the asset
step_keys_by_asset: Dict[AssetKey, Sequence[str]] = {
remote_node.key: remote_node.priority_node_snap.op_names
remote_node.key: remote_node.resolve_to_singular_repo_scoped_node().asset_node_snap.op_names
for remote_node in remote_nodes
if remote_node
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,10 @@ def resolve_asset_nodes(self, graphene_info: ResolveInfo) -> Sequence["GrapheneA
remote_node
for remote_node in ext_repo.asset_graph.asset_nodes
if (
(remote_node.priority_node_snap.node_definition_name == self.solid_def_name)
(remote_node.asset_node_snap.node_definition_name == self.solid_def_name)
or (
remote_node.priority_node_snap.graph_name
and remote_node.priority_node_snap.graph_name == self.solid_def_name
remote_node.asset_node_snap.graph_name
and remote_node.asset_node_snap.graph_name == self.solid_def_name
)
)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
from dagster._core.execution.asset_backfill import (
AssetBackfillIterationResult,
execute_asset_backfill_iteration,
Expand Down Expand Up @@ -222,7 +222,7 @@ def _get_run_stats(partition_statuses):


def _execute_asset_backfill_iteration_no_side_effects(
graphql_context, backfill_id: str, asset_graph: RemoteAssetGraph
graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph
) -> None:
"""Executes an asset backfill iteration and updates the serialized asset backfill data.
However, does not execute side effects i.e. launching runs.
Expand Down Expand Up @@ -277,7 +277,7 @@ def _execute_backfill_iteration_with_side_effects(graphql_context, backfill_id):
def _mock_asset_backfill_runs(
graphql_context,
asset_key: AssetKey,
asset_graph: RemoteAssetGraph,
asset_graph: RemoteWorkspaceAssetGraph,
backfill_id: str,
status: DagsterRunStatus,
partition_key: Optional[str],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,14 @@ def executable_in_same_run(
):
"""Returns whether a child asset can be materialized in the same run as a parent asset."""
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping

child_node = asset_graph.get(child_key)
parent_node = asset_graph.get(parent_key)

# if operating on a RemoteAssetGraph, must share a repository handle
if isinstance(asset_graph, RemoteAssetGraph):
# if operating on a RemoteWorkspaceAssetGraph, must share a repository handle
if isinstance(asset_graph, RemoteWorkspaceAssetGraph):
child_handle = asset_graph.get_repository_handle(child_key)
parent_handle = asset_graph.get_repository_handle(parent_key)
if child_handle != parent_handle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def get_expected_data_time_for_asset_key(
"""Returns the data time that you would expect this asset to have if you were to execute it
on this tick.
"""
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph

asset_key = context.asset_key
asset_graph = context.asset_graph
Expand All @@ -139,9 +139,9 @@ def get_expected_data_time_for_asset_key(
for parent_key in asset_graph.get(asset_key).parent_keys:
# if the parent will be materialized on this tick, and it's not in the same repo, then
# we must wait for this asset to be materialized
if isinstance(asset_graph, RemoteAssetGraph) and context.will_update_asset_partition(
AssetKeyPartitionKey(parent_key)
):
if isinstance(
asset_graph, RemoteWorkspaceAssetGraph
) and context.will_update_asset_partition(AssetKeyPartitionKey(parent_key)):
parent_repo = asset_graph.get_repository_handle(parent_key)
if parent_repo != asset_graph.get_repository_handle(asset_key):
return context.data_time_resolver.get_current_data_time(asset_key, current_time)
Expand Down
Loading

0 comments on commit 92e60de

Please sign in to comment.