From 8e5a4369c871c539ab464fbe567a20435a97fd48 Mon Sep 17 00:00:00 2001
From: Cedric Augonnet
Date: Sun, 26 Jan 2025 13:38:07 +0100
Subject: [PATCH 1/3] Use a stream_adapter for the allocator of the inner
 graph in an algorithm

---
 cudax/include/cuda/experimental/stf.cuh | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/cudax/include/cuda/experimental/stf.cuh b/cudax/include/cuda/experimental/stf.cuh
index 267a0dbd4b1..b951dc6f5ef 100644
--- a/cudax/include/cuda/experimental/stf.cuh
+++ b/cudax/include/cuda/experimental/stf.cuh
@@ -1750,6 +1750,11 @@ public:
     gctx.get_dot()->set_ctx_symbol("algo: " + symbol);
     gctx.set_allocator(block_allocator<pooled_allocator>(gctx));
 
+    // This creates an adapter which "redirects" allocations to the CUDA stream API
+    auto wrapper = stream_adapter(gctx, stream);
+
+    gctx.update_uncached_allocator(wrapper.allocator());
+
     auto current_place = gctx.default_exec_place();
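Note: to make the intent of this first patch concrete, here is a minimal
sketch of the resulting allocator setup (assuming a `graph_ctx gctx` and the
launch stream `stream`, as in the hunk above; the surrounding algorithm
machinery is omitted):

    // Small allocations are served from larger pooled blocks
    gctx.set_allocator(block_allocator<pooled_allocator>(gctx));

    // Pool refills go through cudaMallocAsync(..., stream) rather than
    // through CUDA graph memory nodes
    auto wrapper = stream_adapter(gctx, stream);
    gctx.update_uncached_allocator(wrapper.allocator());

    // ... instantiate and launch the graph in `stream` ...

    // Release the adapter's buffers asynchronously in `stream`
    wrapper.clear();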
From b1b9c62ee30b5589a0d56a1273a4c01e88abe60f Mon Sep 17 00:00:00 2001
From: Cedric Augonnet
Date: Tue, 28 Jan 2025 13:48:35 +0100
Subject: [PATCH 2/3] Entirely rework the stream_adapter implementation so
 that it is movable, check that clear() was called, and factor out the code
 that sets up allocators in algorithms

---
 .../__stf/allocators/adapters.cuh             | 91 +++++++++++++++----
 cudax/include/cuda/experimental/stf.cuh       | 46 +++++++---
 2 files changed, 109 insertions(+), 28 deletions(-)

diff --git a/cudax/include/cuda/experimental/__stf/allocators/adapters.cuh b/cudax/include/cuda/experimental/__stf/allocators/adapters.cuh
index fce719de5e6..e33a4cdd039 100644
--- a/cudax/include/cuda/experimental/__stf/allocators/adapters.cuh
+++ b/cudax/include/cuda/experimental/__stf/allocators/adapters.cuh
@@ -35,7 +35,9 @@ namespace cuda::experimental::stf
  * allocations APIs (cudaMallocAsync, cudaFreeAsync)
  *
  * This can be used as an alternative in CUDA graphs to avoid creating CUDA
- * graphs with a large memory footprints.
+ * graphs with large memory footprints. Allocations are performed in the same
+ * stream as the one used to launch the graph, and buffers are freed in that
+ * stream once the graph has been launched.
  */
 class stream_adapter
 {
@@ -55,15 +57,30 @@ class stream_adapter
     data_place memory_node;
   };
 
+  /**
+   * @brief Stores the state of the allocator, such as the buffers to free
+   */
+  struct adapter_allocator_state
+  {
+    adapter_allocator_state(cudaStream_t stream_)
+        : stream(stream_)
+    {}
+
+    // Resources to release
+    ::std::vector<raw_buffer> to_free;
+
+    // Stream used to allocate data
+    cudaStream_t stream;
+  };
+
   /**
    * @brief allocator interface created within the stream_adapter
    */
   class adapter_allocator : public block_allocator_interface
   {
   public:
-    adapter_allocator(cudaStream_t stream_, stream_adapter* sa_)
-        : stream(stream_)
-        , sa(sa_)
+    adapter_allocator(::std::shared_ptr<adapter_allocator_state> state_)
+        : state(mv(state_))
     {}
 
     // these are events from a graph_ctx
@@ -109,7 +126,7 @@ class stream_adapter
         }
       };
 
-      cuda_safe_call(cudaMallocAsync(&result, s, stream));
+      cuda_safe_call(cudaMallocAsync(&result, s, state->stream));
     }
 
     return result;
   }
@@ -118,8 +135,8 @@ class stream_adapter
     void deallocate(
       backend_ctx_untyped&, const data_place& memory_node, event_list& /* prereqs */, void* ptr, size_t sz) override
     {
-      // Do not deallocate buffers, this is done later after
-      sa->to_free.emplace_back(ptr, sz, memory_node);
+      // Do not deallocate buffers, this is done later when we call clear()
+      state->to_free.emplace_back(ptr, sz, memory_node);
 
       // Prereqs are unchanged
     }
@@ -135,16 +152,49 @@ class stream_adapter
     }
 
   private:
-    cudaStream_t stream;
-    stream_adapter* sa;
+    // To keep track of allocated resources
+    ::std::shared_ptr<adapter_allocator_state> state;
   };
 
 public:
   template <typename context_t>
-  stream_adapter(context_t& ctx, cudaStream_t stream_ /*, block_allocator_untyped root_allocator_*/)
-      : stream(stream_) /*, root_allocator(mv(root_allocator_))*/
+  stream_adapter(context_t& ctx, cudaStream_t stream /*, block_allocator_untyped root_allocator_*/)
+      : adapter_state(::std::make_shared<adapter_allocator_state>(stream))
+      , alloc(block_allocator<adapter_allocator>(ctx, adapter_state))
+  {}
+
+  // Delete copy constructor and copy assignment operator
+  stream_adapter(const stream_adapter&)            = delete;
+  stream_adapter& operator=(const stream_adapter&) = delete;
+
+  // This is movable, but we don't need to call clear() on a moved-from object
+  stream_adapter(stream_adapter&& other) noexcept
+      : adapter_state(other.adapter_state)
+      , alloc(other.alloc)
+      , cleared_or_moved(other.cleared_or_moved)
   {
-    alloc = block_allocator<adapter_allocator>(ctx, stream, this);
+    // No need to clear this now that it was moved
+    other.cleared_or_moved = true;
+  }
+
+  stream_adapter& operator=(stream_adapter&& other) noexcept
+  {
+    if (this != &other)
+    {
+      adapter_state    = mv(other.adapter_state);
+      alloc            = mv(other.alloc);
+      cleared_or_moved = other.cleared_or_moved;
+
+      // Mark the moved-from object as "moved"
+      other.cleared_or_moved = true;
+    }
+    return *this;
+  }
+
+  // Destructor
+  ~stream_adapter()
+  {
+    _CCCL_ASSERT(cleared_or_moved, "clear() was not called.");
+  }
 
   /**
    * @brief Free resources allocated through the adapter
   */
   void clear()
   {
+    _CCCL_ASSERT(adapter_state, "Invalid state");
+    _CCCL_ASSERT(!cleared_or_moved, "clear() was already called, or the object was moved.");
+
     // We avoid changing device around every CUDA API call, so we will only
     // change it when necessary, and restore the current device at the end
     // of the loop.
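Note: the reworked class enforces a simple lifetime contract: clear() must be
called exactly once before destruction, except on moved-from objects. A
minimal usage sketch (illustrative function names, assuming a valid context
and stream):

    stream_adapter make_adapter(graph_ctx& gctx, cudaStream_t stream)
    {
      // Returning by value is now safe: the moved-from object is marked as
      // "cleared or moved", so its destructor does not assert
      return stream_adapter(gctx, stream);
    }

    void example(graph_ctx& gctx, cudaStream_t stream)
    {
      auto adapter = make_adapter(gctx, stream);
      gctx.update_uncached_allocator(adapter.allocator());

      // ... submit work and launch the graph in `stream` ...

      // Mandatory: frees the buffers and satisfies the destructor check
      adapter.clear();
    }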
     const int prev_dev_id = cuda_try<cudaGetDevice>();
     int current_dev_id    = prev_dev_id;
 
+    cudaStream_t stream = adapter_state->stream;
+
     // No need to wait for the stream multiple times
     bool stream_was_synchronized = false;
 
-    for (auto& b : to_free)
+    for (auto& b : adapter_state->to_free)
     {
       if (b.memory_node == data_place::host)
       {
@@ -200,7 +255,9 @@ public:
       cuda_safe_call(cudaSetDevice(prev_dev_id));
     }
 
-    to_free.clear();
+    adapter_state->to_free.clear();
+
+    cleared_or_moved = true;
   }
 
   /**
    * @brief Get the underlying block allocator
    */
@@ -213,11 +270,13 @@ public:
   }
 
 private:
-  cudaStream_t stream;
+  ::std::shared_ptr<adapter_allocator_state> adapter_state;
 
+  // Note this is using a PIMPL idiom so it's movable
   block_allocator_untyped alloc;
 
-  ::std::vector<raw_buffer> to_free;
+  // We need to call clear() before destroying the object, unless it was moved
+  bool cleared_or_moved = false;
 };
 
 } // end namespace cuda::experimental::stf
diff --git a/cudax/include/cuda/experimental/stf.cuh b/cudax/include/cuda/experimental/stf.cuh
index b951dc6f5ef..466f0baf5ff 100644
--- a/cudax/include/cuda/experimental/stf.cuh
+++ b/cudax/include/cuda/experimental/stf.cuh
@@ -1664,22 +1664,42 @@ public:
     return runner_impl(ctx, *this, mv(deps)...);
   }
 
+  auto setup_allocator(graph_ctx& gctx, cudaStream_t stream)
+  {
+    // Use a pooled allocator: this avoids calling the underlying "uncached"
+    // allocator too often by making larger allocations which can be used for
+    // multiple small allocations
+    gctx.set_allocator(block_allocator<pooled_allocator>(gctx));
+
+    // The uncached allocator allocates the (large) blocks of memory required
+    // by the allocator. Within CUDA graphs, using memory nodes is expensive,
+    // and caching a graph with memory nodes may appear to "leak" memory.
+    // We thus use the stream_adapter allocator, which relies on the
+    // stream-based asynchronous allocation API (cudaMallocAsync,
+    // cudaFreeAsync). The resources reserved by this allocator can be
+    // released asynchronously after the submission of the CUDA graph.
+    auto wrapper = stream_adapter(gctx, stream);
+
+    gctx.update_uncached_allocator(wrapper.allocator());
+
+    return wrapper;
+  }
+
   /* Execute the algorithm as a CUDA graph and launch this graph in a CUDA
    * stream */
   template <typename Fun, typename parent_ctx_t, typename... Args>
   void run(Fun fun, parent_ctx_t& parent_ctx, cudaStream_t stream, Args... args)
   {
-    auto argsTuple = ::std::make_tuple(args...);
     graph_ctx gctx(parent_ctx.async_resources());
 
     // Useful for tools
     gctx.set_parent_ctx(parent_ctx);
     gctx.get_dot()->set_ctx_symbol("algo: " + symbol);
 
-    // This creates an adapter which "redirects" allocations to the CUDA stream API
-    auto wrapper = stream_adapter(gctx, stream);
-
-    gctx.update_uncached_allocator(wrapper.allocator());
+    // This sets up allocators to avoid creating CUDA graph memory nodes:
+    // allocations and deallocations are deferred to the cudaMallocAsync API
+    // instead. These resources need to be released later with .clear()
+    auto adapter = setup_allocator(gctx, stream);
 
     auto current_place = gctx.default_exec_place();
@@ -1692,6 +1712,7 @@ public:
     };
 
     // Transform the tuple of instances into a tuple of logical data
+    auto argsTuple = ::std::make_tuple(args...);
     auto logicalArgsTuple = ::std::apply(
       [&](auto&&... args) {
         return ::std::tuple(logify(::std::forward<decltype(args)>(args))...);
@@ -1735,7 +1756,7 @@ public:
     cuda_safe_call(cudaGraphLaunch(*eg, stream));
 
     // Free resources allocated through the adapter
-    wrapper.clear();
+    adapter.clear();
   }
 
   /* Contrary to `run`, we here have a dynamic set of dependencies for the
@@ -1749,12 +1770,10 @@ public:
     gctx.set_parent_ctx(parent_ctx);
     gctx.get_dot()->set_ctx_symbol("algo: " + symbol);
 
-    gctx.set_allocator(block_allocator<pooled_allocator>(gctx));
-    // This creates an adapter which "redirects" allocations to the CUDA stream API
-    auto wrapper = stream_adapter(gctx, stream);
-
-    gctx.update_uncached_allocator(wrapper.allocator());
-
+    // This sets up allocators to avoid creating CUDA graph memory nodes:
+    // allocations and deallocations are deferred to the cudaMallocAsync API
+    // instead. These resources need to be released later with .clear()
+    auto adapter = setup_allocator(gctx, stream);
 
     auto current_place = gctx.default_exec_place();
@@ -1792,6 +1811,9 @@ public:
     }
 
     cuda_safe_call(cudaGraphLaunch(*eg, stream));
+
+    // Free resources allocated through the adapter
+    adapter.clear();
   }
 
 private:
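Note: with this factorization, both `run` variants now share the same
sequence; schematically (condensed from the two functions above):

    graph_ctx gctx(parent_ctx.async_resources());

    // Pooled allocator on top of a cudaMallocAsync-backed uncached allocator
    auto adapter = setup_allocator(gctx, stream);

    // ... record the algorithm's tasks, instantiate the graph, then ...
    cuda_safe_call(cudaGraphLaunch(*eg, stream));

    // Free resources allocated through the adapter
    adapter.clear();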
From ed1f24427b111694985d0be95e9b82592bb3f08d Mon Sep 17 00:00:00 2001
From: Cedric Augonnet
Date: Tue, 28 Jan 2025 15:10:41 +0100
Subject: [PATCH 3/3] Add a new test and some comments

---
 cudax/test/stf/CMakeLists.txt                 |  1 +
 cudax/test/stf/allocators/adapter.cu          |  4 +
 .../stf/graph/for_each_batched_allocator.cu   | 82 +++++++++++++++++++
 3 files changed, 87 insertions(+)
 create mode 100644 cudax/test/stf/graph/for_each_batched_allocator.cu

diff --git a/cudax/test/stf/CMakeLists.txt b/cudax/test/stf/CMakeLists.txt
index 20527586cfe..2b78e97f2ba 100644
--- a/cudax/test/stf/CMakeLists.txt
+++ b/cudax/test/stf/CMakeLists.txt
@@ -85,6 +85,7 @@ set(stf_test_codegen_sources
   freeze/task_fence.cu
   graph/epoch.cu
   graph/for_each_batched.cu
+  graph/for_each_batched_allocator.cu
   graph/for_each_batched_write.cu
   graph/freeze_for_graph.cu
   graph/graph_composition.cu
diff --git a/cudax/test/stf/allocators/adapter.cu b/cudax/test/stf/allocators/adapter.cu
index e205883523d..425959fb625 100644
--- a/cudax/test/stf/allocators/adapter.cu
+++ b/cudax/test/stf/allocators/adapter.cu
@@ -31,7 +31,11 @@ int main()
 {
   graph_ctx ctx(stream, handle);
 
+  // The uncached allocator of the context will use cudaMallocAsync(...,
+  // stream) to avoid creating memory nodes in the graph (because they are
+  // costly, and caching the graph also keeps memory allocated)
   auto wrapper = stream_adapter(ctx, stream);
+
   ctx.set_allocator(block_allocator<pooled_allocator>(ctx, wrapper.allocator()));
 
   auto A = ctx.logical_data(make_slice(d_ptrA, N), data_place::current_device());
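Note: the test above spells out the same layering manually: the pooled block
allocator refills itself through the adapter, so every underlying allocation
ends up in cudaMallocAsync on `stream` instead of a graph memory node. The
pattern, condensed from the hunk above (the pooled_allocator template
argument is inferred from the analogous setup_allocator code):

    auto wrapper = stream_adapter(ctx, stream);
    ctx.set_allocator(block_allocator<pooled_allocator>(ctx, wrapper.allocator()));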
+// +//===----------------------------------------------------------------------===// + +/** + * @file + * @brief Ensure the allocator used in algorithms with for_each_batched are deferred to the stream_ctx allocator + */ + +#include + +using namespace cuda::experimental::stf; + +// x(i) += 3*i-1 +template +void func(context_t& ctx, logical_data> lx) +{ + auto tmp = ctx.logical_data(lx.shape()); + // tmp = 3*i-1 + ctx.parallel_for(tmp.shape(), tmp.write())->*[] __device__(size_t i, auto t) { + t(i) = 3.0 * i - 1.0; + }; + + // x += tmp + ctx.parallel_for(lx.shape(), lx.rw(), tmp.read())->*[] __device__(size_t i, auto x, auto t) { + x(i) += t(i); + }; +} + +int main() +{ + nvtx_range r("run"); + + stream_ctx ctx; + + const size_t N = 256 * 1024; + const size_t K = 8; + + size_t BATCH_SIZE = 4; + + logical_data> lX[K]; + + for (size_t i = 0; i < K; i++) + { + lX[i] = ctx.logical_data(N); + + ctx.parallel_for(lX[i].shape(), lX[i].write()).set_symbol("INIT")->*[] __device__(size_t i, auto x) { + x(i) = 2.0 * i + 12.0; + }; + } + + for_each_batched>( + context(ctx), + K, + BATCH_SIZE, + [&](size_t i) { + return lX[i].rw(); + }) + ->*[&](context ctx, size_t, auto lxi) { + // We use a function not to inline extended lambdas within a lambda + func(ctx, lxi); + }; + + for (size_t i = 0; i < K; i++) + { + ctx.host_launch(lX[i].read()).set_symbol("check")->*[](auto x) { + for (size_t ind = 0; ind < N; ind++) + { + double expected = 2.0 * ind + 12.0 + 3.0 * ind - 1.0; + _CCCL_ASSERT(fabs(x(ind) - expected) < 0.01, "invalid result"); + } + }; + } + + ctx.finalize(); +}