Skip to content

Commit

Permalink
Add max_receive_msg_size parameters. (#230)
Browse files Browse the repository at this point in the history
max_receive_msg_size default to 4M, which might be too small
for some usecases, allow this to be reconfigurable.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen authored Mar 27, 2024
1 parent c3b8844 commit 370c772
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 25 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "12.0.1"
version = "12.1.1"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
Expand Down
6 changes: 4 additions & 2 deletions include/sisl/grpc/rpc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ class GrpcServer : private boost::noncopyable {
const std::string& ssl_cert);
GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr);
GrpcServer(const std::string& listen_addr, uint32_t threads, int max_receive_msg_size, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr);
virtual ~GrpcServer();

/**
* Create a new GrpcServer instance and initialize it.
*/
static GrpcServer* make(const std::string& listen_addr, uint32_t threads = 1, const std::string& ssl_key = "",
const std::string& ssl_cert = "");
const std::string& ssl_cert = "", int max_receive_msg_size = 0);
static GrpcServer* make(const std::string& listen_addr, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr,
uint32_t threads = 1, const std::string& ssl_key = "", const std::string& ssl_cert = "");
uint32_t threads = 1, const std::string& ssl_key = "", const std::string& ssl_cert = "", int max_receive_msg_size = 0);

void run(const rpc_thread_start_cb_t& thread_start_cb = nullptr);
void shutdown();
Expand Down
14 changes: 5 additions & 9 deletions src/grpc/rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,17 @@ GrpcBaseClient::GrpcBaseClient(const std::string& server_addr,

void GrpcBaseClient::init() {
::grpc::SslCredentialsOptions ssl_opts;
::grpc::ChannelArguments channel_args;
channel_args.SetMaxReceiveMessageSize(-1);
if (!m_ssl_cert.empty()) {
if (load_ssl_cert(m_ssl_cert, ssl_opts.pem_root_certs)) {
if (!m_target_domain.empty()) {
::grpc::ChannelArguments channel_args;
channel_args.SetSslTargetNameOverride(m_target_domain);
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts), channel_args);
} else {
m_channel = ::grpc::CreateChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts));
}

if (!m_target_domain.empty()) { channel_args.SetSslTargetNameOverride(m_target_domain); }
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::SslCredentials(ssl_opts), channel_args);
} else {
throw std::runtime_error("Unable to load ssl certification for grpc client");
}
} else {
m_channel = ::grpc::CreateChannel(m_server_addr, ::grpc::InsecureChannelCredentials());
m_channel = ::grpc::CreateCustomChannel(m_server_addr, ::grpc::InsecureChannelCredentials(), channel_args);
}
}

Expand Down
18 changes: 12 additions & 6 deletions src/grpc/rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ SISL_LOGGING_DEF(grpc_server)
namespace sisl {
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert) :
GrpcServer::GrpcServer(listen_addr, threads, ssl_key, ssl_cert, nullptr) {}

