From 0df51395d5feb9ab9bcbbffed76d1c9c63742d83 Mon Sep 17 00:00:00 2001 From: Alex Langenfeld Date: Tue, 15 Oct 2024 16:07:01 -0500 Subject: [PATCH] serializable RemoteAssetNodes (#24827) Make the global and repository scoped `RemoteAssetNode` serializable so we can persist/cache the resolved global asset node of a workspace. ## How I Tested These Changes added test --- .../_core/definitions/remote_asset_graph.py | 5 ++++- .../dagster/_core/remote_representation/handle.py | 2 ++ python_modules/dagster/dagster/_serdes/serdes.py | 8 ++++++++ .../asset_defs_tests/test_asset_graph.py | 14 ++++++++++++++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py index 25db24e036d37..5d7dd73f2cd33 100644 --- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py @@ -45,6 +45,7 @@ from dagster._core.remote_representation.handle import InstigatorHandle, RepositoryHandle from dagster._core.workspace.workspace import WorkspaceSnapshot from dagster._record import ImportFrom, record +from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method if TYPE_CHECKING: @@ -124,6 +125,7 @@ def job_names(self) -> Sequence[str]: ) +@whitelist_for_serdes @record class RemoteRepositoryAssetNode(RemoteAssetNode): """Asset nodes from a single RemoteRepository.""" @@ -191,6 +193,7 @@ def auto_observe_interval_minutes(self) -> Optional[float]: return self.asset_node_snap.auto_observe_interval_minutes +@whitelist_for_serdes @record class RepositoryScopedAssetInfo: """RemoteRepositoryAssetNode paired with additional information from that repository. 
@@ -207,6 +210,7 @@ def handle(self) -> RepositoryHandle: return self.asset_node.repository_handle +@whitelist_for_serdes @record class RemoteWorkspaceAssetNode(RemoteAssetNode): """Asset nodes constructed from a WorkspaceSnapshot, containing nodes from potentially several RemoteRepositories.""" @@ -315,7 +319,6 @@ def resolve_to_singular_repo_scoped_node(self) -> "RemoteRepositoryAssetNode": # materialization node was previously preferred over the observable node. This is a # temporary measure until we can appropriately scope the accessors that could apply to # either a materialization or observation node. - # This property supports existing behavior but it should be phased out, because it relies on # materialization nodes shadowing observation nodes that would otherwise be exposed. return next( diff --git a/python_modules/dagster/dagster/_core/remote_representation/handle.py b/python_modules/dagster/dagster/_core/remote_representation/handle.py index ece5f1a629234..cc59a21ef5bc2 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/handle.py +++ b/python_modules/dagster/dagster/_core/remote_representation/handle.py @@ -12,11 +12,13 @@ RemoteRepositoryOrigin, ) from dagster._record import IHaveNew, record, record_custom +from dagster._serdes.serdes import whitelist_for_serdes if TYPE_CHECKING: from dagster._core.remote_representation.code_location import CodeLocation +@whitelist_for_serdes @record class RepositoryHandle: repository_name: str diff --git a/python_modules/dagster/dagster/_serdes/serdes.py b/python_modules/dagster/dagster/_serdes/serdes.py index 3f866173845ad..16ffd665683d3 100644 --- a/python_modules/dagster/dagster/_serdes/serdes.py +++ b/python_modules/dagster/dagster/_serdes/serdes.py @@ -871,6 +871,14 @@ def pack_value( ) -> Sequence[JsonSerializableValue]: ... +@overload +def pack_value( + val: PackableValue, + whitelist_map: WhitelistMap = ..., + descent_path: Optional[str] = ..., +) -> JsonSerializableValue: ... 
+
+
 def pack_value(
     val: PackableValue,
     whitelist_map: WhitelistMap = _WHITELIST_MAP,
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
index 6cc911bce0743..aabea0bdb5377 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py
@@ -42,6 +42,7 @@
 from dagster._core.remote_representation.external_data import RepositorySnap
 from dagster._core.remote_representation.handle import RepositoryHandle
 from dagster._core.test_utils import freeze_time, instance_for_test, mock_workspace_from_repos
+from dagster._serdes.serdes import deserialize_value, serialize_value
 from dagster._time import create_datetime, get_current_datetime
 
 
@@ -894,3 +895,21 @@ def repo_b():
         asset_graph.get_partition_mapping(key=b.key, parent_asset_key=a.key),
         TimeWindowPartitionMapping,
     )
+
+
+def test_serdes() -> None:
+    """Check that every asset node in a workspace graph survives a serdes round trip."""
+
+    @asset
+    def a(): ...
+
+    @repository
+    def repo():
+        return [a]
+
+    asset_graph = mock_workspace_from_repos([repo]).asset_graph
+    nodes = list(asset_graph.asset_nodes)
+    # An empty graph would skip every assertion below, so require at least one node.
+    assert nodes
+    for node in nodes:
+        assert node == deserialize_value(serialize_value(node))