diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
index 5260d864b2084..e1a0688006b74 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py
@@ -156,8 +156,9 @@ def get_asset_node_definition_collisions(
     repos: Dict[AssetKey, List[GrapheneRepository]] = defaultdict(list)

     for remote_asset_node in graphene_info.context.asset_graph.asset_nodes:
-        for repo_handle, asset_node_snap in remote_asset_node.repo_node_pairs:
-            if asset_node_snap.asset_key in asset_keys:
+        if remote_asset_node.key in asset_keys:
+            for info in remote_asset_node.repo_scoped_asset_infos:
+                asset_node_snap = info.asset_node.asset_node_snap
                 is_defined = (
                     asset_node_snap.node_definition_name
                     or asset_node_snap.graph_name
@@ -166,7 +167,7 @@ def get_asset_node_definition_collisions(
                 if not is_defined:
                     continue

-                repos[asset_node_snap.asset_key].append(GrapheneRepository(repo_handle))
+                repos[asset_node_snap.asset_key].append(GrapheneRepository(info.handle))

     results: List[GrapheneAssetNodeDefinitionCollision] = []
     for asset_key in repos.keys():
@@ -189,7 +190,7 @@ def _graphene_asset_node(
 ):
     from dagster_graphql.schema.asset_graph import GrapheneAssetNode

-    handle = remote_node.priority_repository_handle
+    handle = remote_node.resolve_to_singular_repo_scoped_node().repository_handle
     base_deployment_context = graphene_info.context.get_base_deployment_context()

     return GrapheneAssetNode(
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
index 5aefa00438f65..4400f9bab3696 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
@@ -256,10 +256,11 @@ def get_assets_latest_info(
     for asset_key in step_keys_by_asset.keys():
         asset_node = asset_nodes[asset_key]
         if asset_node:
+            handle = asset_node.resolve_to_singular_repo_scoped_node().repository_handle
             node_id = get_unique_asset_id(
                 asset_key,
-                asset_node.priority_repository_handle.repository_name,
-                asset_node.priority_repository_handle.location_name,
+                handle.repository_name,
+                handle.location_name,
             )
         else:
             node_id = get_unique_asset_id(asset_key)
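Throughout these call sites, the old `priority_repository_handle` / `priority_node_snap` accessors are replaced by a single explicit resolution step, `resolve_to_singular_repo_scoped_node()`. As a mental model, the resolution prefers a materializable definition over an observable one, and falls back to any definition. The following is a minimal standalone sketch of that priority order (simplified stand-in types, not the Dagster source):

```python
import itertools
from dataclasses import dataclass
from typing import Sequence


@dataclass
class RepoScopedNode:
    """Stand-in for a repository-scoped asset node and its snap."""

    location: str
    is_materializable: bool = False
    is_observable: bool = False


def resolve_to_singular_node(nodes: Sequence[RepoScopedNode]) -> RepoScopedNode:
    # Prefer a materializable node, then an observable one, then any node --
    # the same shadowing order the PR preserves from priority_node_snap.
    return next(
        itertools.chain(
            (n for n in nodes if n.is_materializable),
            (n for n in nodes if n.is_observable),
            iter(nodes),
        )
    )


nodes = [
    RepoScopedNode("loc_a", is_observable=True),
    RepoScopedNode("loc_b", is_materializable=True),
]
assert resolve_to_singular_node(nodes).location == "loc_b"
```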
"ResolveInfo", - asset_graph: RemoteAssetGraph, + asset_graph: RemoteWorkspaceAssetGraph, asset_selection: Optional[Sequence[AssetKey]], permission: str, ) -> bool: @@ -125,7 +125,7 @@ def has_permission_for_asset_graph( def assert_permission_for_asset_graph( graphene_info: "ResolveInfo", - asset_graph: RemoteAssetGraph, + asset_graph: RemoteWorkspaceAssetGraph, asset_selection: Optional[Sequence[AssetKey]], permission: str, ) -> None: diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 62b7aab391a21..0b192bde05c2c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -326,8 +326,9 @@ def __init__( self._remote_node = check.inst_param(remote_node, "remote_node", RemoteAssetNode) - self._asset_node_snap = remote_node.priority_node_snap - self._repository_handle = remote_node.priority_repository_handle + repo_scoped_node = remote_node.resolve_to_singular_repo_scoped_node() + self._asset_node_snap = repo_scoped_node.asset_node_snap + self._repository_handle = repo_scoped_node.repository_handle self._repository_selector = self._repository_handle.to_selector() self._stale_status_loader = check.opt_inst_param( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 97bc01183d0b7..ff8848f2765b1 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -1066,8 +1066,8 @@ def load_asset_graph() -> RemoteAssetGraph: dynamic_partitions_loader=dynamic_partitions_loader, # base_deployment_context will be None if we are not in a branch deployment asset_graph_differ=AssetGraphDiffer.from_remote_repositories( - code_location_name=remote_node.priority_repository_handle.location_name, - repository_name=remote_node.priority_repository_handle.repository_name, + code_location_name=remote_node.resolve_to_singular_repo_scoped_node().repository_handle.location_name, + repository_name=remote_node.resolve_to_singular_repo_scoped_node().repository_handle.repository_name, branch_workspace=graphene_info.context, base_workspace=base_deployment_context, ) @@ -1164,7 +1164,7 @@ def resolve_assetsLatestInfo( # Build mapping of asset key to the step keys required to generate the asset step_keys_by_asset: Dict[AssetKey, Sequence[str]] = { - remote_node.key: remote_node.priority_node_snap.op_names + remote_node.key: remote_node.resolve_to_singular_repo_scoped_node().asset_node_snap.op_names for remote_node in remote_nodes if remote_node } diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/solids.py b/python_modules/dagster-graphql/dagster_graphql/schema/solids.py index 74ee66f568f4a..9b4f45c9566a7 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/solids.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/solids.py @@ -443,10 +443,10 @@ def resolve_asset_nodes(self, graphene_info: ResolveInfo) -> Sequence["GrapheneA remote_node for remote_node in ext_repo.asset_graph.asset_nodes if ( - (remote_node.priority_node_snap.node_definition_name == self.solid_def_name) + (remote_node.asset_node_snap.node_definition_name == self.solid_def_name) or ( - remote_node.priority_node_snap.graph_name - and remote_node.priority_node_snap.graph_name == self.solid_def_name + 
+                    remote_node.asset_node_snap.graph_name
+                    and remote_node.asset_node_snap.graph_name == self.solid_def_name
                 )
             )
         ]
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
index 3561f3d5edf68..34e337d9b20d3 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
@@ -12,7 +12,7 @@
 )
 from dagster._core.definitions.asset_graph import AssetGraph
 from dagster._core.definitions.partition_key_range import PartitionKeyRange
-from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
 from dagster._core.execution.asset_backfill import (
     AssetBackfillIterationResult,
     execute_asset_backfill_iteration,
@@ -222,7 +222,7 @@ def _get_run_stats(partition_statuses):

 def _execute_asset_backfill_iteration_no_side_effects(
-    graphql_context, backfill_id: str, asset_graph: RemoteAssetGraph
+    graphql_context, backfill_id: str, asset_graph: RemoteWorkspaceAssetGraph
 ) -> None:
     """Executes an asset backfill iteration and updates the serialized asset backfill data.

     However, does not execute side effects i.e. launching runs.
     """
@@ -277,7 +277,7 @@ def _execute_backfill_iteration_with_side_effects(graphql_context, backfill_id):
 def _mock_asset_backfill_runs(
     graphql_context,
     asset_key: AssetKey,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     backfill_id: str,
     status: DagsterRunStatus,
     partition_key: Optional[str],
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py
index 45052835bad39..7ab0fdd51dc65 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py
@@ -349,14 +349,14 @@ def executable_in_same_run(
 ):
     """Returns whether a child asset can be materialized in the same run as a parent asset."""
     from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
-    from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+    from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
     from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping

     child_node = asset_graph.get(child_key)
     parent_node = asset_graph.get(parent_key)

-    # if operating on a RemoteAssetGraph, must share a repository handle
-    if isinstance(asset_graph, RemoteAssetGraph):
+    # if operating on a RemoteWorkspaceAssetGraph, must share a repository handle
+    if isinstance(asset_graph, RemoteWorkspaceAssetGraph):
         child_handle = asset_graph.get_repository_handle(child_key)
         parent_handle = asset_graph.get_repository_handle(parent_key)
         if child_handle != parent_handle:
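The gate above is why `executable_in_same_run` now checks for `RemoteWorkspaceAssetGraph` specifically: only a workspace-scoped graph can contain assets from multiple repositories, so only there must a parent and child share a repository handle to be grouped into one run. A hedged standalone illustration of that gating (`RepoHandle` is a stand-in for Dagster's `RepositoryHandle`):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RepoHandle:  # stand-in for dagster's RepositoryHandle
    location_name: str
    repository_name: str


def can_execute_in_same_run(child_handle: RepoHandle, parent_handle: RepoHandle) -> bool:
    # Cross-repository (or cross-code-location) edges cannot be grouped into a
    # single run; such assets must be materialized in separate runs.
    return child_handle == parent_handle


a = RepoHandle("loc_1", "repo_x")
b = RepoHandle("loc_2", "repo_x")
assert can_execute_in_same_run(a, a)
assert not can_execute_in_same_run(a, b)
```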
    execute it on this tick.
     """
-    from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+    from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph

     asset_key = context.asset_key
     asset_graph = context.asset_graph
@@ -139,9 +139,9 @@ def get_expected_data_time_for_asset_key(
     for parent_key in asset_graph.get(asset_key).parent_keys:
         # if the parent will be materialized on this tick, and it's not in the same repo, then
         # we must wait for this asset to be materialized
-        if isinstance(asset_graph, RemoteAssetGraph) and context.will_update_asset_partition(
-            AssetKeyPartitionKey(parent_key)
-        ):
+        if isinstance(
+            asset_graph, RemoteWorkspaceAssetGraph
+        ) and context.will_update_asset_partition(AssetKeyPartitionKey(parent_key)):
             parent_repo = asset_graph.get_repository_handle(parent_key)
             if parent_repo != asset_graph.get_repository_handle(asset_key):
                 return context.data_time_resolver.get_current_data_time(asset_key, current_time)
diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
index 96463ec561b27..11f5b0214b168 100644
--- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
+++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py
@@ -1,22 +1,25 @@
 import itertools
 import warnings
+from abc import ABC, abstractmethod
 from collections import defaultdict
-from enum import Enum
 from functools import cached_property
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
-    DefaultDict,
     Dict,
+    Generic,
     Iterable,
     List,
     Mapping,
     Optional,
     Sequence,
     Set,
-    Tuple,
+    TypeVar,
+    Union,
 )

+from typing_extensions import Annotated
+
 import dagster._check as check
 from dagster._core.definitions.asset_check_spec import AssetCheckKey
 from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME
@@ -41,76 +44,228 @@
 from dagster._core.remote_representation.external import RemoteRepository
 from dagster._core.remote_representation.handle import RepositoryHandle
 from dagster._core.workspace.workspace import WorkspaceSnapshot
+from dagster._record import ImportFrom, record
+from dagster._utils.cached_method import cached_method

 if TYPE_CHECKING:
     from dagster._core.remote_representation.external_data import AssetCheckNodeSnap, AssetNodeSnap
-    from dagster._core.selector.subset_selector import DependencyGraph


-class RemoteAssetNode(BaseAssetNode):
-    def __init__(
-        self,
-        key: AssetKey,
-        parent_keys: AbstractSet[AssetKey],
-        child_keys: AbstractSet[AssetKey],
-        execution_set_keys: AbstractSet[EntityKey],
-        repo_node_pairs: Sequence[Tuple[RepositoryHandle, "AssetNodeSnap"]],
-        check_keys: AbstractSet[AssetCheckKey],
-    ):
-        self.key = key
-        self.parent_keys = parent_keys
-        self.child_keys = child_keys
-        self._repo_node_pairs = repo_node_pairs
-        self._asset_node_snaps = [node for _, node in repo_node_pairs]
-        self._check_keys = check_keys
-        self._execution_set_keys = execution_set_keys

-    ##### COMMON ASSET NODE INTERFACE
+@record
+class RemoteAssetCheckNode:
+    handle: RepositoryHandle
+    asset_check: Annotated[
+        "AssetCheckNodeSnap",
+        ImportFrom("dagster._core.remote_representation.external_data"),
+    ]
+    execution_set_entity_keys: AbstractSet[EntityKey]


+class RemoteAssetNode(BaseAssetNode, ABC):
+    @abstractmethod
+    def resolve_to_singular_repo_scoped_node(self) -> "RemoteRepositoryAssetNode": ...
+
+    @property
+    def execution_set_asset_keys(self) -> AbstractSet[AssetKey]:
+        return {k for k in self.execution_set_entity_keys if isinstance(k, AssetKey)}

     @property
     def description(self) -> Optional[str]:
-        return self.priority_node_snap.description
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.description

     @property
     def group_name(self) -> str:
-        return self.priority_node_snap.group_name or DEFAULT_GROUP_NAME
+        return (
+            self.resolve_to_singular_repo_scoped_node().asset_node_snap.group_name
+            or DEFAULT_GROUP_NAME
+        )
+
+    @property
+    def metadata(self) -> ArbitraryMetadataMapping:
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.metadata
+
+    @property
+    def tags(self) -> Mapping[str, str]:
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.tags or {}
+
+    @property
+    def owners(self) -> Sequence[str]:
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.owners or []
+
+    @property
+    def is_partitioned(self) -> bool:
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.partitions is not None

     @cached_property
+    def partitions_def(self) -> Optional[PartitionsDefinition]:
+        partitions_snap = self.resolve_to_singular_repo_scoped_node().asset_node_snap.partitions
+        return partitions_snap.get_partitions_definition() if partitions_snap else None
+
+    @property
+    def freshness_policy(self) -> Optional[FreshnessPolicy]:
+        # It is currently not possible to access the freshness policy for an observation definition
+        # if a materialization definition also exists. This needs to be fixed.
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.freshness_policy
+
+    @property
+    def code_version(self) -> Optional[str]:
+        # It is currently not possible to access the code version for an observation definition if a
+        # materialization definition also exists. This needs to be fixed.
+        return self.resolve_to_singular_repo_scoped_node().asset_node_snap.code_version
+
+    @property
+    def job_names(self) -> Sequence[str]:
+        # It is currently not possible to access the job names for an observation definition if a
+        # materialization definition also exists. This needs to be fixed.
+        return (
+            self.resolve_to_singular_repo_scoped_node().asset_node_snap.job_names
+            if self.is_executable
+            else []
+        )
+
+
+@record
+class RemoteRepositoryAssetNode(RemoteAssetNode):
+    """Asset nodes from a single RemoteRepository."""
+
+    repository_handle: RepositoryHandle
+    asset_node_snap: Annotated[
+        "AssetNodeSnap",
+        ImportFrom("dagster._core.remote_representation.external_data"),
+    ]
+    parent_keys: AbstractSet[AssetKey]
+    child_keys: AbstractSet[AssetKey]
+    check_keys: AbstractSet[AssetCheckKey]
+    execution_set_entity_keys: AbstractSet[EntityKey]
+
+    def __hash__(self):
+        # we create sets of these objects in the context of asset graphs but don't want to
+        # enforce that all recursively contained types are hashable so use object hash instead
+        return object.__hash__(self)
+
+    def resolve_to_singular_repo_scoped_node(self) -> "RemoteRepositoryAssetNode":
+        return self
+
+    @property
+    def key(self) -> AssetKey:
+        return self.asset_node_snap.asset_key
+
+    @property
     def is_materializable(self) -> bool:
-        return any(node.is_materializable for node in self._asset_node_snaps)
+        return self.asset_node_snap.is_materializable

-    @cached_property
+    @property
     def is_observable(self) -> bool:
-        return any(node.is_observable for node in self._asset_node_snaps)
+        return self.asset_node_snap.is_observable

-    @cached_property
+    @property
     def is_external(self) -> bool:
-        return all(node.is_external for node in self._asset_node_snaps)
+        return self.asset_node_snap.is_external

-    @cached_property
+    @property
     def is_executable(self) -> bool:
-        return any(node.is_executable for node in self._asset_node_snaps)
+        return self.asset_node_snap.is_executable

     @property
-    def metadata(self) -> ArbitraryMetadataMapping:
-        return self.priority_node_snap.metadata
+    def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
+        return {
+            dep.parent_asset_key: dep.partition_mapping
+            for dep in self.asset_node_snap.parent_edges
+            if dep.partition_mapping is not None
+        }

     @property
-    def tags(self) -> Mapping[str, str]:
-        return self.priority_node_snap.tags or {}
+    def backfill_policy(self) -> Optional[BackfillPolicy]:
+        return self.asset_node_snap.backfill_policy

     @property
-    def owners(self) -> Sequence[str]:
-        return self.priority_node_snap.owners or []
+    def automation_condition(self) -> Optional[AutomationCondition]:
+        return self.asset_node_snap.automation_condition

     @property
-    def is_partitioned(self) -> bool:
-        return self.priority_node_snap.partitions is not None
+    def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
+        return self.asset_node_snap.auto_materialize_policy
+
+    @property
+    def auto_observe_interval_minutes(self) -> Optional[float]:
+        return self.asset_node_snap.auto_observe_interval_minutes
+
+
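`RemoteRepositoryAssetNode` pins `__hash__` to object identity because `@record` instances hold mappings and sets that are not recursively hashable, yet the graph code places nodes into sets. A minimal sketch of the same pattern with a plain dataclass (an assumption: `@record` is comparable to a dataclass for this purpose):

```python
from dataclasses import dataclass, field
from typing import AbstractSet


@dataclass(eq=False)  # eq=False keeps identity-based equality, like the record here
class NodeLike:
    # An unhashable-by-value member; value hashing would fail or be expensive.
    parent_keys: AbstractSet[str] = field(default_factory=frozenset)

    def __hash__(self) -> int:
        # Hash by identity: contained values need not be hashable, but the
        # node can still be placed in sets and used as a dict key.
        return object.__hash__(self)


n1, n2 = NodeLike(), NodeLike()
assert len({n1, n2}) == 2  # distinct objects -> distinct set entries
```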
+ """ + + asset_node: RemoteRepositoryAssetNode + + @property + def handle(self) -> RepositoryHandle: + return self.asset_node.repository_handle + + +@record +class RemoteWorkspaceAssetNode(RemoteAssetNode): + """Asset nodes constructed from a WorkspaceSnapshot, containing nodes from potentially several RemoteRepositories.""" + + repo_scoped_asset_infos: Sequence[RepositoryScopedAssetInfo] + + def __hash__(self): + # we create sets of these objects in the context of asset graphs but don't want to + # enforce that all recursively contained types are hashable so use object hash instead + return object.__hash__(self) + + ##### COMMON ASSET NODE INTERFACE @cached_property - def partitions_def(self) -> Optional[PartitionsDefinition]: - partitions_snap = self.priority_node_snap.partitions - return partitions_snap.get_partitions_definition() if partitions_snap else None + def key(self) -> AssetKey: + return self.repo_scoped_asset_infos[0].asset_node.asset_node_snap.asset_key + + @property + def parent_keys(self) -> AbstractSet[AssetKey]: + # combine deps from all nodes + keys = set() + for info in self.repo_scoped_asset_infos: + keys.update(info.asset_node.parent_keys) + return keys + + @property + def child_keys(self) -> AbstractSet[AssetKey]: + # combine deps from all nodes + keys = set() + for info in self.repo_scoped_asset_infos: + keys.update(info.asset_node.child_keys) + return keys + + @property + def check_keys(self) -> AbstractSet[AssetCheckKey]: + # combine check keys from all nodes + keys = set() + for info in self.repo_scoped_asset_infos: + keys.update(info.asset_node.check_keys) + return keys + + @property + def execution_set_entity_keys( + self, + ) -> AbstractSet[Union[AssetKey, AssetCheckKey]]: + return self.resolve_to_singular_repo_scoped_node().execution_set_entity_keys + + @cached_property + def is_materializable(self) -> bool: + return any(info.asset_node.is_materializable for info in self.repo_scoped_asset_infos) + + @cached_property + def is_observable(self) -> bool: + return any(info.asset_node.is_observable for info in self.repo_scoped_asset_infos) + + @cached_property + def is_external(self) -> bool: + return all(info.asset_node.is_external for info in self.repo_scoped_asset_infos) + + @cached_property + def is_executable(self) -> bool: + return any(node.asset_node.is_executable for node in self.repo_scoped_asset_infos) @property def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: @@ -123,12 +278,6 @@ def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: else: return {} - @property - def freshness_policy(self) -> Optional[FreshnessPolicy]: - # It is currently not possible to access the freshness policy for an observation definition - # if a materialization definition also exists. This needs to be fixed. - return self.priority_node_snap.freshness_policy - @property def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: return ( @@ -156,64 +305,30 @@ def auto_observe_interval_minutes(self) -> Optional[float]: def backfill_policy(self) -> Optional[BackfillPolicy]: return self._materializable_node_snap.backfill_policy if self.is_materializable else None - @property - def code_version(self) -> Optional[str]: - # It is currently not possible to access the code version for an observation definition if a - # materialization definition also exists. This needs to be fixed. 
-        return self.priority_node_snap.code_version
-
-    @property
-    def check_keys(self) -> AbstractSet[AssetCheckKey]:
-        return self._check_keys
-
-    @property
-    def execution_set_asset_keys(self) -> AbstractSet[AssetKey]:
-        return {k for k in self.execution_set_entity_keys if isinstance(k, AssetKey)}
-
-    @property
-    def execution_set_entity_keys(self) -> AbstractSet[EntityKey]:
-        return self._execution_set_keys
-
-    ##### REMOTE-SPECIFIC INTERFACE
-
-    @property
-    def job_names(self) -> Sequence[str]:
-        # It is currently not possible to access the job names for an observation definition if a
-        # materialization definition also exists. This needs to be fixed.
-        return self.priority_node_snap.job_names if self.is_executable else []
-
-    @property
-    def priority_repository_handle(self) -> RepositoryHandle:
-        # This property supports existing behavior but it should be phased out, because it relies on
-        # materialization nodes shadowing observation nodes that would otherwise be exposed.
-        return next(
-            itertools.chain(
-                (repo for repo, node in self._repo_node_pairs if node.is_materializable),
-                (repo for repo, node in self._repo_node_pairs if node.is_observable),
-                (repo for repo, node in self._repo_node_pairs),
-            )
-        )
-
-    @property
-    def repository_handles(self) -> Sequence[RepositoryHandle]:
-        return [repo_handle for repo_handle, _ in self._repo_node_pairs]
-
-    @property
-    def repo_node_pairs(self) -> Sequence[Tuple[RepositoryHandle, "AssetNodeSnap"]]:
-        return self._repo_node_pairs
-
-    @cached_property
-    def priority_node_snap(self) -> "AssetNodeSnap":
+    @cached_method
+    def resolve_to_singular_repo_scoped_node(self) -> "RemoteRepositoryAssetNode":
         # Return a materialization node if it exists, otherwise return an observable node if it
         # exists, otherwise return any node. This exists to preserve implicit behavior, where the
         # materialization node was previously preferred over the observable node. This is a
         # temporary measure until we can appropriately scope the accessors that could apply to
         # either a materialization or observation node.
+
+        # This property supports existing behavior but it should be phased out, because it relies on
+        # materialization nodes shadowing observation nodes that would otherwise be exposed.
        return next(
             itertools.chain(
-                (node for node in self._asset_node_snaps if node.is_materializable),
-                (node for node in self._asset_node_snaps if node.is_observable),
-                (node for node in self._asset_node_snaps),
+                (
+                    info.asset_node
+                    for info in self.repo_scoped_asset_infos
+                    if info.asset_node.is_materializable
+                ),
+                (
+                    info.asset_node
+                    for info in self.repo_scoped_asset_infos
+                    if info.asset_node.is_observable
+                ),
+                (info.asset_node for info in self.repo_scoped_asset_infos),
             )
         )

@@ -222,157 +337,47 @@ def priority_node_snap(self) -> "AssetNodeSnap":
     @cached_property
     def _materializable_node_snap(self) -> "AssetNodeSnap":
         try:
-            return next(node for node in self._asset_node_snaps if node.is_materializable)
+            return next(
+                info.asset_node.asset_node_snap
+                for info in self.repo_scoped_asset_infos
+                if info.asset_node.is_materializable
+            )
         except StopIteration:
             check.failed("No materializable node found")

     @cached_property
     def _observable_node_snap(self) -> "AssetNodeSnap":
         try:
-            return next((node for node in self._asset_node_snaps if node.is_observable))
+            return next(
+                (
+                    info.asset_node.asset_node_snap
+                    for info in self.repo_scoped_asset_infos
+                    if info.asset_node.is_observable
+                )
+            )
         except StopIteration:
             check.failed("No observable node found")


-class RemoteAssetGraphScope(Enum):
-    """Was this asset graph built from a single repository or all repositories across the whole workspace."""
-
-    REPOSITORY = "REPOSITORY"
-    WORKSPACE = "WORKSPACE"
-
-
-class RemoteAssetGraph(BaseAssetGraph[RemoteAssetNode]):
-    def __init__(
-        self,
-        scope: RemoteAssetGraphScope,
-        asset_nodes_by_key: Mapping[AssetKey, RemoteAssetNode],
-        asset_checks_by_key: Mapping[AssetCheckKey, "AssetCheckNodeSnap"],
-        asset_check_execution_sets_by_key: Mapping[AssetCheckKey, AbstractSet[EntityKey]],
-        repository_handles_by_asset_check_key: Mapping[AssetCheckKey, RepositoryHandle],
-    ):
-        self._scope = scope
-        self._asset_nodes_by_key = asset_nodes_by_key
-        self._asset_checks_by_key = asset_checks_by_key
-        self._asset_check_nodes_by_key = {
-            k: AssetCheckNode(k, v.blocking, v.automation_condition)
-            for k, v in asset_checks_by_key.items()
-        }
-        self._asset_check_execution_sets_by_key = asset_check_execution_sets_by_key
-        self._repository_handles_by_asset_check_key = repository_handles_by_asset_check_key
-
-    @classmethod
-    def from_remote_repository(cls, repo: RemoteRepository):
-        return cls._build(
-            scope=RemoteAssetGraphScope.REPOSITORY,
-            repo_handle_assets=[
-                (repo.handle, node_snap) for node_snap in repo.get_asset_node_snaps()
-            ],
-            repo_handle_asset_checks=[
-                (repo.handle, asset_check_node)
-                for asset_check_node in repo.get_asset_check_node_snaps()
-            ],
-        )
-
-    @classmethod
-    def from_workspace_snapshot(cls, workspace: WorkspaceSnapshot):
-        code_locations = (
-            location_entry.code_location
-            for location_entry in workspace.code_location_entries.values()
-            if location_entry.code_location
-        )
-        repos = (
-            repo
-            for code_location in code_locations
-            for repo in code_location.get_repositories().values()
-        )
-
-        repo_handle_assets: Sequence[Tuple["RepositoryHandle", "AssetNodeSnap"]] = []
-        repo_handle_asset_checks: Sequence[Tuple["RepositoryHandle", "AssetCheckNodeSnap"]] = []
-        for repo in repos:
-            for asset_node_snap in repo.get_asset_node_snaps():
-                repo_handle_assets.append((repo.handle, asset_node_snap))
-            for asset_check_node_snap in repo.get_asset_check_node_snaps():
-                repo_handle_asset_checks.append((repo.handle, asset_check_node_snap))
-
-        return cls._build(
-            scope=RemoteAssetGraphScope.WORKSPACE,
-            repo_handle_assets=repo_handle_assets,
-            repo_handle_asset_checks=repo_handle_asset_checks,
-        )
-
-    @classmethod
-    def _build(
-        cls,
-        scope: RemoteAssetGraphScope,
-        repo_handle_assets: Sequence[Tuple[RepositoryHandle, "AssetNodeSnap"]],
-        repo_handle_asset_checks: Sequence[Tuple[RepositoryHandle, "AssetCheckNodeSnap"]],
-    ) -> "RemoteAssetGraph":
-        _warn_on_duplicate_nodes(repo_handle_assets)
-
-        # Build an index of execution sets by key. An execution set is a set of assets and checks
-        # that must be executed together. AssetNodeSnaps and AssetCheckNodeSnaps already have an
-        # optional execution_set_identifier set. A null execution_set_identifier indicates that the
-        # node or check can be executed independently.
-        assets = [asset for _, asset in repo_handle_assets]
-        asset_checks = [asset_check for _, asset_check in repo_handle_asset_checks]
-        execution_sets_by_key = _build_execution_set_index(assets, asset_checks)
-
-        # Index all (RepositoryHandle, AssetNodeSnap) pairs by their asset key, then use this to
-        # build the set of RemoteAssetNodes (indexed by key). Each RemoteAssetNode wraps the set of
-        # pairs for an asset key.
-        repo_node_pairs_by_key: Dict[AssetKey, List[Tuple[RepositoryHandle, "AssetNodeSnap"]]] = (
-            defaultdict(list)
-        )
-
-        # Build the dependency graph of asset keys.
-        all_keys = {asset.asset_key for asset in assets}
-        upstream: Dict[AssetKey, Set[AssetKey]] = {key: set() for key in all_keys}
-        downstream: Dict[AssetKey, Set[AssetKey]] = {key: set() for key in all_keys}
+TRemoteAssetNode = TypeVar("TRemoteAssetNode", bound=RemoteAssetNode)

-        for repo_handle, node in repo_handle_assets:
-            repo_node_pairs_by_key[node.asset_key].append((repo_handle, node))
-            for dep in node.parent_edges:
-                upstream[node.asset_key].add(dep.parent_asset_key)
-                downstream[dep.parent_asset_key].add(node.asset_key)
-        dep_graph: DependencyGraph[AssetKey] = {"upstream": upstream, "downstream": downstream}
-
-        # Build the set of ExternalAssetChecks, indexed by key. Also the index of execution units for
-        # each asset check key.
-        check_keys_by_asset_key: Dict[AssetKey, Set[AssetCheckKey]] = defaultdict(set)
-        asset_checks_by_key: Dict[AssetCheckKey, "AssetCheckNodeSnap"] = {}
-        repository_handles_by_asset_check_key: Dict[AssetCheckKey, RepositoryHandle] = {}
-        for repo_handle, asset_check in repo_handle_asset_checks:
-            asset_checks_by_key[asset_check.key] = asset_check
-            check_keys_by_asset_key[asset_check.asset_key].add(asset_check.key)
-            repository_handles_by_asset_check_key[asset_check.key] = repo_handle
-
-        asset_check_execution_sets_by_key = {
-            k: v for k, v in execution_sets_by_key.items() if isinstance(k, AssetCheckKey)
-        }
-
-        # Build the set of RemoteAssetNodes in topological order so that each node can hold
-        # references to its parents.
-        asset_nodes_by_key = {
-            key: RemoteAssetNode(
-                key=key,
-                parent_keys=dep_graph["upstream"][key],
-                child_keys=dep_graph["downstream"][key],
-                execution_set_keys=execution_sets_by_key[key],
-                repo_node_pairs=repo_node_pairs,
-                check_keys=check_keys_by_asset_key[key],
-            )
-            for key, repo_node_pairs in repo_node_pairs_by_key.items()
-        }

+class RemoteAssetGraph(BaseAssetGraph[TRemoteAssetNode], ABC, Generic[TRemoteAssetNode]):
+    @property
+    @abstractmethod
+    def remote_asset_nodes_by_key(self) -> Mapping[AssetKey, TRemoteAssetNode]: ...
-        return cls(
-            scope,
-            asset_nodes_by_key,
-            asset_checks_by_key,
-            asset_check_execution_sets_by_key,
-            repository_handles_by_asset_check_key,
-        )
+    @property
+    @abstractmethod
+    def remote_asset_check_nodes_by_key(self) -> Mapping[AssetCheckKey, RemoteAssetCheckNode]: ...

     ##### COMMON ASSET GRAPH INTERFACE

+    @cached_property
+    def _asset_check_nodes_by_key(self) -> Mapping[AssetCheckKey, AssetCheckNode]:
+        return {
+            k: AssetCheckNode(k, v.asset_check.blocking, v.asset_check.automation_condition)
+            for k, v in self.remote_asset_check_nodes_by_key.items()
+        }

     def get_execution_set_asset_and_check_keys(
         self, entity_key: EntityKey
@@ -380,24 +385,17 @@
         if isinstance(entity_key, AssetKey):
             return self.get(entity_key).execution_set_entity_keys
         else:  # AssetCheckKey
-            return self._asset_check_execution_sets_by_key[entity_key]
+            return self.remote_asset_check_nodes_by_key[entity_key].execution_set_entity_keys

     ##### REMOTE-SPECIFIC METHODS

-    @property
-    def asset_node_snaps_by_key(self) -> Mapping[AssetKey, "AssetNodeSnap"]:
-        # This exists to support existing callsites but it should be removed ASAP, since it exposes
-        # `AssetNodeSnap` instances directly. All sites using this should use RemoteAssetNode
-        # instead.
-        return {k: node.priority_node_snap for k, node in self._asset_nodes_by_key.items()}
-
     @property
     def asset_checks(self) -> Sequence["AssetCheckNodeSnap"]:
-        return list(self._asset_checks_by_key.values())
+        return [node.asset_check for node in self.remote_asset_check_nodes_by_key.values()]

     @cached_property
     def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
-        return set(self._asset_checks_by_key.keys())
+        return set(self.remote_asset_check_nodes_by_key.keys())

     def asset_keys_for_job(self, job_name: str) -> AbstractSet[AssetKey]:
         return {node.key for node in self.asset_nodes if job_name in node.job_names}
@@ -406,16 +404,6 @@ def all_job_names(self) -> AbstractSet[str]:
         return {job_name for node in self.asset_nodes for job_name in node.job_names}

-    @cached_property
-    def repository_handles_by_key(self) -> Mapping[EntityKey, RepositoryHandle]:
-        return {
-            **{k: node.priority_repository_handle for k, node in self._asset_nodes_by_key.items()},
-            **self._repository_handles_by_asset_check_key,
-        }
-
-    def get_repository_handle(self, key: EntityKey) -> RepositoryHandle:
-        return self.repository_handles_by_key[key]
-
     def get_materialization_job_names(self, asset_key: AssetKey) -> Sequence[str]:
         """Returns the names of jobs that materialize this asset."""
         # This is a poorly named method because it will expose observation job names for assets with
@@ -442,6 +430,118 @@ def get_implicit_job_name_for_assets(
         """
         return IMPLICIT_ASSET_JOB_NAME

+
+@record
+class RemoteRepositoryAssetGraph(RemoteAssetGraph[RemoteRepositoryAssetNode]):
+    remote_asset_nodes_by_key: Mapping[AssetKey, RemoteRepositoryAssetNode]
+    remote_asset_check_nodes_by_key: Mapping[AssetCheckKey, RemoteAssetCheckNode]
+
+    @property
+    def _asset_nodes_by_key(self) -> Mapping[AssetKey, RemoteRepositoryAssetNode]:
+        return self.remote_asset_nodes_by_key
+
+    @classmethod
+    def build(cls, repo: RemoteRepository):
+        # First pass, we need to:
+
+        # * Build the dependency graph of asset keys.
+        upstream: Dict[AssetKey, Set[AssetKey]] = defaultdict(set)
+        downstream: Dict[AssetKey, Set[AssetKey]] = defaultdict(set)
+
+        # * Build an index of execution sets by key. An execution set is a set of assets and checks
+        #   that must be executed together. AssetNodeSnaps and AssetCheckNodeSnaps already have an
+        #   optional execution_set_identifier set. A null execution_set_identifier indicates that the
+        #   node or check can be executed independently.
+        execution_sets_by_id: Dict[str, Set[EntityKey]] = defaultdict(set)
+
+        # * Map checks to their corresponding asset keys
+        check_keys_by_asset_key: Dict[AssetKey, Set[AssetCheckKey]] = defaultdict(set)
+
+        for asset_snap in repo.get_asset_node_snaps():
+            id = asset_snap.execution_set_identifier
+            key = asset_snap.asset_key
+            if id is not None:
+                execution_sets_by_id[id].add(key)
+
+            for dep in asset_snap.parent_edges:
+                upstream[asset_snap.asset_key].add(dep.parent_asset_key)
+                downstream[dep.parent_asset_key].add(asset_snap.asset_key)
+
+        for check_snap in repo.get_asset_check_node_snaps():
+            id = check_snap.execution_set_identifier
+            key = check_snap.key
+            if id is not None:
+                execution_sets_by_id[id].add(key)
+
+            check_keys_by_asset_key[check_snap.asset_key].add(check_snap.key)
+
+        # Second Pass - build the final nodes
+        assets_by_key: Dict[AssetKey, RemoteRepositoryAssetNode] = {}
+        asset_checks_by_key: Dict[AssetCheckKey, RemoteAssetCheckNode] = {}
+
+        for asset_snap in repo.get_asset_node_snaps():
+            id = asset_snap.execution_set_identifier
+            key = asset_snap.asset_key
+
+            assets_by_key[key] = RemoteRepositoryAssetNode(
+                repository_handle=repo.handle,
+                asset_node_snap=asset_snap,
+                execution_set_entity_keys=execution_sets_by_id[id] if id is not None else {key},
+                parent_keys=upstream[key],
+                child_keys=downstream[key],
+                check_keys=check_keys_by_asset_key[key],
+            )
+
+        for check_snap in repo.get_asset_check_node_snaps():
+            id = check_snap.execution_set_identifier
+            key = check_snap.key
+
+            asset_checks_by_key[key] = RemoteAssetCheckNode(
+                handle=repo.handle,
+                asset_check=check_snap,
+                execution_set_entity_keys=execution_sets_by_id[id] if id is not None else {key},
+            )
+
+        return cls(
+            remote_asset_nodes_by_key=assets_by_key,
+            remote_asset_check_nodes_by_key=asset_checks_by_key,
+        )
+
+
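The first pass of `build` groups entity keys by their `execution_set_identifier`; a null identifier means the asset or check executes independently and resolves to a singleton set. A compact, runnable sketch of that grouping, using plain string keys in place of `AssetKey`/`AssetCheckKey`:

```python
from collections import defaultdict
from typing import Dict, Optional, Set, Tuple

# (key, execution_set_identifier) pairs; None means independently executable.
snaps = [
    ("asset_a", "unit_1"),
    ("asset_b", "unit_1"),
    ("check_a", None),
]

# First pass: group keys sharing an identifier into one execution set.
execution_sets_by_id: Dict[str, Set[str]] = defaultdict(set)
for key, set_id in snaps:
    if set_id is not None:
        execution_sets_by_id[set_id].add(key)


# Second pass: each entity resolves to its shared set, or a singleton of itself.
def execution_set_for(key: str, set_id: Optional[str]) -> Set[str]:
    return execution_sets_by_id[set_id] if set_id is not None else {key}


assert execution_set_for("asset_a", "unit_1") == {"asset_a", "asset_b"}
assert execution_set_for("check_a", None) == {"check_a"}
```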
+@record
+class RemoteWorkspaceAssetGraph(RemoteAssetGraph[RemoteWorkspaceAssetNode]):
+    remote_asset_nodes_by_key: Mapping[AssetKey, RemoteWorkspaceAssetNode]
+    remote_asset_check_nodes_by_key: Mapping[AssetCheckKey, RemoteAssetCheckNode]
+
+    @property
+    def _asset_nodes_by_key(self) -> Mapping[AssetKey, RemoteWorkspaceAssetNode]:
+        return self.remote_asset_nodes_by_key
+
+    @property
+    def asset_node_snaps_by_key(self) -> Mapping[AssetKey, "AssetNodeSnap"]:
+        # This exists to support existing callsites but it should be removed ASAP, since it exposes
+        # `AssetNodeSnap` instances directly. All sites using this should use RemoteAssetNode
+        # instead.
+        return {
+            k: node.resolve_to_singular_repo_scoped_node().asset_node_snap
+            for k, node in self._asset_nodes_by_key.items()
+        }
+
+    @cached_property
+    def repository_handles_by_key(self) -> Mapping[EntityKey, RepositoryHandle]:
+        return {
+            **{
+                k: node.resolve_to_singular_repo_scoped_node().repository_handle
+                for k, node in self._asset_nodes_by_key.items()
+            },
+            **{k: v.handle for k, v in self.remote_asset_check_nodes_by_key.items()},
+        }
+
+    def get_repository_handle(self, key: EntityKey) -> RepositoryHandle:
+        if isinstance(key, AssetKey):
+            return self.get(key).resolve_to_singular_repo_scoped_node().repository_handle
+        else:
+            return self.remote_asset_check_nodes_by_key[key].handle
+
     def split_entity_keys_by_repository(
         self, keys: AbstractSet[EntityKey]
     ) -> Sequence[AbstractSet[EntityKey]]:
@@ -451,73 +551,95 @@
         keys_by_repo[(repo_handle.location_name, repo_handle.repository_name)].add(key)
         return list(keys_by_repo.values())

+    @classmethod
+    def build(cls, workspace: WorkspaceSnapshot):
+        # Combine repository scoped asset graphs with additional context to form the global graph

-def _warn_on_duplicate_nodes(
-    repo_handle_asset_node_snaps: Sequence[Tuple[RepositoryHandle, "AssetNodeSnap"]],
-) -> None:
+        code_locations = (
+            location_entry.code_location
+            for location_entry in workspace.code_location_entries.values()
+            if location_entry.code_location
+        )
+        repos = (
+            repo
+            for code_location in code_locations
+            for repo in code_location.get_repositories().values()
+        )
+
+        asset_infos_by_key: Dict[AssetKey, List[RepositoryScopedAssetInfo]] = defaultdict(list)
+        asset_checks_by_key: Dict[AssetCheckKey, RemoteAssetCheckNode] = {}
+        for repo in repos:
+            for key, asset_node in repo.asset_graph.remote_asset_nodes_by_key.items():
+                asset_infos_by_key[key].append(
+                    RepositoryScopedAssetInfo(
+                        asset_node=asset_node,
+                    )
+                )
+            # NOTE: matches previous behavior of completely ignoring asset check collisions
+            asset_checks_by_key.update(repo.asset_graph.remote_asset_check_nodes_by_key)
+
+        asset_nodes_by_key = {}
+        nodes_with_multiple = []
+        for key, asset_infos in asset_infos_by_key.items():
+            node = RemoteWorkspaceAssetNode(
+                repo_scoped_asset_infos=asset_infos,
+            )
+            asset_nodes_by_key[key] = node
+            if len(asset_infos) > 1:
+                nodes_with_multiple.append(node)
+
+        _warn_on_duplicate_nodes(nodes_with_multiple)
+
+        return cls(
+            remote_asset_nodes_by_key=asset_nodes_by_key,
+            remote_asset_check_nodes_by_key=asset_checks_by_key,
+        )
+
+
+def _warn_on_duplicate_nodes(nodes_with_multiple: Sequence[RemoteWorkspaceAssetNode]) -> None:
     # Split the nodes into materializable, observable, and unexecutable nodes. Observable and
     # unexecutable `AssetNodeSnap` represent both source and external assets-- the
     # "External" in "AssetNodeSnap" is unrelated to the "external" in "external asset", this
     # is just an unfortunate naming collision. `AssetNodeSnap` will be renamed eventually.
-    materializable_node_pairs: List[Tuple[RepositoryHandle, "AssetNodeSnap"]] = []
-    observable_node_pairs: List[Tuple[RepositoryHandle, "AssetNodeSnap"]] = []
-    unexecutable_node_pairs: List[Tuple[RepositoryHandle, "AssetNodeSnap"]] = []
-    for repo_handle, node in repo_handle_asset_node_snaps:
-        if node.is_source and node.is_observable:
-            observable_node_pairs.append((repo_handle, node))
-        elif node.is_source:
-            unexecutable_node_pairs.append((repo_handle, node))
-        else:
-            materializable_node_pairs.append((repo_handle, node))
+    materializable_duplicates: Mapping[AssetKey, Sequence[str]] = {}
+    observable_duplicates: Mapping[AssetKey, Sequence[str]] = {}
+    for node in nodes_with_multiple:
+        check.invariant(
+            len(node.repo_scoped_asset_infos) > 1, "only perform check on nodes with multiple defs"
+        )
+        observable_locations = []
+        materializable_locations = []
+        for info in node.repo_scoped_asset_infos:
+            snap = info.asset_node.asset_node_snap
+            location = info.asset_node.repository_handle.location_name
+            if snap.is_source and snap.is_observable:
+                observable_locations.append(location)
+            elif snap.is_materializable:
+                materializable_locations.append(location)
+        if len(observable_locations) > 1:
+            observable_duplicates[node.key] = observable_locations
+        if len(materializable_locations) > 1:
+            materializable_duplicates[node.key] = materializable_locations

     # It is possible for multiple nodes to exist that share the same key. This is invalid if
     # more than one node is materializable or if more than one node is observable. It is valid
     # if there is at most one materializable node and at most one observable node, with all
     # other nodes unexecutable.
-    _warn_on_duplicates_within_subset(materializable_node_pairs, AssetExecutionType.MATERIALIZATION)
-    _warn_on_duplicates_within_subset(observable_node_pairs, AssetExecutionType.OBSERVATION)
+    _warn_on_duplicates_within_subset(materializable_duplicates, AssetExecutionType.MATERIALIZATION)
+    _warn_on_duplicates_within_subset(observable_duplicates, AssetExecutionType.OBSERVATION)


 def _warn_on_duplicates_within_subset(
-    node_pairs: Sequence[Tuple[RepositoryHandle, "AssetNodeSnap"]],
+    duplicates: Mapping[AssetKey, Sequence[str]],
     execution_type: AssetExecutionType,
 ) -> None:
-    repo_handles_by_asset_key: DefaultDict[AssetKey, List[RepositoryHandle]] = defaultdict(list)
-    for repo_handle, node in node_pairs:
-        repo_handles_by_asset_key[node.asset_key].append(repo_handle)
-
-    duplicates = {k: v for k, v in repo_handles_by_asset_key.items() if len(v) > 1}
-    duplicate_lines = []
-    for asset_key, repo_handles in duplicates.items():
-        locations = [repo_handle.code_location_origin.location_name for repo_handle in repo_handles]
-        duplicate_lines.append(f"  {asset_key.to_string()}: {locations}")
-    duplicate_str = "\n".join(duplicate_lines)
-
     if duplicates:
+        duplicate_lines = []
+        for asset_key, loc_names in duplicates.items():
+            duplicate_lines.append(f"  {asset_key.to_string()}: {loc_names}")
+        duplicate_str = "\n".join(duplicate_lines)
+
         warnings.warn(
             f"Found {execution_type.value} nodes for some asset keys in multiple code locations."
             f" Only one {execution_type.value} node is allowed per asset key. Duplicates:\n {duplicate_str}"
        )


-def _build_execution_set_index(
-    asset_node_snaps: Iterable["AssetNodeSnap"],
-    asset_check_node_snaps: Iterable["AssetCheckNodeSnap"],
-) -> Mapping[EntityKey, AbstractSet[EntityKey]]:
-    from dagster._core.remote_representation.external_data import AssetNodeSnap
-
-    all_items = [*asset_node_snaps, *asset_check_node_snaps]
-
-    execution_sets_by_id: Dict[str, Set[EntityKey]] = defaultdict(set)
-    for item in all_items:
-        id = item.execution_set_identifier
-        key = item.asset_key if isinstance(item, AssetNodeSnap) else item.key
-        if id is not None:
-            execution_sets_by_id[id].add(key)
-
-    execution_sets_by_key: Dict[EntityKey, Set[EntityKey]] = {}
-    for item in all_items:
-        id = item.execution_set_identifier
-        key = item.asset_key if isinstance(item, AssetNodeSnap) else item.key
-        execution_sets_by_key[key] = execution_sets_by_id[id] if id is not None else {key}
-
-    return execution_sets_by_key
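Under the new scheme, duplicate detection only inspects keys that resolved to more than one repo-scoped node, and it warns separately per execution type. A rough standalone sketch of that policy (simplified tuple rows stand in for node snaps):

```python
import warnings
from typing import Dict, List, Sequence, Tuple

# (asset_key, location_name, execution_type) rows for one asset key's definitions.
defs: Sequence[Tuple[str, str, str]] = [
    ("my_asset", "loc_1", "MATERIALIZATION"),
    ("my_asset", "loc_2", "MATERIALIZATION"),
    ("my_asset", "loc_3", "UNEXECUTABLE"),  # extra unexecutable defs are fine
]


def warn_on_duplicates(rows: Sequence[Tuple[str, str, str]], execution_type: str) -> None:
    locations: Dict[str, List[str]] = {}
    for key, loc, etype in rows:
        if etype == execution_type:
            locations.setdefault(key, []).append(loc)
    duplicates = {k: v for k, v in locations.items() if len(v) > 1}
    if duplicates:
        lines = "\n".join(f"  {k}: {v}" for k, v in duplicates.items())
        warnings.warn(
            f"Found {execution_type} nodes for some asset keys in multiple code"
            f" locations. Only one {execution_type} node is allowed per asset key."
            f" Duplicates:\n{lines}"
        )


warn_on_duplicates(defs, "MATERIALIZATION")  # warns: my_asset in loc_1 and loc_2
warn_on_duplicates(defs, "OBSERVATION")  # no warning
```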
diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index de6df81596a84..f1c24f080d578 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -33,7 +33,7 @@
 from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
 from dagster._core.definitions.partition_key_range import PartitionKeyRange
 from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
-from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph, RemoteWorkspaceAssetGraph
 from dagster._core.definitions.run_request import RunRequest
 from dagster._core.definitions.selector import PartitionsByAssetSelector
 from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
@@ -719,7 +719,7 @@ def _submit_runs_and_update_backfill_in_chunks(
     workspace_process_context: IWorkspaceProcessContext,
     backfill_id: str,
     asset_backfill_iteration_result: AssetBackfillIterationResult,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     logger: logging.Logger,
     run_tags: Mapping[str, str],
     instance_queryer: CachingInstanceQueryer,
@@ -870,7 +870,7 @@ def _check_target_partitions_subset_is_valid(
 def _check_validity_and_deserialize_asset_backfill_data(
     workspace_context: BaseWorkspaceRequestContext,
     backfill: "PartitionBackfill",
-    asset_graph: BaseAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance_queryer: CachingInstanceQueryer,
     logger: logging.Logger,
 ) -> Optional[AssetBackfillData]:
@@ -1183,7 +1183,7 @@ def get_canceling_asset_backfill_iteration_data(
     backfill_id: str,
     asset_backfill_data: AssetBackfillData,
     instance_queryer: CachingInstanceQueryer,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     backfill_start_timestamp: float,
 ) -> Iterable[Optional[AssetBackfillData]]:
     """For asset backfills in the "canceling" state, fetch the asset backfill data with the updated
@@ -1221,7 +1221,7 @@ def get_asset_backfill_iteration_materialized_partitions(
     backfill_id: str,
     asset_backfill_data: AssetBackfillData,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance_queryer: CachingInstanceQueryer,
 ) -> Iterable[Optional[AssetGraphSubset]]:
     """Returns the partitions that have been materialized by the backfill.
@@ -1270,7 +1270,7 @@ def get_asset_backfill_iteration_materialized_partitions(

 def _get_failed_and_downstream_asset_partitions(
     backfill_id: str,
     asset_backfill_data: AssetBackfillData,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance_queryer: CachingInstanceQueryer,
     backfill_start_timestamp: float,
 ) -> AssetGraphSubset:
@@ -1341,7 +1341,7 @@ def _asset_graph_subset_to_str(

 def execute_asset_backfill_iteration_inner(
     backfill_id: str,
     asset_backfill_data: AssetBackfillData,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance_queryer: CachingInstanceQueryer,
     backfill_start_timestamp: float,
     logger: logging.Logger,
@@ -1529,7 +1529,7 @@ def can_run_with_parent(
     parent: AssetKeyPartitionKey,
     candidate: AssetKeyPartitionKey,
     candidates_unit: Iterable[AssetKeyPartitionKey],
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     target_subset: AssetGraphSubset,
     asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]],
 ) -> Tuple[bool, str]:
@@ -1564,7 +1564,10 @@ def can_run_with_parent(
             False,
             f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} have different backfill policies so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key} is materialized.",
         )
-    if parent_node.priority_repository_handle != candidate_node.priority_repository_handle:
+    if (
+        parent_node.resolve_to_singular_repo_scoped_node().repository_handle
+        != candidate_node.resolve_to_singular_repo_scoped_node().repository_handle
+    ):
         return (
             False,
             f"parent {parent_node.key.to_user_string()} and {candidate_node.key.to_user_string()} are in different code locations so they cannot be materialized in the same run. {candidate_node.key.to_user_string()} can be materialized once {parent_node.key.to_user_string()} is materialized.",
        )
@@ -1617,7 +1620,7 @@ def can_run_with_parent(

 def should_backfill_atomic_asset_partitions_unit(
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     candidates_unit: Iterable[AssetKeyPartitionKey],
     asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey],
     target_subset: AssetGraphSubset,
diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
index a4a0d435f6133..ad8b8f8c49a91 100644
--- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
+++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
@@ -7,7 +7,7 @@
 from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME
 from dagster._core.definitions.asset_key import EntityKey
 from dagster._core.definitions.events import AssetKey
-from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
 from dagster._core.definitions.run_request import RunRequest
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterInvalidSubsetError, DagsterUserCodeProcessError
@@ -27,7 +27,7 @@ class RunRequestExecutionData(NamedTuple):

 def _get_implicit_job_name_for_assets(
-    asset_graph: RemoteAssetGraph, asset_keys: Sequence[AssetKey]
+    asset_graph: RemoteWorkspaceAssetGraph, asset_keys: Sequence[AssetKey]
 ) -> Optional[str]:
     job_names = set(asset_graph.get_materialization_job_names(asset_keys[0]))
     for asset_key in asset_keys[1:]:
@@ -54,7 +54,7 @@ def _get_execution_plan_entity_keys(

 def _get_job_execution_data_from_run_request(
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     run_request: RunRequest,
     instance: DagsterInstance,
     workspace: BaseWorkspaceRequestContext,
diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py
index eec98f1474081..9d2a5e9b44c07 100644
--- a/python_modules/dagster/dagster/_core/remote_representation/external.py
+++ b/python_modules/dagster/dagster/_core/remote_representation/external.py
@@ -83,7 +83,7 @@
 if TYPE_CHECKING:
     from dagster._core.definitions.asset_key import EntityKey
-    from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+    from dagster._core.definitions.remote_asset_graph import RemoteRepositoryAssetGraph
     from dagster._core.scheduler.instigation import InstigatorState
     from dagster._core.snap.execution_plan_snapshot import ExecutionStepSnap
@@ -392,11 +392,11 @@ def get_display_metadata(self) -> Mapping[str, str]:
         return self.handle.display_metadata

     @cached_property
-    def asset_graph(self) -> "RemoteAssetGraph":
+    def asset_graph(self) -> "RemoteRepositoryAssetGraph":
         """Returns a repository scoped RemoteAssetGraph."""
-        from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+        from dagster._core.definitions.remote_asset_graph import RemoteRepositoryAssetGraph

-        return RemoteAssetGraph.from_remote_repository(self)
+        return RemoteRepositoryAssetGraph.build(self)

     def get_partition_names_for_asset_job(
         self,
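`RemoteRepository.asset_graph` stays a `cached_property`, so each repository builds its graph at most once per process, and `repo.asset_graph` replaces the removed `RemoteAssetGraph.from_remote_repository(repo)` constructor at call sites. A generic illustration of that memoization pattern (standalone sketch, not Dagster code):

```python
from functools import cached_property


class Repository:
    def __init__(self, snaps: list):
        self._snaps = snaps
        self.builds = 0

    @cached_property
    def asset_graph(self) -> dict:
        # Expensive: walk all node snaps and index them. Runs once per instance;
        # subsequent accesses return the cached object.
        self.builds += 1
        return {snap: [] for snap in self._snaps}


repo = Repository(["asset_a", "asset_b"])
assert repo.asset_graph is repo.asset_graph  # same object, cached
assert repo.builds == 1
```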
diff --git a/python_modules/dagster/dagster/_core/workspace/context.py b/python_modules/dagster/dagster/_core/workspace/context.py
index 7143fb0a85ff0..e13e3ee60cfbf 100644
--- a/python_modules/dagster/dagster/_core/workspace/context.py
+++ b/python_modules/dagster/dagster/_core/workspace/context.py
@@ -65,7 +65,10 @@
 from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info

 if TYPE_CHECKING:
-    from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph, RemoteAssetNode
+    from dagster._core.definitions.remote_asset_graph import (
+        RemoteWorkspaceAssetGraph,
+        RemoteWorkspaceAssetNode,
+    )
     from dagster._core.remote_representation import (
         PartitionConfigSnap,
         PartitionExecutionErrorSnap,
@@ -107,7 +110,7 @@ def get_code_location_entries(self) -> Mapping[str, CodeLocationEntry]:
         return self.get_workspace_snapshot().code_location_entries

     @property
-    def asset_graph(self) -> "RemoteAssetGraph":
+    def asset_graph(self) -> "RemoteWorkspaceAssetGraph":
         return self.get_workspace_snapshot().asset_graph

     @property
@@ -332,7 +335,7 @@ def get_notebook_data(self, code_location_name: str, notebook_path: str) -> byte
     def get_base_deployment_context(self) -> Optional["BaseWorkspaceRequestContext"]:
         return None

-    def get_asset_node(self, asset_key: AssetKey) -> Optional["RemoteAssetNode"]:
+    def get_asset_node(self, asset_key: AssetKey) -> Optional["RemoteWorkspaceAssetNode"]:
         if not self.get_workspace_snapshot().asset_graph.has(asset_key):
             return None
diff --git a/python_modules/dagster/dagster/_core/workspace/workspace.py b/python_modules/dagster/dagster/_core/workspace/workspace.py
index 0bf85648ec883..f890d3fbaf86b 100644
--- a/python_modules/dagster/dagster/_core/workspace/workspace.py
+++ b/python_modules/dagster/dagster/_core/workspace/workspace.py
@@ -8,7 +8,7 @@
 from dagster._utils.error import SerializableErrorInfo

 if TYPE_CHECKING:
-    from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+    from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
     from dagster._core.remote_representation import CodeLocation, CodeLocationOrigin

@@ -48,10 +48,10 @@ class WorkspaceSnapshot:
     code_location_entries: Mapping[str, CodeLocationEntry]

     @cached_property
-    def asset_graph(self) -> "RemoteAssetGraph":
-        from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+    def asset_graph(self) -> "RemoteWorkspaceAssetGraph":
+        from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph

-        return RemoteAssetGraph.from_workspace_snapshot(self)
+        return RemoteWorkspaceAssetGraph.build(self)

     def with_code_location(self, name: str, entry: CodeLocationEntry) -> "WorkspaceSnapshot":
         return WorkspaceSnapshot(code_location_entries={**self.code_location_entries, name: entry})
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
index f94f92bdd8f7a..6cc911bce0743 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
@@ -55,7 +55,7 @@ def repo():
         repository_handle=RepositoryHandle.for_test(location_name="fake", repository_name="repo"),
         instance=DagsterInstance.ephemeral(),
     )
-    return RemoteAssetGraph.from_remote_repository(remote_repo)
+    return remote_repo.asset_graph


 @pytest.fixture(
@@ -888,9 +888,7 @@ def b(): ...
    def repo_b():
         return [b]

-    asset_graph = RemoteAssetGraph.from_workspace_snapshot(
-        mock_workspace_from_repos([repo_a, repo_b])
-    )
+    asset_graph = mock_workspace_from_repos([repo_a, repo_b]).asset_graph

     assert isinstance(
         asset_graph.get_partition_mapping(key=b.key, parent_asset_key=a.key),
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
index 31430b56c44d4..b411a4fddfdcd 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
@@ -41,7 +41,7 @@
 from dagster._core.definitions.base_asset_graph import BaseAssetGraph
 from dagster._core.definitions.decorators.repository_decorator import repository
 from dagster._core.definitions.events import AssetKeyPartitionKey
-from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
+from dagster._core.definitions.remote_asset_graph import RemoteWorkspaceAssetGraph
 from dagster._core.definitions.selector import (
     PartitionRangeSelector,
     PartitionsByAssetSelector,
@@ -255,7 +255,7 @@ def _get_instance_queryer(
 def _single_backfill_iteration(
     backfill_id,
     backfill_data,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance,
     assets_by_repo_name,
 ) -> AssetBackfillData:
@@ -463,7 +463,7 @@ def downstream(upstream):

 def make_backfill_data(
     some_or_all: str,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance: DagsterInstance,
     current_time: datetime.datetime,
 ) -> AssetBackfillData:
@@ -486,7 +486,7 @@ def make_backfill_data(

 def make_random_subset(
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance: DagsterInstance,
     evaluation_time: datetime.datetime,
 ) -> AssetGraphSubset:
@@ -520,7 +520,7 @@ def make_random_subset(

 def make_subset_from_partition_keys(
     partition_keys: Sequence[str],
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance: DagsterInstance,
     evaluation_time: datetime.datetime,
 ) -> AssetGraphSubset:
@@ -543,7 +543,7 @@ def make_subset_from_partition_keys(

 def get_asset_graph(
     assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]],
-) -> RemoteAssetGraph:
+) -> RemoteWorkspaceAssetGraph:
     assets_defs_by_key = {
         key: assets_def
         for assets in assets_by_repo_name.values()
@@ -567,7 +567,7 @@ def get_asset_graph(

 def execute_asset_backfill_iteration_consume_generator(
     backfill_id: str,
     asset_backfill_data: AssetBackfillData,
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     instance: DagsterInstance,
 ) -> AssetBackfillIterationResult:
     counter = Counter()
@@ -592,7 +592,7 @@

 def run_backfill_to_completion(
-    asset_graph: RemoteAssetGraph,
+    asset_graph: RemoteWorkspaceAssetGraph,
     assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]],
     backfill_data: AssetBackfillData,
     fail_asset_partitions: Iterable[AssetKeyPartitionKey],
@@ -742,7 +742,7 @@ def _requested_asset_partitions_in_run_request(

 def remote_asset_graph_from_assets_by_repo_name(
     assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]],
-) -> RemoteAssetGraph:
+) -> RemoteWorkspaceAssetGraph:
     repos = []

     for repo_name, assets in assets_by_repo_name.items():
@@ -752,7 +752,7 @@ def repo(assets=assets):

         repos.append(repo)

-    return RemoteAssetGraph.from_workspace_snapshot(mock_workspace_from_repos(repos))
+    return mock_workspace_from_repos(repos).asset_graph


 @pytest.mark.parametrize(
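Taken together, the workspace-level node is a thin union over its repo-scoped members: parent/child/check keys are merged across repositories, `is_materializable`/`is_observable`/`is_executable` use any-semantics, and `is_external` uses all-semantics. A toy model of those aggregation rules (standalone sketch with simplified types):

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class RepoScopedNode:  # stand-in for RemoteRepositoryAssetNode
    parent_keys: FrozenSet[str] = frozenset()
    is_materializable: bool = False
    is_observable: bool = False
    is_external: bool = True


@dataclass(frozen=True)
class WorkspaceNode:  # stand-in for RemoteWorkspaceAssetNode
    members: Tuple[RepoScopedNode, ...]

    @property
    def parent_keys(self) -> FrozenSet[str]:
        # Union across all repositories that define this key.
        return frozenset().union(*(m.parent_keys for m in self.members))

    @property
    def is_materializable(self) -> bool:
        # Any member being materializable suffices.
        return any(m.is_materializable for m in self.members)

    @property
    def is_external(self) -> bool:
        # External only if every member is external.
        return all(m.is_external for m in self.members)


node = WorkspaceNode(
    members=(
        RepoScopedNode(parent_keys=frozenset({"a"}), is_observable=True),
        RepoScopedNode(parent_keys=frozenset({"b"}), is_materializable=True, is_external=False),
    )
)
assert node.parent_keys == {"a", "b"}
assert node.is_materializable and not node.is_external
```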