Commit
Make it possible to share the CUDA pool on different devices with the stub when needed
krishung5 committed Sep 20, 2023
1 parent 088030a commit 1aca3bf
Showing 14 changed files with 117 additions and 168 deletions.
3 changes: 0 additions & 3 deletions src/infer_request.cc
@@ -589,9 +589,6 @@ InferRequest::Exec(const bool is_decoupled)
uint64_t memory_release_id = output_tensor->Memory()->MemoryReleaseId();
output_tensor->Memory()->SetMemoryReleaseCallback(
[&memory_manager_message_queue, memory_release_id, &shm_pool]() {
// memory_manager_message_queue->Push(memory_release_id);
// std::cerr << "=== STUB: Pushed memory release id: "
// << memory_release_id << std::endl;
std::unique_ptr<IPCMessage> ipc_message =
IPCMessage::Create(shm_pool, true /* inline_response */);
AllocatedSharedMemory<MemoryReleaseMessage>
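
Note: the hunk above replaces the raw message-queue push with an IPC message when an output tensor's shared memory is released. Below is a simplified, self-contained sketch of that callback pattern; the MemoryBlock and ReleaseChannel types are stand-ins, not the backend's IPCMessage or MemoryReleaseMessage classes.

#include <cstdint>
#include <functional>
#include <iostream>

// Stand-in for the shared-memory block owned by an output tensor.
class MemoryBlock {
 public:
  explicit MemoryBlock(uint64_t release_id) : release_id_(release_id) {}
  uint64_t MemoryReleaseId() const { return release_id_; }
  void SetMemoryReleaseCallback(std::function<void()> cb) { on_release_ = std::move(cb); }
  // Fire the callback when the block goes away.
  ~MemoryBlock() { if (on_release_) on_release_(); }

 private:
  uint64_t release_id_;
  std::function<void()> on_release_;
};

// Stand-in for the channel that carries a "memory release" message to the parent.
struct ReleaseChannel {
  void SendRelease(uint64_t id) { std::cout << "release id " << id << " sent to parent\n"; }
};

int main() {
  ReleaseChannel channel;
  {
    MemoryBlock block(42);
    uint64_t release_id = block.MemoryReleaseId();
    // Capture the id and the channel, mirroring how the stub captures shm_pool.
    block.SetMemoryReleaseCallback([&channel, release_id]() { channel.SendRelease(release_id); });
  }  // block destroyed here, so the release message is sent
  return 0;
}
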
60 changes: 0 additions & 60 deletions src/memory_manager.cc
@@ -33,33 +33,6 @@ namespace triton { namespace backend { namespace python {


#ifdef TRITON_ENABLE_GPU
GPUMemoryRecord::GPUMemoryRecord(void* ptr)
{
ptr_ = ptr;
release_callback_ = [](void* ptr) {
cudaError_t err = cudaFree(ptr);
if (err != cudaSuccess) {
LOG_MESSAGE(
TRITONSERVER_LOG_ERROR,
(std::string("Failed to free the allocated cuda memory. error: ") +
cudaGetErrorString(err))
.c_str());
}
};
}

void*
GPUMemoryRecord::MemoryId()
{
return ptr_;
}

const std::function<void(void*)>&
GPUMemoryRecord::ReleaseCallback()
{
return release_callback_;
}

BackendMemoryRecord::BackendMemoryRecord(
std::unique_ptr<BackendMemory> backend_memory)
: backend_memory_(std::move(backend_memory))
@@ -104,35 +77,6 @@ MemoryManager::AddRecord(std::unique_ptr<MemoryRecord>&& memory_record)
return memory_record_id;
}

// void
// MemoryManager::QueueMonitorThread()
// {
// while (true) {
// intptr_t memory = message_queue_->Pop();
// if (memory == 0) {
// return;
// }

// {
// std::lock_guard<std::mutex> lock{mu_};
// auto it = records_.find(memory);
// if (it == records_.end()) {
// LOG_MESSAGE(
// TRITONSERVER_LOG_ERROR,
// "Unexpected memory index received for deallocation.");
// continue;
// }

// // Call the release callback.
// auto temp = it->second->MemoryId();
// it->second->ReleaseCallback()(it->second->MemoryId());
// records_.erase(it);
// std::cerr << "=== MemoryManager::QueueMonitorThread() erase " <<
// reinterpret_cast<intptr_t>(temp) << std::endl;
// }
// }
// }

void
MemoryManager::QueueMonitorThread()
{
@@ -162,18 +106,14 @@ MemoryManager::QueueMonitorThread()
}

// Call the release callback.
auto temp = it->second->MemoryId();
it->second->ReleaseCallback()(it->second->MemoryId());
it->second.reset();
records_.erase(it);
std::cerr << "=== MemoryManager::QueueMonitorThread() erase "
<< reinterpret_cast<intptr_t>(temp) << std::endl;
{
bi::scoped_lock<bi::interprocess_mutex> lock{
*(ipc_message->ResponseMutex())};
memory_release_message_ptr->waiting_on_stub = true;
ipc_message->ResponseCondition()->notify_all();
std::cerr << "=== after notify_all() " << std::endl;
}
}
}
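
Note: in the updated monitor loop the manager looks up the record, runs its release callback, erases the entry, and then notifies the waiting stub through the response condition. A minimal standalone sketch of that lookup, release, and notify flow, using std::condition_variable in place of the Boost.Interprocess primitives the backend actually uses:

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <unordered_map>

struct MemoryRecord {
  std::function<void()> release;  // frees the underlying allocation
};

class MemoryManager {
 public:
  void AddRecord(intptr_t id, MemoryRecord record) {
    std::lock_guard<std::mutex> lk(mu_);
    records_.emplace(id, std::move(record));
  }

  // Called when a memory release message arrives from the stub.
  void HandleRelease(intptr_t id, std::mutex& response_mu,
                     std::condition_variable& response_cv, bool& waiting_on_stub) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      auto it = records_.find(id);
      if (it == records_.end()) {
        std::cerr << "Unexpected memory index received for deallocation.\n";
        return;
      }
      it->second.release();  // call the release callback
      records_.erase(it);
    }
    {
      // Tell the stub the release finished, mirroring the notify_all() above.
      std::lock_guard<std::mutex> lk(response_mu);
      waiting_on_stub = true;
      response_cv.notify_all();
    }
  }

 private:
  std::mutex mu_;
  std::unordered_map<intptr_t, MemoryRecord> records_;
};

int main() {
  MemoryManager manager;
  manager.AddRecord(1, {[] { std::cout << "freed record 1\n"; }});
  std::mutex response_mu;
  std::condition_variable response_cv;
  bool waiting_on_stub = false;
  manager.HandleRelease(1, response_mu, response_cv, waiting_on_stub);
  return 0;
}
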
17 changes: 1 addition & 16 deletions src/memory_manager.h
@@ -52,27 +52,12 @@ class MemoryRecord {
};

#ifdef TRITON_ENABLE_GPU
class GPUMemoryRecord : public MemoryRecord {
public:
GPUMemoryRecord(void* ptr);
const std::function<void(void*)>& ReleaseCallback() override;
void* MemoryId() override;

private:
void* ptr_;
std::function<void(void*)> release_callback_;
};

class BackendMemoryRecord : public MemoryRecord {
public:
BackendMemoryRecord(std::unique_ptr<BackendMemory> backend_memory);
const std::function<void(void*)>& ReleaseCallback() override;
void* MemoryId() override;
~BackendMemoryRecord()
{
backend_memory_.reset();
std::cerr << "=== BackendMemoryRecord destructor called ===\n";
}
~BackendMemoryRecord() { backend_memory_.reset(); }

private:
std::unique_ptr<BackendMemory> backend_memory_;
22 changes: 14 additions & 8 deletions src/pb_memory.cc
@@ -70,7 +70,8 @@ PbMemory::Create(
pb_memory->original_buffer_ = data;
pb_memory->data_ptr_ =
(reinterpret_cast<char*>(
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress()) +
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress(
pb_memory->memory_shm_ptr_->memory_type_id)) +
pb_memory->memory_shm_ptr_->cuda_pool_offset);
}
#endif
@@ -129,7 +130,8 @@ PbMemory::Create(
pb_memory->original_buffer_ = data;
pb_memory->data_ptr_ =
(reinterpret_cast<char*>(
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress()) +
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress(
pb_memory->memory_shm_ptr_->memory_type_id)) +
pb_memory->memory_shm_ptr_->cuda_pool_offset);
}
#endif
@@ -231,19 +233,20 @@ PbMemory::FillShmData(
reinterpret_cast<cudaIpcMemHandle_t*>(memory_data_shm), data));
}
#ifndef TRITON_PB_STUB
if (cuda_pool->UseCudaSharedPool()) {
if (cuda_pool->UseCudaSharedPool(memory_type_id)) {
// Check if the data is already in the pool by checking the base
// address.
CUDAHandler& cuda_api = CUDAHandler::getInstance();
CUdeviceptr cuda_pool_address = 0;
cuda_api.PointerGetAttribute(
&cuda_pool_address, CU_POINTER_ATTRIBUTE_RANGE_START_ADDR,
reinterpret_cast<CUdeviceptr>(data));
if (cuda_pool->CUDAPoolAddress() ==
if (cuda_pool->CUDAPoolAddress(memory_type_id) ==
reinterpret_cast<void*>(cuda_pool_address)) {
use_cuda_shared_pool = true;
memory_shm_ptr->cuda_pool_offset =
data - reinterpret_cast<char*>(cuda_pool->CUDAPoolAddress());
data - reinterpret_cast<char*>(
cuda_pool->CUDAPoolAddress(memory_type_id));
} else {
try {
THROW_IF_TRITON_ERROR(BackendMemory::Create(
@@ -281,7 +284,8 @@ PbMemory::FillShmData(
memory_shm_ptr->cuda_pool_offset =
(reinterpret_cast<BackendMemory*>(*backend_memory))
->MemoryPtr() -
reinterpret_cast<char*>(cuda_pool->CUDAPoolAddress());
reinterpret_cast<char*>(
cuda_pool->CUDAPoolAddress(memory_type_id));
}
catch (const PythonBackendException& pb_exception) {
LOG_MESSAGE(
@@ -328,7 +332,8 @@ PbMemory::LoadFromSharedMemory(
// data pointer using the offset.
data_ptr =
(reinterpret_cast<char*>(
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress()) +
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress(
memory_shm_ptr->memory_type_id)) +
memory_shm_ptr->cuda_pool_offset);
#endif // TRITON_PB_STUB
} else {
@@ -379,7 +384,8 @@ PbMemory::LoadFromSharedMemory(
// data pointer using the offset.
data_ptr =
(reinterpret_cast<char*>(
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress()) +
shm_pool->GetCUDAMemoryPoolManager()->CUDAPoolAddress(
memory_shm_ptr->memory_type_id)) +
memory_shm_ptr->cuda_pool_offset);
#endif // TRITON_PB_STUB
} else {
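
Note: every CUDAPoolAddress() call in this file now takes memory_type_id (the CUDA device id), since there is one pool base address per device. A self-contained sketch of the offset arithmetic, with a plain std::unordered_map standing in for CUDAMemoryPoolManager:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <unordered_map>

// Stand-in: one pool base address per CUDA device.
using PoolAddressMap = std::unordered_map<int32_t, char*>;

// Backend side: record where 'data' sits inside the pool of its device.
std::ptrdiff_t OffsetInPool(const PoolAddressMap& pools, int32_t device_id, char* data) {
  return data - pools.at(device_id);
}

// Stub side: rebuild the pointer from the device's pool base plus the offset.
char* PointerFromOffset(const PoolAddressMap& pools, int32_t device_id, std::ptrdiff_t offset) {
  return pools.at(device_id) + offset;
}

int main() {
  // Fake pool bases for devices 0 and 1 (the real code gets these from the opened IPC handles).
  char pool0[1024], pool1[1024];
  PoolAddressMap pools{{0, pool0}, {1, pool1}};

  char* tensor_data = pool1 + 128;                      // a tensor living in device 1's pool
  auto offset = OffsetInPool(pools, 1, tensor_data);    // what FillShmData records
  char* rebuilt = PointerFromOffset(pools, 1, offset);  // what LoadFromSharedMemory computes
  std::cout << "offset=" << offset << " match=" << (rebuilt == tensor_data) << "\n";
  return 0;
}
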
28 changes: 15 additions & 13 deletions src/pb_stub.cc
@@ -516,8 +516,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
model_config_params[pair.first.c_str()] = pair.second;
}

device_id_ = std::stoi(map["model_instance_device_id"]);

LaunchStubToParentQueueMonitor();
LaunchParentToStubQueueMonitor();

@@ -874,16 +872,18 @@ Stub::SendIPCUtilsMessage(std::unique_ptr<IPCMessage>& ipc_message)
Stub::~Stub()
{
#ifdef TRITON_ENABLE_GPU
if (shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddress() != nullptr) {
try {
CUDAHandler& cuda_api = CUDAHandler::getInstance();
cuda_api.CloseCudaHandle(
device_id_, shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddress());
}
catch (const PythonBackendException& pb_exception) {
std::cerr << "Error when closing CUDA handle: " << pb_exception.what();
try {
CUDAHandler& cuda_api = CUDAHandler::getInstance();
for (auto& m :
shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddressMap()) {
if (m.second != nullptr) {
cuda_api.CloseCudaHandle(m.first, m.second);
}
}
}
catch (const PythonBackendException& pb_exception) {
std::cerr << "Error when closing CUDA handle: " << pb_exception.what();
}
#endif

{
@@ -1190,14 +1190,16 @@ Stub::GetCUDAMemoryPoolAddress(std::unique_ptr<IPCMessage>& ipc_message)
CUDAHandler& cuda_api = CUDAHandler::getInstance();
void* cuda_pool_address;
cuda_api.OpenCudaHandle(
device_id_, &cuda_pool_message_ptr->cuda_handle, &cuda_pool_address);
cuda_pool_message_ptr->device_id, &cuda_pool_message_ptr->cuda_handle,
&cuda_pool_address);
shm_pool_->GetCUDAMemoryPoolManager()->SetCUDAPoolAddress(
cuda_pool_address);
cuda_pool_message_ptr->device_id, cuda_pool_address);
}
catch (const PythonBackendException& pb_exception) {
has_exception = true;
error_string = pb_exception.what();
shm_pool_->GetCUDAMemoryPoolManager()->SetCUDAPoolAddress(nullptr);
shm_pool_->GetCUDAMemoryPoolManager()->SetCUDAPoolAddress(
cuda_pool_message_ptr->device_id, nullptr);
}

if (has_exception) {
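
Note: the stub no longer keeps a single device_id_. It opens one pool handle per device on demand and, in its destructor, walks the whole address map and closes every handle. A hedged sketch of that lifecycle using the CUDA runtime IPC API directly; the real code goes through CUDAHandler and reports failures as PythonBackendException:

#include <cuda_runtime_api.h>

#include <cstdint>
#include <iostream>
#include <unordered_map>

// Minimal per-device pool registry, standing in for CUDAMemoryPoolManager.
class CudaPoolRegistry {
 public:
  // Open the exported pool on the device named in the CUDAMemPoolMessage.
  void Open(int32_t device_id, const cudaIpcMemHandle_t& handle) {
    void* pool_address = nullptr;
    if (cudaSetDevice(device_id) != cudaSuccess ||
        cudaIpcOpenMemHandle(&pool_address, handle,
                             cudaIpcMemLazyEnablePeerAccess) != cudaSuccess) {
      pool_addresses_[device_id] = nullptr;  // remember the failure, like SetCUDAPoolAddress(id, nullptr)
      return;
    }
    pool_addresses_[device_id] = pool_address;
  }

  void* Address(int32_t device_id) const {
    auto it = pool_addresses_.find(device_id);
    return it == pool_addresses_.end() ? nullptr : it->second;
  }

  // Mirror of the destructor change above: close every non-null handle.
  ~CudaPoolRegistry() {
    for (auto& m : pool_addresses_) {
      if (m.second != nullptr) {
        cudaSetDevice(m.first);
        if (cudaIpcCloseMemHandle(m.second) != cudaSuccess) {
          std::cerr << "Error when closing CUDA handle for device " << m.first << "\n";
        }
      }
    }
  }

 private:
  std::unordered_map<int32_t, void*> pool_addresses_;
};
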
5 changes: 1 addition & 4 deletions src/pb_stub.h
@@ -200,9 +200,7 @@ struct UtilsMessagePayload {

class Stub {
public:
Stub()
: device_id_(0), stub_to_parent_thread_(false),
parent_to_stub_thread_(false){};
Stub() : stub_to_parent_thread_(false), parent_to_stub_thread_(false){};
static std::unique_ptr<Stub>& GetOrCreateInstance();

/// Instantiate a new Python backend Stub.
@@ -347,7 +345,6 @@ class Stub {
void GetCUDAMemoryPoolAddress(std::unique_ptr<IPCMessage>& ipc_message);

private:
int32_t device_id_;
bi::interprocess_mutex* stub_mutex_;
bi::interprocess_condition* stub_cond_;
bi::interprocess_mutex* parent_mutex_;
1 change: 1 addition & 0 deletions src/pb_utils.h
@@ -245,6 +245,7 @@ struct MemoryReleaseMessage {
#ifdef TRITON_ENABLE_GPU
struct CUDAMemPoolMessage : SendMessageBase {
cudaIpcMemHandle_t cuda_handle;
int32_t device_id;
bi::managed_external_buffer::handle_t error;
bool has_error;
bool is_error_set;
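
Note: adding device_id to CUDAMemPoolMessage lets the parent tell the stub which device a given pool handle belongs to. A hedged sketch of the sending side with a simplified message struct; the real struct also carries the Boost.Interprocess error handle and status flags:

#include <cuda_runtime_api.h>

#include <cstdint>

// Simplified stand-in for CUDAMemPoolMessage.
struct CudaMemPoolMessage {
  cudaIpcMemHandle_t cuda_handle;
  int32_t device_id;
  bool has_error;
};

// Export the pool that lives on 'device_id' so the stub can open it remotely.
// 'pool_base' is assumed to be a device allocation owned by the parent process.
CudaMemPoolMessage ExportPool(int32_t device_id, void* pool_base) {
  CudaMemPoolMessage message{};
  message.device_id = device_id;
  message.has_error =
      (cudaSetDevice(device_id) != cudaSuccess) ||
      (cudaIpcGetMemHandle(&message.cuda_handle, pool_base) != cudaSuccess);
  return message;
}
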
29 changes: 7 additions & 22 deletions src/python_be.cc
@@ -522,7 +522,7 @@ ModelInstanceState::GetInputTensor(
} else {
#ifdef TRITON_ENABLE_GPU
// Attempt to use the cuda shared memory pool for GPU tensor.
ShareCUDAMemoryPool();
ShareCUDAMemoryPool(src_memory_type_id);

// Retrieving GPU input tensors
const void* buffer = nullptr;
@@ -698,7 +698,7 @@ ModelInstanceState::ExecuteBLSRequest(
if (!input_tensor->IsCPU()) {
#ifdef TRITON_ENABLE_GPU
// Attempt to use the cuda shared memory pool for GPU tensor.
ShareCUDAMemoryPool();
ShareCUDAMemoryPool(input_tensor->MemoryTypeId());
BackendMemory* backend_memory;
std::unique_ptr<BackendMemory> lbackend_memory;
has_gpu_tensor = true;
@@ -1169,8 +1169,7 @@ ModelInstanceState::ResponseSendDecoupled(
for (auto& output_tensor : infer_response->OutputTensors()) {
if ((output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU)) {
// Attempt to use the cuda shared memory pool for GPU tensor.
ShareCUDAMemoryPool();
break;
ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
}
}
#endif // TRITON_ENABLE_GPU
@@ -1552,8 +1551,7 @@ ModelInstanceState::ProcessRequests(
for (auto& output_tensor : infer_response->OutputTensors()) {
if ((output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU)) {
// Attempt to use the cuda shared memory pool for GPU tensor.
ShareCUDAMemoryPool();
break;
ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
}
}
#endif // TRITON_ENABLE_GPU
@@ -1683,8 +1681,7 @@ ModelInstanceState::PrepareResponseHandle(
for (auto& output_tensor : (*infer_response)->OutputTensors()) {
if (!output_tensor->IsCPU()) {
// Attempt to use the cuda shared memory pool for GPU tensor.
ShareCUDAMemoryPool();
break;
ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
}
}
#endif // TRITON_ENABLE_GPU
@@ -1694,18 +1691,6 @@
if (!output_tensor->IsCPU()) {
#ifdef TRITON_ENABLE_GPU
std::unique_ptr<MemoryRecord> memory_record;
// if (output_tensor->Memory()->UseCUDASharedPool()) {
// // Need to transfer the ownership of the BackendMemory to the
// // MemoryManager so that the lifetime of the BackendMemory is
// managed. memory_record = std::make_unique<BackendMemoryRecord>(
// output_tensor->Memory()->GetBackendMemory());
// } else {
// // For GPU tensors we need to store the memory release id in
// // memory manager.
// memory_record = std::make_unique<GPUMemoryRecord>(
// output_tensor->Memory()->DataPtr());
// }

// Need to transfer the ownership of the BackendMemory to the
// MemoryManager so that the lifetime of the BackendMemory is managed.
memory_record = std::make_unique<BackendMemoryRecord>(
@@ -1769,11 +1754,11 @@ ModelInstanceState::SendBLSDecoupledResponse(
}

void
ModelInstanceState::ShareCUDAMemoryPool()
ModelInstanceState::ShareCUDAMemoryPool(const int32_t device_id)
{
#ifdef TRITON_ENABLE_GPU
try {
Stub()->ShareCUDAMemoryPool(Model()->TritonMemoryManager());
Stub()->ShareCUDAMemoryPool(Model()->TritonMemoryManager(), device_id);
}
catch (const PythonBackendException& ex) {
LOG_MESSAGE(
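
Note: removing the break means the pool is shared for the device of every GPU output tensor, not only the first one encountered. A simplified sketch of that loop; the std::set deduplication is illustrative only, and the real ShareCUDAMemoryPool is assumed to tolerate repeated calls for the same device:

#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

// Stand-in for an output tensor: only the fields this sketch needs.
struct OutputTensor {
  bool is_cpu;
  int32_t memory_type_id;  // CUDA device id when is_cpu == false
};

// Stand-in for ModelInstanceState::ShareCUDAMemoryPool(device_id).
void ShareCUDAMemoryPool(int32_t device_id) {
  std::cout << "sharing CUDA pool for device " << device_id << "\n";
}

void ShareForAllGpuOutputs(const std::vector<OutputTensor>& outputs) {
  std::set<int32_t> shared_devices;
  for (const auto& tensor : outputs) {
    if (!tensor.is_cpu && shared_devices.insert(tensor.memory_type_id).second) {
      // One share per distinct device; previously the loop stopped after the first GPU tensor.
      ShareCUDAMemoryPool(tensor.memory_type_id);
    }
  }
}

int main() {
  ShareForAllGpuOutputs({{false, 0}, {true, 0}, {false, 1}, {false, 1}});
  return 0;
}
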
2 changes: 1 addition & 1 deletion src/python_be.h
@@ -413,6 +413,6 @@ class ModelInstanceState : public BackendModelInstance {
void ProcessModelControlRequest(const std::unique_ptr<IPCMessage>& message);

// Attempt to share CUDA memory pool with the stub process
void ShareCUDAMemoryPool();
void ShareCUDAMemoryPool(const int32_t device_id);
};
}}} // namespace triton::backend::python
2 changes: 1 addition & 1 deletion src/request_executor.cc
@@ -257,7 +257,7 @@ ResponseAlloc(
lbackend_memory.reset(backend_memory);

std::unique_ptr<PbMemory> pb_memory = PbMemory::Create(
shm_pool, std::move(lbackend_memory), false /* copy_gpu */);
shm_pool, std::move(lbackend_memory), true /* copy_gpu */);
*buffer = pb_memory->DataPtr();
*buffer_userp = reinterpret_cast<void*>(pb_memory.get());
pb_memory.release();