[STF] Ensure algorithms with nested contexts use allocator adapters #3548

Draft · wants to merge 5 commits into base: main
91 changes: 75 additions & 16 deletions cudax/include/cuda/experimental/__stf/allocators/adapters.cuh
@@ -35,7 +35,9 @@ namespace cuda::experimental::stf
* allocations APIs (cudaMallocAsync, cudaFreeAsync)
*
* This can be used as an alternative in CUDA graphs to avoid creating CUDA
* graphs with a large memory footprints.
* graphs with a large memory footprint. Allocations will be made in the same
* stream as the one used to launch the graph, and will be freed in that
* stream once the graph has been launched.
*/
class stream_adapter
{
@@ -55,15 +57,30 @@ class stream_adapter
data_place memory_node;
};

/**
* @brief Store the state of the allocator such as the buffers to free
*/
struct adapter_allocator_state
{
adapter_allocator_state(cudaStream_t stream_)
: stream(stream_)
{}

// Resources to release
::std::vector<raw_buffer> to_free;

// stream used to allocate data
cudaStream_t stream;
};

/**
* @brief allocator interface created within the stream_adapter
*/
class adapter_allocator : public block_allocator_interface
{
public:
adapter_allocator(cudaStream_t stream_, stream_adapter* sa_)
: stream(stream_)
, sa(sa_)
adapter_allocator(::std::shared_ptr<adapter_allocator_state> state_)
: state(mv(state_))
{}

// these are events from a graph_ctx
@@ -109,7 +126,7 @@ class stream_adapter
}
};

cuda_safe_call(cudaMallocAsync(&result, s, stream));
cuda_safe_call(cudaMallocAsync(&result, s, state->stream));
}

return result;
@@ -118,8 +135,8 @@ class stream_adapter
void deallocate(
backend_ctx_untyped&, const data_place& memory_node, event_list& /* prereqs */, void* ptr, size_t sz) override
{
// Do not deallocate buffers, this is done later after
sa->to_free.emplace_back(ptr, sz, memory_node);
// Do not deallocate buffers, this is done later when we call clear()
state->to_free.emplace_back(ptr, sz, memory_node);
// Prereqs are unchanged
}

@@ -135,33 +152,71 @@ class stream_adapter
}

private:
cudaStream_t stream;
stream_adapter* sa;
// To keep track of allocated resources
::std::shared_ptr<adapter_allocator_state> state;
};

public:
template <typename context_t>
stream_adapter(context_t& ctx, cudaStream_t stream_ /*, block_allocator_untyped root_allocator_*/)
: stream(stream_) /*, root_allocator(mv(root_allocator_))*/
stream_adapter(context_t& ctx, cudaStream_t stream /*, block_allocator_untyped root_allocator_*/)
: adapter_state(::std::make_shared<adapter_allocator_state>(stream))
, alloc(block_allocator<adapter_allocator>(ctx, adapter_state))
{}

// Delete copy constructor and copy assignment operator
stream_adapter(const stream_adapter&) = delete;
stream_adapter& operator=(const stream_adapter&) = delete;

// This is movable; after a move, clear() no longer needs to be called on the
// moved-from object.
stream_adapter(stream_adapter&& other) noexcept
: adapter_state(other.adapter_state)
, alloc(other.alloc)
, cleared_or_moved(other.cleared_or_moved)
{
alloc = block_allocator<adapter_allocator>(ctx, stream, this);
// Mark the moved-from object so that it no longer needs clear()
other.cleared_or_moved = true;
}

stream_adapter& operator=(stream_adapter&& other) noexcept
{
if (this != &other)
{
adapter_state = mv(other.adapter_state);
alloc = mv(other.alloc);
cleared_or_moved = other.cleared_or_moved;

// Mark the moved-from object as "moved"
other.cleared_or_moved = true;
}
return *this;
}

// Destructor
~stream_adapter()
{
_CCCL_ASSERT(cleared_or_moved, "clear() was not called.");
}

/**
* @brief Free resources allocated by the stream_adapter object
*/
void clear()
{
_CCCL_ASSERT(adapter_state, "Invalid state");
_CCCL_ASSERT(!cleared_or_moved, "clear() was already called, or the object was moved.");

// We avoid changing the device around every CUDA API call: we only change it
// when necessary, and restore the original device at the end of the loop.
const int prev_dev_id = cuda_try<cudaGetDevice>();
int current_dev_id = prev_dev_id;

cudaStream_t stream = adapter_state->stream;

// No need to wait for the stream multiple times
bool stream_was_synchronized = false;

for (auto& b : to_free)
for (auto& b : adapter_state->to_free)
{
if (b.memory_node == data_place::host)
{
@@ -200,7 +255,9 @@ public:
cuda_safe_call(cudaSetDevice(prev_dev_id));
}

to_free.clear();
adapter_state->to_free.clear();

cleared_or_moved = true;
}

/**
@@ -213,11 +270,13 @@ public:
}

private:
cudaStream_t stream;
::std::shared_ptr<adapter_allocator_state> adapter_state;

// Note this is using a PIMPL idiom so it's movable
block_allocator_untyped alloc;

::std::vector<raw_buffer> to_free;
// We need to call clear() before destroying the object, unless it was moved
bool cleared_or_moved = false;
};

} // end namespace cuda::experimental::stf
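For illustration, the pattern the updated doc comment describes can be reduced to plain CUDA runtime calls: allocate with cudaMallocAsync in the stream that launches the graph, and free with cudaFreeAsync in that same stream after the launch, so the graph itself carries no memory nodes. This is only a standalone sketch of the idea, not code from this PR; the captured memset and the buffer size are placeholders, and error checking is omitted for brevity.

#include <cuda_runtime.h>

int main()
{
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  const size_t bytes = 1 << 20; // placeholder size

  // Allocate the temporary buffer with the stream-ordered allocator, in the
  // same stream that will later launch the graph, instead of creating an
  // allocation node inside the graph.
  void* buf = nullptr;
  cudaMallocAsync(&buf, bytes, stream);

  // Capture a trivial graph that uses the buffer; the memset stands in for
  // the tasks a nested graph_ctx would record.
  cudaGraph_t graph;
  cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
  cudaMemsetAsync(buf, 0, bytes, stream);
  cudaStreamEndCapture(stream, &graph);

  cudaGraphExec_t graph_exec;
  cudaGraphInstantiate(&graph_exec, graph, 0); // CUDA 12 signature

  // Launch the graph in the stream, then free the buffer in that same stream:
  // stream ordering guarantees the graph's work completes before the free.
  cudaGraphLaunch(graph_exec, stream);
  cudaFreeAsync(buf, stream);

  cudaStreamSynchronize(stream);
  cudaGraphExecDestroy(graph_exec);
  cudaGraphDestroy(graph);
  cudaStreamDestroy(stream);
  return 0;
}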
41 changes: 34 additions & 7 deletions cudax/include/cuda/experimental/stf.cuh
@@ -1664,22 +1664,42 @@ public:
return runner_impl(ctx, *this, mv(deps)...);
}

auto setup_allocator(graph_ctx& gctx, cudaStream_t stream)
{
// Use a pooled allocator: this avoids calling the underlying "uncached"
// allocator too often by making larger allocations that can be reused for
// multiple small allocations.
gctx.set_allocator(block_allocator<pooled_allocator>(gctx));

// The uncached allocator provides the (large) blocks of memory required by
// the pooled allocator. Within CUDA graphs, using memory nodes is expensive,
// and caching a graph with memory nodes may appear as "leaking" memory.
// We thus use the stream_adapter allocator, which relies on the stream-based
// asynchronous allocation API (cudaMallocAsync, cudaFreeAsync).
// The resources reserved by this allocator can be released asynchronously
// after the submission of the CUDA graph.
auto wrapper = stream_adapter(gctx, stream);

gctx.update_uncached_allocator(wrapper.allocator());

return wrapper;
}

/* Execute the algorithm as a CUDA graph and launch this graph in a CUDA
* stream */
template <typename Fun, typename parent_ctx_t, typename... Args>
void run(Fun fun, parent_ctx_t& parent_ctx, cudaStream_t stream, Args... args)
{
auto argsTuple = ::std::make_tuple(args...);
graph_ctx gctx(parent_ctx.async_resources());

// Useful for tools
gctx.set_parent_ctx(parent_ctx);
gctx.get_dot()->set_ctx_symbol("algo: " + symbol);

// This creates an adapter which "redirects" allocations to the CUDA stream API
auto wrapper = stream_adapter(gctx, stream);

gctx.update_uncached_allocator(wrapper.allocator());
// This will set up allocators to avoid creating CUDA graph memory nodes, and
// defer allocations and deallocations to the cudaMallocAsync API instead.
// These resources need to be released later with .clear()
auto adapter = setup_allocator(gctx, stream);

auto current_place = gctx.default_exec_place();

@@ -1692,6 +1712,7 @@ public:
};

// Transform the tuple of instances into a tuple of logical data
auto argsTuple = ::std::make_tuple(args...);
auto logicalArgsTuple = ::std::apply(
[&](auto&&... args) {
return ::std::tuple(logify(::std::forward<decltype(args)>(args))...);
@@ -1735,7 +1756,7 @@ public:
cuda_safe_call(cudaGraphLaunch(*eg, stream));

// Free resources allocated through the adapter
wrapper.clear();
adapter.clear();
}

/* Contrary to `run`, we here have a dynamic set of dependencies for the
@@ -1749,7 +1770,10 @@ public:
gctx.set_parent_ctx(parent_ctx);
gctx.get_dot()->set_ctx_symbol("algo: " + symbol);

gctx.set_allocator(block_allocator<pooled_allocator>(gctx));
// This will set up allocators to avoid creating CUDA graph memory nodes, and
// defer allocations and deallocations to the cudaMallocAsync API instead.
// These resources need to be released later with .clear()
auto adapter = setup_allocator(gctx, stream);

auto current_place = gctx.default_exec_place();

@@ -1787,6 +1811,9 @@ public:
}

cuda_safe_call(cudaGraphLaunch(*eg, stream));

// Free resources allocated through the adapter
adapter.clear();
}

private:
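Taken together, the allocator lifecycle that setup_allocator() and run() now implement boils down to the following sketch. It is assembled only from the calls visible in this diff; task submission and graph instantiation are elided.

// Sketch only: not a complete function.
graph_ctx gctx(parent_ctx.async_resources());

// Small allocations are served by a pooled allocator on top...
gctx.set_allocator(block_allocator<pooled_allocator>(gctx));

// ...while its large underlying blocks come from cudaMallocAsync on `stream`
// through the stream_adapter, rather than from CUDA graph memory nodes.
auto adapter = stream_adapter(gctx, stream);
gctx.update_uncached_allocator(adapter.allocator());

// ... submit tasks on gctx, instantiate the graph, cudaGraphLaunch(*eg, stream) ...

// Buffers obtained through the adapter are only released once clear() is
// called; the destructor asserts that this happened (or that the adapter was
// moved from), which is why both run() overloads now end with adapter.clear().
adapter.clear();

Returning the stream_adapter by value from setup_allocator() is what the new move constructor and move assignment enable: the moved-from temporary is flagged as cleared_or_moved, so its destructor assertion stays silent.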
1 change: 1 addition & 0 deletions cudax/test/stf/CMakeLists.txt
@@ -85,6 +85,7 @@ set(stf_test_codegen_sources
freeze/task_fence.cu
graph/epoch.cu
graph/for_each_batched.cu
graph/for_each_batched_allocator.cu
graph/for_each_batched_write.cu
graph/freeze_for_graph.cu
graph/graph_composition.cu
4 changes: 4 additions & 0 deletions cudax/test/stf/allocators/adapter.cu
@@ -31,7 +31,11 @@ int main()
{
graph_ctx ctx(stream, handle);

// The uncached allocator of the context will use cudaMallocAsync(...,
// stream) to avoid creating memory nodes in the graph (because they are
// costly, and caching the graph also keeps the memory allocated)
auto wrapper = stream_adapter(ctx, stream);

ctx.set_allocator(block_allocator<buddy_allocator>(ctx, wrapper.allocator()));

auto A = ctx.logical_data(make_slice(d_ptrA, N), data_place::current_device());
82 changes: 82 additions & 0 deletions cudax/test/stf/graph/for_each_batched_allocator.cu
@@ -0,0 +1,82 @@
//===----------------------------------------------------------------------===//
//
// Part of CUDASTF in CUDA C++ Core Libraries,
// under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
// SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES.
//
//===----------------------------------------------------------------------===//

/**
* @file
* @brief Ensure that allocations made by algorithms used with for_each_batched are deferred to the stream_ctx allocator
*/

#include <cuda/experimental/stf.cuh>

using namespace cuda::experimental::stf;

// x(i) += 3*i-1
template <typename context_t>
void func(context_t& ctx, logical_data<slice<double>> lx)
{
auto tmp = ctx.logical_data(lx.shape());
// tmp = 3*i-1
ctx.parallel_for(tmp.shape(), tmp.write())->*[] __device__(size_t i, auto t) {
t(i) = 3.0 * i - 1.0;
};

// x += tmp
ctx.parallel_for(lx.shape(), lx.rw(), tmp.read())->*[] __device__(size_t i, auto x, auto t) {
x(i) += t(i);
};
}

int main()
{
nvtx_range r("run");

stream_ctx ctx;

const size_t N = 256 * 1024;
const size_t K = 8;

size_t BATCH_SIZE = 4;

logical_data<slice<double>> lX[K];

for (size_t i = 0; i < K; i++)
{
lX[i] = ctx.logical_data<double>(N);

ctx.parallel_for(lX[i].shape(), lX[i].write()).set_symbol("INIT")->*[] __device__(size_t i, auto x) {
x(i) = 2.0 * i + 12.0;
};
}

for_each_batched<slice<double>>(
context(ctx),
K,
BATCH_SIZE,
[&](size_t i) {
return lX[i].rw();
})
->*[&](context ctx, size_t, auto lxi) {
// We use a separate function to avoid nesting extended lambdas within a lambda
func(ctx, lxi);
};

for (size_t i = 0; i < K; i++)
{
ctx.host_launch(lX[i].read()).set_symbol("check")->*[](auto x) {
for (size_t ind = 0; ind < N; ind++)
{
double expected = 2.0 * ind + 12.0 + 3.0 * ind - 1.0;
_CCCL_ASSERT(fabs(x(ind) - expected) < 0.01, "invalid result");
}
};
}

ctx.finalize();
}
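As a side note (not part of this change, and only a sketch using the CUDA graph introspection APIs), one way to confirm what the adapter buys us is to walk a graph's nodes and check that none of them are allocation or free nodes. Such a check would need access to the underlying cudaGraph_t, which the tests here do not expose; it is included purely as an illustration.

#include <cuda_runtime.h>

#include <vector>

// Returns true if `graph` contains no allocation or free nodes, i.e. all
// temporary buffers were provided outside of the graph (for instance by a
// stream_adapter) rather than by graph memory nodes.
bool has_no_memory_nodes(cudaGraph_t graph)
{
  size_t count = 0;
  cudaGraphGetNodes(graph, nullptr, &count);

  ::std::vector<cudaGraphNode_t> nodes(count);
  cudaGraphGetNodes(graph, nodes.data(), &count);

  for (auto node : nodes)
  {
    cudaGraphNodeType type;
    cudaGraphNodeGetType(node, &type);
    if (type == cudaGraphNodeTypeMemAlloc || type == cudaGraphNodeTypeMemFree)
    {
      return false;
    }
  }
  return true;
}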