Skip to content

Commit

Permalink
fix race when shutting down the streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
noa-neria committed Oct 27, 2024
1 parent 6f82839 commit cd3aeeb
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 13 deletions.
14 changes: 4 additions & 10 deletions cpp/common/responder/responder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,7 @@ Responder::Responder(unsigned running) :

// Destructor of Responder.
// As rendered here, it calls pop() repeatedly until a response with
// ResponseCode::FinishedError is returned, ignores any std::exception raised
// while doing so, and then emits a DEBUG log line.
// NOTE(review): this text is a diff hunk shown without +/- markers. Going by
// the "4 additions & 10 deletions" summary for this file, the try/catch drain
// loop is presumably the removed side and the LOG line the added side -
// confirm against the raw diff before relying on the drain behaviour.
Responder::~Responder()
{
try
{
// wait for all the running requests to finish
while (pop().ret != ResponseCode::FinishedError)
{}
}
catch(const std::exception& e)
{
// any std::exception thrown while draining is ignored here
}
LOG(DEBUG) << "Responder shutdown";
}

// return -1 if there are no running requests
Expand Down Expand Up @@ -57,13 +49,15 @@ void Responder::push(Response && response)
{
const auto guard = std::unique_lock<std::mutex>(_mutex);

_successful = _successful && response.ret == common::ResponseCode::Success;

if (_running)
{
LOG(SPAM) << response << " ; " << _running << " running requests";
_responses.push_back(response);
--_running;

if (_running == 0 && _total_bytesize > 0)
if (_running == 0 && _successful && _total_bytesize > 100 * 1024 * 1024)
{
const auto throughput = bytes_per_second();
std::cout << "Read throughput is " << utils::logging::human_readable_size(throughput) << " per second " << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions cpp/common/responder/responder.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct Responder

std::atomic<size_t> _total_bytesize;
std::chrono::time_point<std::chrono::steady_clock> _start_time;

bool _successful = true;
};

}; // namespace runai::llm::streamer::common
31 changes: 31 additions & 0 deletions cpp/streamer/streamer_test.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#include "streamer/streamer.h"

#include <unistd.h>

#include <gtest/gtest.h>
#include <map>
#include <string>
#include <vector>
#include <chrono>

#include "common/response_code/response_code.h"

Expand Down Expand Up @@ -215,4 +218,32 @@ TEST_F(StreamerTest, S3_Library_Not_Found)
runai_end(streamer);
}

// Checks that runai_end() returns quickly (under one second) when it is called
// shortly after a large read request was issued and without reading any
// response first.
TEST_F(StreamerTest, End_Before_Read)
{
    // Large random payload written to a temporary file; the size comes from
    // utils::random::number(100000000, 1000000000).
    auto size = utils::random::number(100000000, 1000000000);
    const auto data = utils::random::buffer(size);
    utils::temp::File file(data);

    // Sanity check: the file on disk holds exactly `size` bytes.
    const auto expected = utils::Fd::read(file.path);
    EXPECT_EQ(expected.size(), size);

    // Initialise the handle so it never holds an indeterminate value, and stop
    // the test right away if the streamer could not be started: the calls
    // below would otherwise receive an invalid handle.
    void * streamer = nullptr;
    const auto res = runai_start(&streamer);
    ASSERT_EQ(res, static_cast<int>(common::ResponseCode::Success));

    // Single-chunk request covering the whole file.
    std::vector<unsigned char> dst(size);
    std::vector<size_t> sizes;
    sizes.push_back(size);

    // Non-fatal check on purpose: runai_end() must still run if the request fails.
    EXPECT_EQ(runai_request(streamer, file.path.c_str(), 0, size, dst.data(), 1, sizes.data()), static_cast<int>(common::ResponseCode::Success));

    // Short random pause (usleep takes microseconds) before shutting down.
    ::usleep(utils::random::number(20000));

    // Time the shutdown itself using the monotonic clock.
    const auto start_time = std::chrono::steady_clock::now();
    runai_end(streamer);
    const auto end_time = std::chrono::steady_clock::now();

    const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
    EXPECT_LT(duration.count(), 1000);
}

}; // namespace runai::llm::streamer
8 changes: 5 additions & 3 deletions cpp/utils/threadpool/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct Deque
{
const auto lock = std::unique_lock<std::mutex>(_mutex);

ASSERT(!_stopped) << "Pusing a message to an already stopped queue";
ASSERT(!_stopped) << "Pushing a message to an already stopped queue";

_deque.push_back(std::move(message));
}
Expand Down Expand Up @@ -52,8 +52,10 @@ struct Deque
{
const auto lock = std::unique_lock<std::mutex>(_mutex);

CHECK(_deque.size() == 0) << "Stopping a `Deque` with unresolved messages";

if (_deque.size() != 0)
{
LOG(DEBUG) << "Stopping a `Deque` with unresolved messages";
}
_stopped = true;
}

Expand Down

0 comments on commit cd3aeeb

Please sign in to comment.