From cd3aeeb5571ea8dc736920f7038b286d96447d9b Mon Sep 17 00:00:00 2001 From: noa-neria Date: Sun, 27 Oct 2024 16:58:11 +0000 Subject: [PATCH] fix race when shutting down the streamer --- cpp/common/responder/responder.cc | 14 ++++---------- cpp/common/responder/responder.h | 2 ++ cpp/streamer/streamer_test.cc | 31 +++++++++++++++++++++++++++++++ cpp/utils/threadpool/threadpool.h | 8 +++++--- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/cpp/common/responder/responder.cc b/cpp/common/responder/responder.cc index e05c932..2ca9682 100644 --- a/cpp/common/responder/responder.cc +++ b/cpp/common/responder/responder.cc @@ -19,15 +19,7 @@ Responder::Responder(unsigned running) : Responder::~Responder() { - try - { - // wait for all the running requests to finish - while (pop().ret != ResponseCode::FinishedError) - {} - } - catch(const std::exception& e) - { - } + LOG(DEBUG) << "Responder shutdown"; } // return -1 if there are no running requests @@ -57,13 +49,15 @@ void Responder::push(Response && response) { const auto guard = std::unique_lock(_mutex); + _successful = _successful && response.ret == common::ResponseCode::Success; + if (_running) { LOG(SPAM) << response << " ; " << _running << " running requests"; _responses.push_back(response); --_running; - if (_running == 0 && _total_bytesize > 0) + if (_running == 0 && _successful && _total_bytesize > 100 * 1024 * 1024) { const auto throughput = bytes_per_second(); std::cout << "Read throughput is " << utils::logging::human_readable_size(throughput) << " per second " << std::endl; diff --git a/cpp/common/responder/responder.h b/cpp/common/responder/responder.h index 35d9d5e..5c910cc 100644 --- a/cpp/common/responder/responder.h +++ b/cpp/common/responder/responder.h @@ -58,6 +58,8 @@ struct Responder std::atomic _total_bytesize; std::chrono::time_point _start_time; + + bool _successful = true; }; }; // namespace runai::llm::streamer::common diff --git a/cpp/streamer/streamer_test.cc b/cpp/streamer/streamer_test.cc index 04664e5..098b728 100644 --- a/cpp/streamer/streamer_test.cc +++ b/cpp/streamer/streamer_test.cc @@ -1,9 +1,12 @@ #include "streamer/streamer.h" +#include + #include #include #include #include +#include #include "common/response_code/response_code.h" @@ -215,4 +218,32 @@ TEST_F(StreamerTest, S3_Library_Not_Found) runai_end(streamer); } +TEST_F(StreamerTest, End_Before_Read) +{ + auto size = utils::random::number(100000000, 1000000000); + const auto data = utils::random::buffer(size); + utils::temp::File file(data); + + const auto expected = utils::Fd::read(file.path); + EXPECT_EQ(expected.size(), size); + + void * streamer; + auto res = runai_start(&streamer); + EXPECT_EQ(res, static_cast(common::ResponseCode::Success)); + + std::vector dst(size); + std::vector sizes; + sizes.push_back(size); + + EXPECT_EQ(runai_request(streamer, file.path.c_str(), 0, size, dst.data(), 1, sizes.data()), static_cast(common::ResponseCode::Success)); + + ::usleep(utils::random::number(20000)); + + const auto start_time = std::chrono::steady_clock::now(); + runai_end(streamer); + const auto time_ = std::chrono::steady_clock::now(); + const auto duration = std::chrono::duration_cast(time_ - start_time); + EXPECT_LT(duration.count(), 1000); +} + }; // namespace runai::llm::streamer diff --git a/cpp/utils/threadpool/threadpool.h b/cpp/utils/threadpool/threadpool.h index f796be8..7b9be31 100644 --- a/cpp/utils/threadpool/threadpool.h +++ b/cpp/utils/threadpool/threadpool.h @@ -21,7 +21,7 @@ struct Deque { const auto lock = std::unique_lock(_mutex); - ASSERT(!_stopped) << "Pusing a message to an already stopped queue"; + ASSERT(!_stopped) << "Pushing a message to an already stopped queue"; _deque.push_back(std::move(message)); } @@ -52,8 +52,10 @@ struct Deque { const auto lock = std::unique_lock(_mutex); - CHECK(_deque.size() == 0) << "Stopping a `Deque` with unresolved messages"; - + if (_deque.size() != 0) + { + LOG(DEBUG) << "Stopping a `Deque` with unresolved messages"; + } _stopped = true; }