Skip to content

Commit

Permalink
Allow copyable GenericClientResponse structure and enhanced testing f…
Browse files Browse the repository at this point in the history
…or inline/multi slices (#231)
  • Loading branch information
hkadayam authored Mar 22, 2024
1 parent 675770c commit c3b8844
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 71 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "11.1.8"
version = "12.0.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
21 changes: 10 additions & 11 deletions include/sisl/grpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ using GenericClientRpcDataFuture = ClientRpcDataFuture< grpc::ByteBuffer, grpc::
template < typename ReqT, typename RespT >
class ClientRpcDataInternal : public ClientRpcDataAbstract {
public:
using ResponseReaderPtr = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using ResponseReaderPtr = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using GenericResponseReaderPtr = std::unique_ptr< grpc::GenericClientAsyncResponseReader >;

/* Allow GrpcAsyncClient and its inner classes to use
Expand Down Expand Up @@ -166,21 +166,20 @@ class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
class GenericClientResponse {
public:
GenericClientResponse() = default;
GenericClientResponse(grpc::ByteBuffer const& buf);
GenericClientResponse(grpc::ByteBuffer const& buf) : m_response_buf{buf} {}

GenericClientResponse(GenericClientResponse&& other);
GenericClientResponse& operator=(GenericClientResponse&& other);
GenericClientResponse(GenericClientResponse const& other) = delete;
GenericClientResponse& operator=(GenericClientResponse const& other) = delete;
~GenericClientResponse();
GenericClientResponse(GenericClientResponse const& other) = default;
GenericClientResponse& operator=(GenericClientResponse const& other) = default;
~GenericClientResponse() = default;

io_blob& response_blob();
grpc::ByteBuffer response_buf();
io_blob response_blob();
grpc::ByteBuffer const& response_buf(bool need_contiguous = true);

private:
grpc::ByteBuffer m_response_buf;
io_blob m_response_blob;
bool m_response_blob_allocated{false};
grpc::Slice m_single_slice;
};

/**
Expand Down Expand Up @@ -228,7 +227,7 @@ class GrpcBaseClient {
const std::string m_target_domain;
const std::string m_ssl_cert;

std::shared_ptr<::grpc::ChannelInterface > m_channel;
std::shared_ptr< ::grpc::ChannelInterface > m_channel;
std::shared_ptr< sisl::GrpcTokenClient > m_token_client;

public:
Expand Down Expand Up @@ -344,7 +343,7 @@ class GrpcAsyncClient : public GrpcBaseClient {

/* unary call helper */
template < typename RespT >
using unary_call_return_t = std::unique_ptr<::grpc::ClientAsyncResponseReaderInterface< RespT > >;
using unary_call_return_t = std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< RespT > >;

template < typename ReqT, typename RespT >
using unary_call_t = unary_call_return_t< RespT > (stub_t::*)(::grpc::ClientContext*, const ReqT&,
Expand Down
52 changes: 20 additions & 32 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,48 +187,36 @@ std::unique_ptr< GrpcAsyncClient::GenericAsyncStub > GrpcAsyncClient::make_gener
m_token_client);
}

GenericClientResponse::GenericClientResponse(grpc::ByteBuffer const& buf) : m_response_buf(buf) {}

GenericClientResponse::GenericClientResponse(GenericClientResponse&& other) :
m_response_blob(other.m_response_blob), m_response_blob_allocated(other.m_response_blob_allocated) {
GenericClientResponse::GenericClientResponse(GenericClientResponse&& other) {
m_response_buf.Swap(&(other.m_response_buf));
other.m_response_blob.set_bytes(static_cast< uint8_t* >(nullptr));
other.m_response_blob.set_size(0);
m_single_slice = std::move(other.m_single_slice);
other.m_response_buf.Release();
}

GenericClientResponse& GenericClientResponse::operator=(GenericClientResponse&& other) {
if (m_response_blob_allocated) { m_response_blob.buf_free(); }
m_response_buf.Clear();
m_response_buf.Swap(&(other.m_response_buf));
m_response_blob_allocated = other.m_response_blob_allocated;
other.m_response_blob.set_bytes(static_cast< uint8_t* >(nullptr));
other.m_response_blob.set_size(0);

m_single_slice = std::move(other.m_single_slice);
other.m_response_buf.Release();
return *this;
}

GenericClientResponse::~GenericClientResponse() {
if (m_response_blob_allocated) { m_response_blob.buf_free(); }
}
grpc::ByteBuffer const& GenericClientResponse::response_buf(bool need_contiguous) {
if (!need_contiguous || m_single_slice.size() || !m_response_buf.Valid()) { return m_response_buf; }

grpc::ByteBuffer GenericClientResponse::response_buf() { return m_response_buf; }
auto status = m_response_buf.TrySingleSlice(&m_single_slice);
if (status.ok()) { return m_response_buf; }

io_blob& GenericClientResponse::response_blob() {
if (m_response_blob.cbytes() == nullptr) {
if (auto status = try_deserialize_from_byte_buffer(m_response_buf, m_response_blob);
status.error_code() == grpc::StatusCode::FAILED_PRECONDITION) {
if (status = deserialize_from_byte_buffer(m_response_buf, m_response_blob); status.ok()) {
m_response_blob_allocated = true;
} else {
LOGERRORMOD(grpc_server, "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
} else if (!status.ok()) {
LOGERRORMOD(grpc_server, "Failed to try deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
}
return m_response_blob;
status = m_response_buf.DumpToSingleSlice(&m_single_slice);
RELEASE_ASSERT(status.ok(), "Failed to deserialize response: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());

return m_response_buf;
}

io_blob GenericClientResponse::response_blob() {
response_buf(true /* need_contiguous */);
auto const size = uint32_cast(m_single_slice.size());
return size ? io_blob{m_single_slice.begin(), size, false /* is_aligned */} : io_blob{};
}

GenericRpcDataFutureBlob::GenericRpcDataFutureBlob(folly::Promise< Result< GenericClientResponse > >&& promise) :
Expand Down
129 changes: 102 additions & 27 deletions src/grpc/tests/unit/client_test.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <random>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <sisl/logging/logging.h>
Expand All @@ -12,7 +13,7 @@ namespace sisltesting {
using namespace sisl;
using namespace ::grpc;

static void SerializeToByteBuffer(ByteBuffer& buffer, std::string const& msg) {
[[maybe_unused]] static void SerializeToByteBuffer(ByteBuffer& buffer, std::string const& msg) {
buffer.Clear();
Slice slice(msg);
ByteBuffer tmp(&slice, 1);
Expand All @@ -30,39 +31,113 @@ static std::string DeserializeFromBuffer(ByteBuffer const& buffer) {
return buf;
}

TEST(GenericClientResponseTest, movability_test) {
std::string msg("Hello");
ByteBuffer buffer;
SerializeToByteBuffer(buffer, msg);
GenericClientResponse resp1(buffer);
auto& b1 = resp1.response_blob();
auto buf1 = resp1.response_buf();
EXPECT_EQ(msg, DeserializeFromBuffer(buf1));
EXPECT_EQ(msg, std::string(reinterpret_cast< const char* >(b1.cbytes()), b1.size()));
static std::random_device g_rd{};
static std::default_random_engine g_re{g_rd()};

static constexpr std::array< const char, 62 > alphanum{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K',
'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};

static std::string gen_random_string(size_t len) {
std::string str;
std::uniform_int_distribution< size_t > rand_char{0, alphanum.size() - 1};
for (size_t i{0}; i < len; ++i) {
str += alphanum[rand_char(g_re)];
}
str += '\0';
return str;
}

static std::pair< std::string, grpc::ByteBuffer > create_test_byte_buffer(uint32_t num_slices, uint64_t total_size) {
std::vector< grpc::Slice > slices;
std::string concat_str;

uint64_t size_per_slice = (total_size - 1) / num_slices + 1;
for (uint32_t i = 0; i < num_slices; i++) {
std::string msg = gen_random_string(size_per_slice);
concat_str += msg;
slices.push_back(grpc::Slice(msg));
}
return std::pair{concat_str, grpc::ByteBuffer{slices.data(), slices.size()}};
}

static std::string blob_to_string(io_blob const& b) { return std::string(c_charptr_cast(b.cbytes()), b.size()); }

static void do_test(std::string const& msg, grpc::ByteBuffer& bbuf) {
GenericClientResponse resp1(bbuf);
{
auto const& rbuf1 = resp1.response_buf();
EXPECT_EQ(msg, DeserializeFromBuffer(rbuf1));
EXPECT_EQ(msg, blob_to_string(resp1.response_blob()));
}

// move construction
GenericClientResponse resp2(std::move(resp1));
auto& b2 = resp2.response_blob();
EXPECT_EQ(msg, std::string(b2.bytes(), b2.bytes() + b2.size()));
EXPECT_TRUE(resp2.response_buf().Valid());
EXPECT_EQ(b1.size(), 0);
EXPECT_EQ(b1.bytes(), nullptr);
EXPECT_FALSE(resp1.response_buf().Valid());
EXPECT_EQ(resp1.response_blob().size(), 0);
EXPECT_EQ(resp1.response_blob().bytes(), nullptr);
{
EXPECT_EQ(msg, blob_to_string(resp2.response_blob()));
EXPECT_TRUE(resp2.response_buf().Valid());

EXPECT_FALSE(resp1.response_buf().Valid());
auto blb1 = resp1.response_blob();
EXPECT_EQ(blb1.size(), 0);
EXPECT_EQ(blb1.cbytes(), nullptr);
}

// move assignment
GenericClientResponse resp3 = std::move(resp2);
auto b3 = resp3.response_blob();
EXPECT_EQ(msg, std::string(b3.bytes(), b3.bytes() + b3.size()));
EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(b2.size(), 0);
EXPECT_EQ(b2.bytes(), nullptr);
EXPECT_FALSE(resp2.response_buf().Valid());
EXPECT_EQ(resp2.response_blob().size(), 0);
EXPECT_EQ(resp2.response_blob().bytes(), nullptr);
GenericClientResponse resp3;
resp3 = std::move(resp2);
{
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
EXPECT_TRUE(resp3.response_buf().Valid());

EXPECT_FALSE(resp2.response_buf().Valid());
auto blb2 = resp2.response_blob();
EXPECT_EQ(blb2.size(), 0);
EXPECT_EQ(blb2.cbytes(), nullptr);
}

// copy construction
{
GenericClientResponse resp4(resp3);
EXPECT_EQ(msg, blob_to_string(resp4.response_blob()));
EXPECT_TRUE(resp4.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}

// copy assignment
{
GenericClientResponse resp5;
resp5 = resp3;
EXPECT_EQ(msg, blob_to_string(resp5.response_blob()));
EXPECT_TRUE(resp5.response_buf().Valid());

EXPECT_TRUE(resp3.response_buf().Valid());
EXPECT_EQ(msg, blob_to_string(resp3.response_blob()));
}
}

TEST(GenericClientResponseTest, inline_single_slice_test) {
auto [msg, bbuf] = create_test_byte_buffer(1u, 128);
do_test(msg, bbuf);
}

TEST(GenericClientResponseTest, inline_multi_slice_test) {
auto [msg, bbuf] = create_test_byte_buffer(2u, 128);
do_test(msg, bbuf);
}

TEST(GenericClientResponseTest, refcounted_single_slice_test) {
auto [msg, bbuf] = create_test_byte_buffer(1u, 8192);
do_test(msg, bbuf);
}

TEST(GenericClientResponseTest, refcounted_multi_slice_test) {
auto [msg, bbuf] = create_test_byte_buffer(2u, 10000);
do_test(msg, bbuf);
}
} // namespace sisltesting

int main(int argc, char* argv[]) {
Expand Down

0 comments on commit c3b8844

Please sign in to comment.