From 74b2fb5f1b4621c61717df67fe510f65420f1ce6 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 12 Dec 2024 07:33:53 +0000 Subject: [PATCH 01/15] Fix None graphbolt mask will always be set 0 automatically. --- python/dgl/distributed/dist_dataloader.py | 12 +++++++----- python/dgl/distributed/dist_graph.py | 16 ++++++++++------ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/python/dgl/distributed/dist_dataloader.py b/python/dgl/distributed/dist_dataloader.py index 117379b2b4a3..b9733fdb99d1 100644 --- a/python/dgl/distributed/dist_dataloader.py +++ b/python/dgl/distributed/dist_dataloader.py @@ -311,7 +311,7 @@ def collate(self, items): raise NotImplementedError @staticmethod - def add_edge_attribute_to_graph(g, data_name): + def add_edge_attribute_to_graph(g, data_name, gb_padding=0): """Add data into the graph as an edge attribute. For some cases such as prob/mask-based sampling on GraphBolt partitions, @@ -329,7 +329,7 @@ def add_edge_attribute_to_graph(g, data_name): The name of data that's stored in DistGraph.ndata/edata. """ if g._use_graphbolt and data_name: - g.add_edge_attribute(data_name) + g.add_edge_attribute(data_name, gb_padding) class NodeCollator(Collator): @@ -366,7 +366,7 @@ class NodeCollator(Collator): :doc:`Minibatch Training Tutorials `. """ - def __init__(self, g, nids, graph_sampler): + def __init__(self, g, nids, graph_sampler, gb_padding=0): self.g = g if not isinstance(nids, Mapping): assert ( @@ -380,7 +380,7 @@ def __init__(self, g, nids, graph_sampler): # Add prob/mask into graphbolt partition's edge attributes if needed. if hasattr(self.graph_sampler, "prob"): Collator.add_edge_attribute_to_graph( - self.g, self.graph_sampler.prob + self.g, self.graph_sampler.prob, gb_padding ) @property @@ -612,6 +612,7 @@ def __init__( reverse_eids=None, reverse_etypes=None, negative_sampler=None, + gb_padding=0, ): self.g = g if not isinstance(eids, Mapping): @@ -642,7 +643,7 @@ def __init__( # Add prob/mask into graphbolt partition's edge attributes if needed. if hasattr(self.graph_sampler, "prob"): Collator.add_edge_attribute_to_graph( - self.g, self.graph_sampler.prob + self.g, self.graph_sampler.prob, gb_padding ) @property @@ -864,6 +865,7 @@ def __init__(self, g, eids, graph_sampler, device=None, **kwargs): else: dataloader_kwargs[k] = v + collator_kwargs["gb_padding"] = 1 if device is None: # for the distributed case default to the CPU device = "cpu" diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index d4e31ce02770..23a9caf92470 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -143,15 +143,16 @@ def _copy_data_from_shared_mem(name, shape): class AddEdgeAttributeFromKVRequest(rpc.Request): """Add edge attribute from kvstore to local GraphBolt partition.""" - def __init__(self, name, kv_names): + def __init__(self, name, kv_names, padding=0): self._name = name self._kv_names = kv_names + self._padding = padding def __getstate__(self): - return self._name, self._kv_names + return self._name, self._kv_names, self._padding def __setstate__(self, state): - self._name, self._kv_names = state + self._name, self._kv_names, self._padding = state def process_request(self, server_state): # For now, this is only used to add prob/mask data to the graph. @@ -169,7 +170,10 @@ def process_request(self, server_state): gpb = server_state.partition_book # Initialize the edge attribute. num_edges = g.total_num_edges - attr_data = torch.zeros(num_edges, dtype=data_type) + if self._padding == 0: + attr_data = torch.zeros(num_edges, dtype=data_type) + else: + attr_data = torch.full((num_edges,), self._padding, dtype=data_type) # Map data from kvstore to the local partition for inner edges only. num_inner_edges = gpb.metadata()[gpb.partid]["num_edges"] homo_eids = g.edge_attributes[EID][:num_inner_edges] @@ -1620,7 +1624,7 @@ def _get_edata_names(self, etype=None): edata_names.append(name) return edata_names - def add_edge_attribute(self, name): + def add_edge_attribute(self, name, padding=0): """Add an edge attribute into GraphBolt partition from edge data. Parameters @@ -1643,7 +1647,7 @@ def add_edge_attribute(self, name): ] rpc.send_request( self._client._main_server_id, - AddEdgeAttributeFromKVRequest(name, kv_names), + AddEdgeAttributeFromKVRequest(name, kv_names, padding), ) # Wait for the response. assert rpc.recv_response()._name == name From ecedeaeb82b9b43dbfdd4a74155bca82e46e8ccc Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 12 Dec 2024 07:53:53 +0000 Subject: [PATCH 02/15] Fix None graphbolt mask will always be set 0 automatically. --- python/dgl/distributed/dist_graph.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 23a9caf92470..054e7d758900 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -170,10 +170,7 @@ def process_request(self, server_state): gpb = server_state.partition_book # Initialize the edge attribute. num_edges = g.total_num_edges - if self._padding == 0: - attr_data = torch.zeros(num_edges, dtype=data_type) - else: - attr_data = torch.full((num_edges,), self._padding, dtype=data_type) + attr_data = torch.full((num_edges,), self._padding, dtype=data_type) # Map data from kvstore to the local partition for inner edges only. num_inner_edges = gpb.metadata()[gpb.partid]["num_edges"] homo_eids = g.edge_attributes[EID][:num_inner_edges] From 76858ff40ba68c5248a2b1e23ffdc8557964bf3d Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 12 Dec 2024 08:22:50 +0000 Subject: [PATCH 03/15] add comment --- python/dgl/distributed/dist_dataloader.py | 4 ++++ python/dgl/distributed/dist_graph.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/python/dgl/distributed/dist_dataloader.py b/python/dgl/distributed/dist_dataloader.py index b9733fdb99d1..bf644187cb96 100644 --- a/python/dgl/distributed/dist_dataloader.py +++ b/python/dgl/distributed/dist_dataloader.py @@ -327,6 +327,8 @@ def add_edge_attribute_to_graph(g, data_name, gb_padding=0): The graph. data_name : str The name of data that's stored in DistGraph.ndata/edata. + gb_padding : int, optional + The padding value for GraphBolt partitions' new edge_attributes. """ if g._use_graphbolt and data_name: g.add_edge_attribute(data_name, gb_padding) @@ -344,6 +346,8 @@ class NodeCollator(Collator): The node set to compute outputs. graph_sampler : dgl.dataloading.BlockSampler The neighborhood sampler. + gb_padding : int, optional + The padding value for GraphBolt partitions' new edge_attributes. Examples -------- diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 054e7d758900..0049816047c2 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -1628,6 +1628,8 @@ def add_edge_attribute(self, name, padding=0): ---------- name : str The name of the edge attribute. + padding : int, optional + The padding value for the new edge attribute. """ # Sanity checks. if not self._use_graphbolt: From 23c6cd733369de1f92cfbce0eee7183077b09127 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Wed, 18 Dec 2024 01:53:14 +0000 Subject: [PATCH 04/15] add test case for DistGB mask issue --- .../distributed/test_distributed_sampling.py | 74 ++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 4ca8f7b130ac..3f6592ff19a1 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -7,8 +7,9 @@ import unittest from pathlib import Path -import backend as F import dgl + +import dgl.backend as F import numpy as np import pytest import torch @@ -1858,6 +1859,77 @@ def test_local_sampling_heterograph(num_parts, use_graphbolt, prob_or_mask): ) +def check_mask_hetero_sampling_gb(tmpdir, num_server, use_graphbolt=True): + def create_hetero_graph(dense=False, empty=False): + num_nodes = {"n1": 210, "n2": 200, "n3": 220, "n4": 230} + etypes = [("n1", "r12", "n2"), ("n2", "r23", "n3"), ("n3", "r34", "n4")] + edges = {} + random.seed(42) + for etype in etypes: + src_ntype, _, dst_ntype = etype + arr = spsp.random( + num_nodes[src_ntype] - 10 if empty else num_nodes[src_ntype], + num_nodes[dst_ntype] - 10 if empty else num_nodes[dst_ntype], + density=0.1, + format="coo", + random_state=100, + ) + edges[etype] = (arr.row, arr.col) + g = dgl.heterograph(edges, num_nodes) + + return g + + generate_ip_config("rpc_ip_config.txt", num_server, num_server) + + g = create_hetero_graph() + indices = torch.randperm(g.num_edges("r34"))[:10] + mask = torch.zeros(g.num_edges("r34"), dtype=torch.bool) + mask[indices] = True + + num_parts = num_server + + orig_nid_map, orig_eid_map = partition_graph( + g, + "test_sampling", + num_parts, + tmpdir, + num_hops=1, + part_method="metis", + return_mapping=True, + use_graphbolt=use_graphbolt, + store_eids=True, + ) + + pserver_list = [] + + part_config = tmpdir / "test_sampling.json" + + dgl.distributed.initialize("rpc_ip_config.txt") + dist_graph = DistGraph("test_sampling", part_config=part_config) + print(dist_graph.local_partition) + + os.environ["DGL_DIST_DEBUG"] = "1" + + edges = {("n3", "r34", "n4"): indices} + sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 10], mask="mask") + loader = dgl.dataloading.DistEdgeDataLoader( + dist_graph, edges, sampler, batch_size=64 + ) + + block = next(iter(loader))[2][0] + assert block.num_src_nodes("n1") > 0 + + +@pytest.mark.parametrize("num_parts", [1]) +def test_local_masked_sampling_heterograph_gb( + num_server, +): + reset_envs() + os.environ["DGL_DIST_MODE"] = "distributed" + with tempfile.TemporaryDirectory() as tmpdirname: + check_mask_hetero_sampling_gb(Path(tmpdirname), num_server) + + if __name__ == "__main__": import tempfile From 4eb9a9d06ac965ad6a6ba3d86c40144ff5a4b6cd Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Wed, 18 Dec 2024 02:24:31 +0000 Subject: [PATCH 05/15] change test case for distGB --- tests/distributed/test_distributed_sampling.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 3f6592ff19a1..e6b28be8c83a 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -1882,9 +1882,9 @@ def create_hetero_graph(dense=False, empty=False): generate_ip_config("rpc_ip_config.txt", num_server, num_server) g = create_hetero_graph() - indices = torch.randperm(g.num_edges("r34"))[:10] + eids = torch.randperm(g.num_edges("r34"))[:10] mask = torch.zeros(g.num_edges("r34"), dtype=torch.bool) - mask[indices] = True + mask[eids] = True num_parts = num_server @@ -1900,17 +1900,14 @@ def create_hetero_graph(dense=False, empty=False): store_eids=True, ) - pserver_list = [] - part_config = tmpdir / "test_sampling.json" dgl.distributed.initialize("rpc_ip_config.txt") dist_graph = DistGraph("test_sampling", part_config=part_config) - print(dist_graph.local_partition) os.environ["DGL_DIST_DEBUG"] = "1" - edges = {("n3", "r34", "n4"): indices} + edges = {("n3", "r34", "n4"): eids} sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 10], mask="mask") loader = dgl.dataloading.DistEdgeDataLoader( dist_graph, edges, sampler, batch_size=64 From 5f848d4deb69c7b14e7d8a41cc1da17f6e07ebfc Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Wed, 18 Dec 2024 02:40:14 +0000 Subject: [PATCH 06/15] change test case --- .../distributed/test_distributed_sampling.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index e6b28be8c83a..2ecce809ff18 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -1902,7 +1902,25 @@ def create_hetero_graph(dense=False, empty=False): part_config = tmpdir / "test_sampling.json" - dgl.distributed.initialize("rpc_ip_config.txt") + pserver_list = [] + ctx = mp.get_context("spawn") + for i in range(num_server): + p = ctx.Process( + target=start_server, + args=( + i, + tmpdir, + num_server > 1, + "test_sampling", + ["csc", "coo"], + True, + ), + ) + p.start() + time.sleep(1) + pserver_list.append(p) + + dgl.distributed.initialize("rpc_ip_config.txt", use_graphbolt=True) dist_graph = DistGraph("test_sampling", part_config=part_config) os.environ["DGL_DIST_DEBUG"] = "1" From a9539dcaa2631f54297b6f3cfb08c2d6a12c3dec Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 19 Dec 2024 01:56:36 +0000 Subject: [PATCH 07/15] fix test_distributed_sampling --- .../distributed/test_distributed_sampling.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 2ecce809ff18..29e9f488c114 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -1859,8 +1859,10 @@ def test_local_sampling_heterograph(num_parts, use_graphbolt, prob_or_mask): ) -def check_mask_hetero_sampling_gb(tmpdir, num_server, use_graphbolt=True): - def create_hetero_graph(dense=False, empty=False): +def check_hetero_dist_edge_dataloader_gb( + tmpdir, num_server, use_graphbolt=True +): + def create_hetero_graph(): num_nodes = {"n1": 210, "n2": 200, "n3": 220, "n4": 230} etypes = [("n1", "r12", "n2"), ("n2", "r23", "n3"), ("n3", "r34", "n4")] edges = {} @@ -1868,8 +1870,8 @@ def create_hetero_graph(dense=False, empty=False): for etype in etypes: src_ntype, _, dst_ntype = etype arr = spsp.random( - num_nodes[src_ntype] - 10 if empty else num_nodes[src_ntype], - num_nodes[dst_ntype] - 10 if empty else num_nodes[dst_ntype], + num_nodes[src_ntype], + num_nodes[dst_ntype], density=0.1, format="coo", random_state=100, @@ -1930,19 +1932,22 @@ def create_hetero_graph(dense=False, empty=False): loader = dgl.dataloading.DistEdgeDataLoader( dist_graph, edges, sampler, batch_size=64 ) + dgl.distributed.exit_client() + for p in pserver_list: + p.join() + assert p.exitcode == 0 block = next(iter(loader))[2][0] assert block.num_src_nodes("n1") > 0 -@pytest.mark.parametrize("num_parts", [1]) -def test_local_masked_sampling_heterograph_gb( - num_server, +def test_hetero_dist_edge_dataloader_gb( + num_server=1, ): reset_envs() os.environ["DGL_DIST_MODE"] = "distributed" with tempfile.TemporaryDirectory() as tmpdirname: - check_mask_hetero_sampling_gb(Path(tmpdirname), num_server) + check_hetero_dist_edge_dataloader_gb(Path(tmpdirname), num_server) if __name__ == "__main__": From 636c61cf9910dc356ee2df01c4cdf5309f3ff09e Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 19 Dec 2024 02:17:55 +0000 Subject: [PATCH 08/15] add comment to create_hetero_graph() --- tests/distributed/test_distributed_sampling.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 29e9f488c114..411a9546ac8f 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -1862,6 +1862,9 @@ def test_local_sampling_heterograph(num_parts, use_graphbolt, prob_or_mask): def check_hetero_dist_edge_dataloader_gb( tmpdir, num_server, use_graphbolt=True ): + # Custom function to create a heterogeneous graph, ensuring that edges with missing masks + # can still be used to sample in DistEdgeDataloader. create_random_hetero does not support this case, + # so this function was added to handle the requirement. def create_hetero_graph(): num_nodes = {"n1": 210, "n2": 200, "n3": 220, "n4": 230} etypes = [("n1", "r12", "n2"), ("n2", "r23", "n3"), ("n3", "r34", "n4")] From 09f3718d81016250c087969093a6981ba65b7d10 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 19 Dec 2024 02:33:24 +0000 Subject: [PATCH 09/15] change test_distributed_sampling.py --- .../distributed/test_distributed_sampling.py | 30 +++---------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 411a9546ac8f..6f2df8b84f0f 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -1862,33 +1862,11 @@ def test_local_sampling_heterograph(num_parts, use_graphbolt, prob_or_mask): def check_hetero_dist_edge_dataloader_gb( tmpdir, num_server, use_graphbolt=True ): - # Custom function to create a heterogeneous graph, ensuring that edges with missing masks - # can still be used to sample in DistEdgeDataloader. create_random_hetero does not support this case, - # so this function was added to handle the requirement. - def create_hetero_graph(): - num_nodes = {"n1": 210, "n2": 200, "n3": 220, "n4": 230} - etypes = [("n1", "r12", "n2"), ("n2", "r23", "n3"), ("n3", "r34", "n4")] - edges = {} - random.seed(42) - for etype in etypes: - src_ntype, _, dst_ntype = etype - arr = spsp.random( - num_nodes[src_ntype], - num_nodes[dst_ntype], - density=0.1, - format="coo", - random_state=100, - ) - edges[etype] = (arr.row, arr.col) - g = dgl.heterograph(edges, num_nodes) - - return g - generate_ip_config("rpc_ip_config.txt", num_server, num_server) - g = create_hetero_graph() - eids = torch.randperm(g.num_edges("r34"))[:10] - mask = torch.zeros(g.num_edges("r34"), dtype=torch.bool) + g = create_random_hetero() + eids = torch.randperm(g.num_edges("r23"))[:10] + mask = torch.zeros(g.num_edges("r23"), dtype=torch.bool) mask[eids] = True num_parts = num_server @@ -1930,7 +1908,7 @@ def create_hetero_graph(): os.environ["DGL_DIST_DEBUG"] = "1" - edges = {("n3", "r34", "n4"): eids} + edges = {("n2", "r23", "n3"): eids} sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 10], mask="mask") loader = dgl.dataloading.DistEdgeDataLoader( dist_graph, edges, sampler, batch_size=64 From d46aa0c5767c5af9b5580b4f2ad35f3ebaa5e6b4 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 26 Dec 2024 07:30:41 +0000 Subject: [PATCH 10/15] change test_distributed_sampling --- tests/distributed/test_distributed_sampling.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 6f2df8b84f0f..1dd8fac8e34f 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -7,9 +7,8 @@ import unittest from pathlib import Path -import dgl - import dgl.backend as F +import dgl import numpy as np import pytest import torch From 46145ff904d6fb659197c345954baae559f9e1fa Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 26 Dec 2024 07:31:44 +0000 Subject: [PATCH 11/15] change format --- tests/distributed/test_distributed_sampling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index 1dd8fac8e34f..fcdaca133268 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -7,7 +7,7 @@ import unittest from pathlib import Path -import dgl.backend as F +import backend as F import dgl import numpy as np import pytest From 5089e3a7dd0dc0f8fa59484e3b1edca36ac75c46 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Tue, 7 Jan 2025 08:10:00 +0000 Subject: [PATCH 12/15] fix mask issue and deal with come comment --- python/dgl/distributed/dist_dataloader.py | 11 ++++++++--- python/dgl/distributed/dist_graph.py | 9 +++++++-- tests/distributed/test_distributed_sampling.py | 6 +++++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/python/dgl/distributed/dist_dataloader.py b/python/dgl/distributed/dist_dataloader.py index bf644187cb96..684afa2a9ed4 100644 --- a/python/dgl/distributed/dist_dataloader.py +++ b/python/dgl/distributed/dist_dataloader.py @@ -311,7 +311,7 @@ def collate(self, items): raise NotImplementedError @staticmethod - def add_edge_attribute_to_graph(g, data_name, gb_padding=0): + def add_edge_attribute_to_graph(g, data_name, gb_padding): """Add data into the graph as an edge attribute. For some cases such as prob/mask-based sampling on GraphBolt partitions, @@ -348,6 +348,8 @@ class NodeCollator(Collator): The neighborhood sampler. gb_padding : int, optional The padding value for GraphBolt partitions' new edge_attributes. + e.g. some edges of specific types have no mask, the mask will be set as gb_padding. + the edge will not be sampled if the mask is 0. Examples -------- @@ -512,6 +514,10 @@ class EdgeCollator(Collator): A set of builtin negative samplers are provided in :ref:`the negative sampling module `. + gb_padding : int, optional + The padding value for GraphBolt partitions' new edge_attributes if the attributes in DistGraph are None. + e.g. prob/mask-based sampling. + Only when the mask of one edge is set as 1, the edge will be sampled. Examples -------- @@ -616,7 +622,7 @@ def __init__( reverse_eids=None, reverse_etypes=None, negative_sampler=None, - gb_padding=0, + gb_padding=1, ): self.g = g if not isinstance(eids, Mapping): @@ -869,7 +875,6 @@ def __init__(self, g, eids, graph_sampler, device=None, **kwargs): else: dataloader_kwargs[k] = v - collator_kwargs["gb_padding"] = 1 if device is None: # for the distributed case default to the CPU device = "cpu" diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 0049816047c2..92babaebf11f 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -143,7 +143,7 @@ def _copy_data_from_shared_mem(name, shape): class AddEdgeAttributeFromKVRequest(rpc.Request): """Add edge attribute from kvstore to local GraphBolt partition.""" - def __init__(self, name, kv_names, padding=0): + def __init__(self, name, kv_names, padding): self._name = name self._kv_names = kv_names self._padding = padding @@ -170,6 +170,11 @@ def process_request(self, server_state): gpb = server_state.partition_book # Initialize the edge attribute. num_edges = g.total_num_edges + + # Padding is used here to fill missing edge attributes (e.g., 'prob' or 'mask') for certain edge types. + # In DGLGraph, some edges may not have attributes or their values could be None. + # GraphBolt, however, samples edges based on specific attributes (like 'mask' == 1), so we pad the missing attributes with default values (e.g., 1 for 'mask') + # to ensure that all edges can be sampled consistently, regardless of whether their attributes are available in the DGLGraph. attr_data = torch.full((num_edges,), self._padding, dtype=data_type) # Map data from kvstore to the local partition for inner edges only. num_inner_edges = gpb.metadata()[gpb.partid]["num_edges"] @@ -1621,7 +1626,7 @@ def _get_edata_names(self, etype=None): edata_names.append(name) return edata_names - def add_edge_attribute(self, name, padding=0): + def add_edge_attribute(self, name, padding): """Add an edge attribute into GraphBolt partition from edge data. Parameters diff --git a/tests/distributed/test_distributed_sampling.py b/tests/distributed/test_distributed_sampling.py index fcdaca133268..556e091fbbe3 100644 --- a/tests/distributed/test_distributed_sampling.py +++ b/tests/distributed/test_distributed_sampling.py @@ -7,8 +7,9 @@ import unittest from pathlib import Path -import backend as F import dgl + +import dgl.backend as F import numpy as np import pytest import torch @@ -1919,6 +1920,9 @@ def check_hetero_dist_edge_dataloader_gb( block = next(iter(loader))[2][0] assert block.num_src_nodes("n1") > 0 + assert block.num_edges("r12") > 0 + assert block.num_edges("r13") > 0 + assert block.num_edges("r23") > 0 def test_hetero_dist_edge_dataloader_gb( From d3ea6a9f946532e83f02f36d52c1ecbde00ac004 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 9 Jan 2025 09:25:58 +0000 Subject: [PATCH 13/15] change dist_dataloader --- python/dgl/distributed/dist_dataloader.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/dgl/distributed/dist_dataloader.py b/python/dgl/distributed/dist_dataloader.py index 684afa2a9ed4..c4e75cdb52fc 100644 --- a/python/dgl/distributed/dist_dataloader.py +++ b/python/dgl/distributed/dist_dataloader.py @@ -347,9 +347,10 @@ class NodeCollator(Collator): graph_sampler : dgl.dataloading.BlockSampler The neighborhood sampler. gb_padding : int, optional - The padding value for GraphBolt partitions' new edge_attributes. - e.g. some edges of specific types have no mask, the mask will be set as gb_padding. - the edge will not be sampled if the mask is 0. + The padding value for GraphBolt partitions' new edge_attributes if the attributes in DistGraph are None. + e.g. prob/mask-based sampling. + Only when the mask of one edge is set as 1, the edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. + The argument will be used in add_edge_attribute_to_graph to add new edge_attributes in graphbolt. Examples -------- @@ -372,7 +373,7 @@ class NodeCollator(Collator): :doc:`Minibatch Training Tutorials `. """ - def __init__(self, g, nids, graph_sampler, gb_padding=0): + def __init__(self, g, nids, graph_sampler, gb_padding=1): self.g = g if not isinstance(nids, Mapping): assert ( @@ -517,9 +518,8 @@ class EdgeCollator(Collator): gb_padding : int, optional The padding value for GraphBolt partitions' new edge_attributes if the attributes in DistGraph are None. e.g. prob/mask-based sampling. - Only when the mask of one edge is set as 1, the edge will be sampled. - - Examples + Only when the mask of one edge is set as 1, the edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. + The argument will be used in add_edge_attribute_to_graph to add new edge_attributes in graphbolt. -------- The following example shows how to train a 3-layer GNN for edge classification on a set of edges ``train_eid`` on a homogeneous undirected graph. Each node takes From 2cc8857a31b30fd6da3ad6afa93a56b5fe576496 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 9 Jan 2025 09:27:10 +0000 Subject: [PATCH 14/15] enrich comment --- python/dgl/distributed/dist_graph.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/dgl/distributed/dist_graph.py b/python/dgl/distributed/dist_graph.py index 92babaebf11f..149d5d3fd878 100644 --- a/python/dgl/distributed/dist_graph.py +++ b/python/dgl/distributed/dist_graph.py @@ -171,10 +171,11 @@ def process_request(self, server_state): # Initialize the edge attribute. num_edges = g.total_num_edges - # Padding is used here to fill missing edge attributes (e.g., 'prob' or 'mask') for certain edge types. - # In DGLGraph, some edges may not have attributes or their values could be None. - # GraphBolt, however, samples edges based on specific attributes (like 'mask' == 1), so we pad the missing attributes with default values (e.g., 1 for 'mask') - # to ensure that all edges can be sampled consistently, regardless of whether their attributes are available in the DGLGraph. + # Padding is used to fill missing edge attributes (e.g., 'prob' or 'mask') for certain edge types. + # In DGLGraph, some edges may lack these attributes or have them set to None, but DGL will still sample these edges. + # In contrast, GraphBolt samples edges based on specific attributes (e.g., 'mask' == 1) and will skip edges with missing attributes. + # To ensure consistent sampling behavior in GraphBolt, we pad missing attributes with default values (e.g., 'mask' = 1), + # allowing all edges to be sampled, even if their attributes were missing or None in DGLGraph. attr_data = torch.full((num_edges,), self._padding, dtype=data_type) # Map data from kvstore to the local partition for inner edges only. num_inner_edges = gpb.metadata()[gpb.partid]["num_edges"] From 0681e203813396f4f37ad3f17bdec9b8e12fb439 Mon Sep 17 00:00:00 2001 From: CfromBU <2649624957@qq.com> Date: Thu, 9 Jan 2025 09:31:44 +0000 Subject: [PATCH 15/15] add comment in dist_dataloader.py --- python/dgl/distributed/dist_dataloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/dgl/distributed/dist_dataloader.py b/python/dgl/distributed/dist_dataloader.py index c4e75cdb52fc..833fb937a43c 100644 --- a/python/dgl/distributed/dist_dataloader.py +++ b/python/dgl/distributed/dist_dataloader.py @@ -349,7 +349,7 @@ class NodeCollator(Collator): gb_padding : int, optional The padding value for GraphBolt partitions' new edge_attributes if the attributes in DistGraph are None. e.g. prob/mask-based sampling. - Only when the mask of one edge is set as 1, the edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. + Only when the mask of one edge is set as 1, an edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. The argument will be used in add_edge_attribute_to_graph to add new edge_attributes in graphbolt. Examples @@ -518,7 +518,7 @@ class EdgeCollator(Collator): gb_padding : int, optional The padding value for GraphBolt partitions' new edge_attributes if the attributes in DistGraph are None. e.g. prob/mask-based sampling. - Only when the mask of one edge is set as 1, the edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. + Only when the mask of one edge is set as 1, an edge will be sampled in dgl.graphbolt.FusedCSCSamplingGraph.sample_neighbors. The argument will be used in add_edge_attribute_to_graph to add new edge_attributes in graphbolt. -------- The following example shows how to train a 3-layer GNN for edge classification on a