[GraphBolt][CUDA] Overlap feature fetcher #6954

Merged: 29 commits from gb_cuda_overlap_feature_fetcher into master on Jan 18, 2024

Changes from all commits (29 commits)
30a9b2a
initial code
mfbalin Jan 1, 2024
7fc10c2
add Bufferer
mfbalin Jan 15, 2024
bdf9d81
Merge branch 'master' into gb_cuda_overlap_feature_fetcher
mfbalin Jan 15, 2024
0cf33a5
add example and debug
mfbalin Jan 15, 2024
cd42c36
finished implementation, overlap works.
mfbalin Jan 16, 2024
8d68ace
linting
mfbalin Jan 16, 2024
3e81d6e
linting of example
mfbalin Jan 16, 2024
9a205e3
linting
mfbalin Jan 16, 2024
239f1c3
remove unused include.
mfbalin Jan 16, 2024
80a8a80
Merge branch 'master' into gb_cuda_overlap_feature_fetcher
mfbalin Jan 16, 2024
0bd41de
add missing include
mfbalin Jan 16, 2024
07b8c9a
remove unnecessary datapipe step
mfbalin Jan 16, 2024
79e6308
Merge branch 'master' into gb_cuda_overlap_feature_fetcher
mfbalin Jan 16, 2024
b855a3c
add and fix the tests
mfbalin Jan 16, 2024
ba9349b
refactor and fix linter
mfbalin Jan 16, 2024
dcf1a3d
fix the test
mfbalin Jan 16, 2024
3ca7b14
add memory profiling to debug overlap memory issue in WSL.
mfbalin Jan 16, 2024
d426c24
Merge branch 'master' into gb_cuda_overlap_feature_fetcher
mfbalin Jan 16, 2024
4eb916a
fix memory leak
mfbalin Jan 16, 2024
6434470
take back example changes
mfbalin Jan 16, 2024
ba0b170
linting
mfbalin Jan 16, 2024
0b1d3f6
fix linting
mfbalin Jan 16, 2024
ba52ef2
move the uva files into cuda directory.
mfbalin Jan 16, 2024
2e00869
A more sure way of initializing the uva_stream.
mfbalin Jan 16, 2024
5d30466
minor comment fix.
mfbalin Jan 16, 2024
b552ddb
compile uva source file when CUDA is enabled.
mfbalin Jan 16, 2024
6c7305c
address reviews
mfbalin Jan 17, 2024
0d6a9ca
address reviews and add docstring about Bufferer
mfbalin Jan 18, 2024
5e62667
Merge branch 'master' into gb_cuda_overlap_feature_fetcher
mfbalin Jan 18, 2024
1 change: 1 addition & 0 deletions graphbolt/CMakeLists.txt
@@ -52,6 +52,7 @@ file(GLOB BOLT_SRC ${BOLT_DIR}/*.cc)
if(USE_CUDA)
file(GLOB BOLT_CUDA_SRC
${BOLT_DIR}/cuda/*.cu
${BOLT_DIR}/cuda/*.cc
)
list(APPEND BOLT_SRC ${BOLT_CUDA_SRC})
if(DEFINED ENV{CUDAARCHS})
13 changes: 10 additions & 3 deletions graphbolt/src/cuda/index_select_impl.cu
@@ -10,6 +10,7 @@
#include <numeric>

#include "./common.h"
#include "./max_uva_threads.h"
#include "./utils.h"

namespace graphbolt {
@@ -122,17 +123,23 @@ torch::Tensor UVAIndexSelectImpl_(torch::Tensor input, torch::Tensor index) {
if (aligned_feature_size == 1) {
// Use a single thread to process each output row to avoid wasting threads.
const int num_threads = cuda::FindNumThreads(return_len);
-const int num_blocks = (return_len + num_threads - 1) / num_threads;
+const int num_blocks =
+    (std::min(return_len, cuda::max_uva_threads.value_or(1 << 20)) +
+     num_threads - 1) /
+    num_threads;
CUDA_KERNEL_CALL(
IndexSelectSingleKernel, num_blocks, num_threads, 0, input_ptr,
input_len, index_sorted_ptr, return_len, ret_ptr, permutation_ptr);
} else {
-dim3 block(512, 1);
+constexpr int BLOCK_SIZE = 512;
+dim3 block(BLOCK_SIZE, 1);
while (static_cast<int64_t>(block.x) >= 2 * aligned_feature_size) {
block.x >>= 1;
block.y <<= 1;
}
-const dim3 grid((return_len + block.y - 1) / block.y);
+const dim3 grid(std::min(
+    (return_len + block.y - 1) / block.y,
+    cuda::max_uva_threads.value_or(1 << 20) / BLOCK_SIZE));
if (aligned_feature_size * sizeof(DType) <= GPU_CACHE_LINE_SIZE) {
// When feature size is smaller than GPU cache line size, use unaligned
// version for less SM usage, which is more resource efficient.
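To make the capping arithmetic above concrete, here is a small Python mock-up of the num_blocks computation (an illustration, not code from this PR; it assumes the CUDA kernels are grid-stride loops, so a capped grid simply means each thread processes more rows):

    # Python sketch of the capped launch-size computation in UVAIndexSelectImpl_.
    # cuda::max_uva_threads defaults to 1 << 20 when the optional is unset.
    def capped_num_blocks(return_len, num_threads, max_uva_threads=1 << 20):
        capped = min(return_len, max_uva_threads)  # bound total threads launched
        return (capped + num_threads - 1) // num_threads  # ceiling division

    # With the tuned cap of 6144 and 256-thread blocks, at most 24 blocks are
    # launched no matter how many rows are gathered.
    assert capped_num_blocks(10_000_000, 256, max_uva_threads=6144) == 24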
15 changes: 15 additions & 0 deletions graphbolt/src/cuda/max_uva_threads.cc
@@ -0,0 +1,15 @@
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/max_uva_threads.cc
* @brief Max uva threads variable setter function.
*/
#include "./max_uva_threads.h"

namespace graphbolt {
namespace cuda {

void set_max_uva_threads(int64_t count) { max_uva_threads = count; }

} // namespace cuda
} // namespace graphbolt
24 changes: 24 additions & 0 deletions graphbolt/src/cuda/max_uva_threads.h
@@ -0,0 +1,24 @@
/**
* Copyright (c) 2023 by Contributors
* Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* @file cuda/max_uva_threads.h
* @brief Max uva threads variable declaration.
*/
#ifndef GRAPHBOLT_MAX_UVA_THREADS_H_
#define GRAPHBOLT_MAX_UVA_THREADS_H_

#include <cstdint>
#include <optional>

namespace graphbolt {
namespace cuda {

/** @brief Set a limit on the number of CUDA threads for UVA accesses. */
inline std::optional<int64_t> max_uva_threads;

void set_max_uva_threads(int64_t count);

} // namespace cuda
} // namespace graphbolt

#endif // GRAPHBOLT_MAX_UVA_THREADS_H_
6 changes: 6 additions & 0 deletions graphbolt/src/python_binding.cc
@@ -9,6 +9,9 @@
#include <graphbolt/serialize.h>
#include <graphbolt/unique_and_compact.h>

#ifdef GRAPHBOLT_USE_CUDA
#include "./cuda/max_uva_threads.h"
#endif
#include "./index_select.h"
#include "./random.h"

@@ -75,6 +78,9 @@ TORCH_LIBRARY(graphbolt, m) {
m.def("index_select", &ops::IndexSelect);
m.def("index_select_csc", &ops::IndexSelectCSC);
m.def("set_seed", &RandomEngine::SetManualSeed);
#ifdef GRAPHBOLT_USE_CUDA
m.def("set_max_uva_threads", &cuda::set_max_uva_threads);
#endif
}

} // namespace sampling
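Once registered, the cap is reachable from Python as torch.ops.graphbolt.set_max_uva_threads, which is how the DataLoader change below applies its max_uva_threads argument. A minimal sketch, assuming a CUDA build of DGL (the op is only defined under GRAPHBOLT_USE_CUDA) and that importing dgl.graphbolt loads the extension:

    import torch
    import dgl.graphbolt  # loads the graphbolt C++ extension, registering the op

    # Cap UVA index_select kernels at ~6144 threads (roughly 3-4 SMs) so that
    # feature copies leave most of the GPU free for the default stream.
    torch.ops.graphbolt.set_max_uva_threads(6144)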
117 changes: 115 additions & 2 deletions python/dgl/graphbolt/dataloader.py
@@ -1,5 +1,8 @@
"""Graph Bolt DataLoaders"""

from queue import Queue

import torch
import torch.utils.data
import torchdata.dataloader2.graph as dp_utils
import torchdata.datapipes as dp
@@ -35,6 +38,62 @@ def _find_and_wrap_parent(
)


class EndMarker(dp.iter.IterDataPipe):
"""Used to mark the end of a datapipe and is a no-op."""

def __init__(self, datapipe):
self.datapipe = datapipe

def __iter__(self):
for data in self.datapipe:
yield data


class Bufferer(dp.iter.IterDataPipe):
"""Buffers items before yielding them.

Parameters
----------
datapipe : DataPipe
The data pipeline.
buffer_size : int, optional
The size of the buffer which stores the fetched samples. If the data
coming from the datapipe has latency spikes, consider passing a higher
value. Default is 2.
"""

def __init__(self, datapipe, buffer_size=2):
self.datapipe = datapipe
if buffer_size <= 0:
raise ValueError(
"'buffer_size' is required to be a positive integer."
)
self.buffer = Queue(buffer_size)

def __iter__(self):
for data in self.datapipe:
if not self.buffer.full():
self.buffer.put(data)
else:
return_data = self.buffer.get()
self.buffer.put(data)
yield return_data
while not self.buffer.empty():
yield self.buffer.get()


class Awaiter(dp.iter.IterDataPipe):
"""Calls the wait function of all items."""

def __init__(self, datapipe):
self.datapipe = datapipe

def __iter__(self):
for data in self.datapipe:
data.wait()
yield data


class MultiprocessingWrapper(dp.iter.IterDataPipe):
"""Wraps a datapipe with multiprocessing.

@@ -64,6 +123,14 @@ def __iter__(self):
yield from self.dataloader


# There needs to be a single instance of the uva_stream. If it is created
# multiple times, it leads to multiple CUDA memory pools and memory leaks.
def _get_uva_stream():
if not hasattr(_get_uva_stream, "stream"):
_get_uva_stream.stream = torch.cuda.Stream(priority=-1)
return _get_uva_stream.stream


class DataLoader(torch.utils.data.DataLoader):
"""Multiprocessing DataLoader.

@@ -84,16 +151,34 @@ class DataLoader(torch.utils.data.DataLoader):
If True, the data loader will not shut down the worker processes after a
dataset has been consumed once. This allows to maintain the workers
instances alive.
overlap_feature_fetch : bool, optional
If True, the data loader will overlap the UVA feature fetcher operations
with the rest of the operations by using an alternative CUDA stream.
Default is True.
max_uva_threads : int, optional
Limits the number of CUDA threads used for UVA copies so that the rest
of the computations can run simultaneously with them. Setting it too
high limits the amount of overlap, while setting it too low may leave
the PCI-e bandwidth underutilized. The manually tuned default is 6144,
corresponding to roughly 3-4 Streaming Multiprocessors.
"""

-def __init__(self, datapipe, num_workers=0, persistent_workers=True):
+def __init__(
+    self,
+    datapipe,
+    num_workers=0,
+    persistent_workers=True,
+    overlap_feature_fetch=True,
+    max_uva_threads=6144,
+):
# Multiprocessing requires two modifications to the datapipe:
#
# 1. Insert a stage after ItemSampler to distribute the
# minibatches evenly across processes.
# 2. Cut the datapipe at FeatureFetcher, and wrap the inner datapipe
# of the FeatureFetcher with a multiprocessing PyTorch DataLoader.

datapipe = EndMarker(datapipe)
datapipe_graph = dp_utils.traverse_dps(datapipe)
datapipe_adjlist = datapipe_graph_to_adjlist(datapipe_graph)

@@ -122,7 +207,35 @@ def __init__(self, datapipe, num_workers=0, persistent_workers=True):
persistent_workers=persistent_workers,
)

-# (3) Cut datapipe at CopyTo and wrap with prefetcher. This enables the
+# (3) Overlap UVA feature fetching by buffering and using an alternative
+# stream.
if (
overlap_feature_fetch
and num_workers == 0
and torch.cuda.is_available()
):
torch.ops.graphbolt.set_max_uva_threads(max_uva_threads)
feature_fetchers = dp_utils.find_dps(
datapipe_graph,
FeatureFetcher,
)
for feature_fetcher in feature_fetchers:
feature_fetcher.stream = _get_uva_stream()
_find_and_wrap_parent(
datapipe_graph,
datapipe_adjlist,
EndMarker,
Bufferer,
buffer_size=2,
)
_find_and_wrap_parent(
datapipe_graph,
datapipe_adjlist,
EndMarker,
Awaiter,
)

# (4) Cut datapipe at CopyTo and wrap with prefetcher. This enables the
# data pipeline up to the CopyTo operation to run in a separate thread.
_find_and_wrap_parent(
datapipe_graph,
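Two hedged sketches to tie the dataloader changes together. First, how Bufferer and Awaiter compose: Bufferer lets the producer run ahead by up to buffer_size items, while Awaiter synchronizes each item just before it reaches the user. The _AsyncItem payload below is purely illustrative; the real pipeline yields MiniBatch objects whose wait() blocks on a CUDA event recorded by the feature fetcher, and Bufferer/Awaiter are assumed to be imported from dgl.graphbolt.dataloader:

    import torchdata.datapipes as dp

    class _AsyncItem:
        # Stand-in for a MiniBatch whose features are computed asynchronously.
        def __init__(self, value):
            self.value = value

        def wait(self):
            pass  # the real wait() blocks on a recorded CUDA event

    source = dp.iter.IterableWrapper([_AsyncItem(i) for i in range(4)])
    # Keep two items in flight so item i+1's asynchronous work overlaps with
    # consuming item i; synchronize right before handing each item out.
    pipe = Awaiter(Bufferer(source, buffer_size=2))
    print([item.value for item in pipe])  # [0, 1, 2, 3] -- order preserved

Second, an end-to-end usage sketch of the new arguments. The itemset, graph, and feature_store setup is assumed, and the functional pipeline spellings follow the GraphBolt examples of this era; only overlap_feature_fetch and max_uva_threads come from this PR:

    import dgl.graphbolt as gb

    # Assumed setup: itemset, graph, and feature_store built as in the
    # GraphBolt examples; the pipeline is the usual sample -> fetch -> copy chain.
    datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=True)
    datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])
    datapipe = datapipe.fetch_feature(feature_store, node_feature_keys=["feat"])
    datapipe = datapipe.copy_to(device="cuda")

    # With num_workers=0 on a CUDA machine, feature fetching is issued on a
    # separate high-priority stream and overlaps with the rest of the pipeline.
    dataloader = gb.DataLoader(
        datapipe, overlap_feature_fetch=True, max_uva_threads=6144
    )
    for minibatch in dataloader:
        ...  # Awaiter has already called minibatch.wait(); features are ready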
71 changes: 51 additions & 20 deletions python/dgl/graphbolt/feature_fetcher.py
@@ -2,6 +2,8 @@

from typing import Dict

import torch

from torch.utils.data import functional_datapipe

from .base import etype_tuple_to_str
@@ -52,8 +54,9 @@ def __init__(
self.feature_store = feature_store
self.node_feature_keys = node_feature_keys
self.edge_feature_keys = edge_feature_keys
self.stream = None

-def _read(self, data):
+def _read_data(self, data, stream):
"""
Fill in the node/edge features field in data.

@@ -77,6 +80,12 @@ def _read(self, data):
) or isinstance(self.edge_feature_keys, Dict)
# Read Node features.
input_nodes = data.node_ids()

def record_stream(tensor):
if stream is not None and tensor.is_cuda:
tensor.record_stream(stream)
return tensor

if self.node_feature_keys and input_nodes is not None:
if is_heterogeneous:
for type_name, feature_names in self.node_feature_keys.items():
@@ -86,19 +95,23 @@
for feature_name in feature_names:
node_features[
(type_name, feature_name)
-] = self.feature_store.read(
-    "node",
-    type_name,
-    feature_name,
-    nodes,
+] = record_stream(
+    self.feature_store.read(
+        "node",
+        type_name,
+        feature_name,
+        nodes,
+    )
)
else:
for feature_name in self.node_feature_keys:
-node_features[feature_name] = self.feature_store.read(
-    "node",
-    None,
-    feature_name,
-    input_nodes,
+node_features[feature_name] = record_stream(
+    self.feature_store.read(
+        "node",
+        None,
+        feature_name,
+        input_nodes,
+    )
)
# Read Edge features.
if self.edge_feature_keys and num_layers > 0:
@@ -124,19 +137,37 @@
for feature_name in feature_names:
edge_features[i][
(type_name, feature_name)
-] = self.feature_store.read(
-    "edge", type_name, feature_name, edges
+] = record_stream(
+    self.feature_store.read(
+        "edge", type_name, feature_name, edges
+    )
)
else:
for feature_name in self.edge_feature_keys:
-edge_features[i][
-    feature_name
-] = self.feature_store.read(
-    "edge",
-    None,
-    feature_name,
-    original_edge_ids,
+edge_features[i][feature_name] = record_stream(
+    self.feature_store.read(
+        "edge",
+        None,
+        feature_name,
+        original_edge_ids,
+    )
)
data.set_node_features(node_features)
data.set_edge_features(edge_features)
return data

def _read(self, data):
current_stream = None
if self.stream is not None:
current_stream = torch.cuda.current_stream()
self.stream.wait_stream(current_stream)
with torch.cuda.stream(self.stream):
data = self._read_data(data, current_stream)
if self.stream is not None:
event = torch.cuda.current_stream().record_event()

def _wait():
event.wait()

data.wait = _wait
return data
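For readers unfamiliar with the pattern in _read above, here is the same stream discipline reduced to plain PyTorch (a hedged sketch with illustrative tensors, not code from this PR): the side stream first waits on the main stream, work is issued on the side stream, produced tensors are recorded against the stream that will consume them, and an event defers synchronization until the data is actually needed.

    import torch

    side_stream = torch.cuda.Stream(priority=-1)  # high priority, like uva_stream
    main_stream = torch.cuda.current_stream()
    features = torch.randn(1024, 128).pin_memory()  # illustrative pinned source

    side_stream.wait_stream(main_stream)  # do not run ahead of pending main work
    with torch.cuda.stream(side_stream):
        gathered = features.to("cuda", non_blocking=True)  # issued on side_stream
        # Tell the caching allocator the tensor will also be used on main_stream,
        # so its memory is not reused while that use is still pending.
        gathered.record_stream(main_stream)
        event = torch.cuda.current_stream().record_event()

    # ... the CPU can queue other work here while the copy proceeds ...
    event.wait()  # the main stream waits for the copy; the CPU is not blocked
    result = gathered.sum()  # safe to consume on the main stream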