GrpcServer::GrpcServer(listen_addr, threads, 0, ssl_key, ssl_cert, nullptr) {}
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr) :
GrpcServer::GrpcServer(listen_addr, threads, 0, ssl_key, ssl_cert, auth_mgr) {}
GrpcServer::GrpcServer(const std::string& listen_addr, uint32_t threads, int max_receive_msg_size,
const std::string& ssl_key, const std::string& ssl_cert,
const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr) :
m_num_threads{threads}, m_auth_mgr{auth_mgr} {
if (listen_addr.empty() || threads == 0) { throw std::invalid_argument("Invalid parameter to start grpc server"); }

if (max_receive_msg_size != 0) { m_builder.SetMaxReceiveMessageSize(max_receive_msg_size); }

if (!ssl_cert.empty() && !ssl_key.empty()) {
std::string key_contents;
std::string cert_contents;
Expand Down Expand Up @@ -76,13 +81,14 @@ GrpcServer::~GrpcServer() {
}

GrpcServer* GrpcServer::make(const std::string& listen_addr, uint32_t threads, const std::string& ssl_key,
const std::string& ssl_cert) {
return GrpcServer::make(listen_addr, nullptr, threads, ssl_key, ssl_cert);
const std::string& ssl_cert, int max_receive_msg_size) {
return GrpcServer::make(listen_addr, nullptr, threads, ssl_key, ssl_cert, max_receive_msg_size);
}

GrpcServer* GrpcServer::make(const std::string& listen_addr, const std::shared_ptr< sisl::GrpcTokenVerifier >& auth_mgr,
uint32_t threads, const std::string& ssl_key, const std::string& ssl_cert) {
return new GrpcServer(listen_addr, threads, ssl_key, ssl_cert, auth_mgr);
uint32_t threads, const std::string& ssl_key, const std::string& ssl_cert,
int max_receive_msg_size) {
return new GrpcServer(listen_addr, threads, max_receive_msg_size, ssl_key, ssl_cert, auth_mgr);
}

void GrpcServer::run(const rpc_thread_start_cb_t& thread_start_cb) {
Expand Down
46 changes: 39 additions & 7 deletions src/grpc/tests/function/echo_async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <thread>
#include <mutex>
#include <random>

#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
Expand All @@ -32,6 +33,25 @@ using namespace sisl;
using namespace ::grpc_helper_test;
using namespace std::placeholders;

#define MAX_GRPC_RECV_SIZE 64 * 1024 * 1024

static constexpr std::array< const char, 62 > alphanum{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K',
'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};

static std::string gen_random_string(size_t len) {
std::string str;
static thread_local std::random_device rd{};
static thread_local std::default_random_engine re{rd()};
std::uniform_int_distribution< size_t > rand_char{0, alphanum.size() - 1};
for (size_t i{0}; i < len; ++i) {
str += alphanum[rand_char(re)];
}
str += '\0';
return str;
}

struct DataMessage {
int m_seqno;
std::string m_buf;
Expand Down Expand Up @@ -97,7 +117,7 @@ static void SerializeToBlob(sisl::io_blob_list_t& buffer, const DataMessage& msg
buffer.emplace_back(buf);
}

static const std::string GENERIC_CLIENT_MESSAGE{"I am a super client!"};
static const std::string GENERIC_CLIENT_MESSAGE{gen_random_string(MAX_GRPC_RECV_SIZE)};
static const std::string GENERIC_METHOD{"SendData"};

class TestClient {
Expand Down Expand Up @@ -236,8 +256,14 @@ class TestClient {
} else {
// divide all numbers not divisible by 2 and 3 into three equal buckets
static uint32_t j = 0u;
static int mess_size[] = {16, 64, 64 * 1024, 16 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024 - 1024};
static std::random_device rd;
static std::mt19937 gen(rd());
static std::uniform_int_distribution< int > distrib(0, sizeof(mess_size)/sizeof(mess_size[0]) -1);
if ((j++ % 4) == 0) {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
generic_stub->call_unary(
Expand All @@ -247,15 +273,19 @@ class TestClient {
},
1);
} else if (((j++ % 4) == 1)) {
DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_rpc with size {}", size);
DataMessage data_msg(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
generic_stub->call_rpc([data_msg](grpc::ByteBuffer& req) { SerializeToByteBuffer(req, data_msg); },
GENERIC_METHOD,
[data_msg, this](GenericClientRpcData& cd) {
validate_generic_reply(data_msg, cd.reply(), cd.status());
},
1);
} else if (((j++ % 4) == 2)) {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
grpc::ByteBuffer cli_buf;
SerializeToByteBuffer(cli_buf, req);
generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1)
Expand All @@ -266,8 +296,11 @@ class TestClient {
return folly::Unit();
})
.get();

} else {
DataMessage req(i, GENERIC_CLIENT_MESSAGE);
int size = mess_size[distrib(gen)];
LOGDEBUGMOD(grpc_server, "Testing call_unary with size {}", size);
DataMessage req(i, GENERIC_CLIENT_MESSAGE.substr(0, size));
sisl::io_blob_list_t cli_buf;
SerializeToBlob(cli_buf, req);
generic_stub->call_unary(cli_buf, GENERIC_METHOD, 1)
Expand Down Expand Up @@ -372,7 +405,6 @@ class TestServer {
static void set_response(BufT const& req, grpc::ByteBuffer& resp, bool set_buf) {
DataMessage cli_request;
DeserializeFromBuffer(req, cli_request);
RELEASE_ASSERT((cli_request.m_buf == GENERIC_CLIENT_MESSAGE), "Could not parse response buffer");
if (set_buf) { SerializeToByteBuffer(resp, cli_request); }
}

Expand Down Expand Up @@ -417,7 +449,7 @@ class TestServer {

void start(const std::string& server_address) {
LOGINFO("Start echo and ping server on {}...", server_address);
m_grpc_server = GrpcServer::make(server_address, 4, "", "");
m_grpc_server = GrpcServer::make(server_address, 4, "", "", MAX_GRPC_RECV_SIZE);
m_echo_impl = new EchoServiceImpl();
m_echo_impl->register_service(m_grpc_server);

Expand Down

0 comments on commit 370c772

Please sign in to comment.