Generic client blob (#216)
* Add a unary call that accepts an io_blob_list and returns a future based on io_blob (a usage sketch follows the file summary below)

* Split the generic service into header and cpp files so it can use utils.hpp; add a test case for the sisl::io_blob version of the gRPC client future

* Make GenericClientResponse a move-only class and avoid unique_ptr

* Add a unit test for client response movability

* Bump the conan version

* Remove the linking to sisl in the client test

* Add error logging when deserializing the gRPC buffer fails

---------

Co-authored-by: Ravi Nagarjun Akella <raakella1@$HOSTNAME>
raakella1 and Ravi Nagarjun Akella authored Feb 9, 2024
1 parent 4bc0ea8 commit 6bf081f
Showing 12 changed files with 464 additions and 133 deletions.
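
For orientation, here is a minimal caller-side sketch (not part of this commit) of the futures-based generic unary call added to rpc_client.hpp below. The nested stub type name (GenericAsyncStub), the method path, and the deadline are assumptions; the sketch also treats AsyncResult< T > as a folly future over an Expected-style Result< T > (hasError()/value()):

#include <sisl/grpc/rpc_client.hpp>

// `stub` is assumed to be the generic async stub owned by a GrpcAsyncClient instance;
// "/EchoService/Echo" and the 2-second deadline are placeholders.
void send_blob(sisl::GrpcAsyncClient::GenericAsyncStub& stub, sisl::io_blob_list_t const& request) {
    // New in this commit: unary call taking an io_blob_list and yielding a GenericClientResponse.
    auto result = stub.call_unary(request, "/EchoService/Echo", 2 /* deadline, seconds */).get();
    if (result.hasError()) { return; }                        // grpc status available via result.error()
    auto response = std::move(result.value());                // GenericClientResponse is move-only
    sisl::io_blob const& payload = response.response_blob();  // io_blob view of the reply bytes
    // consume payload.cbytes() / payload.size() here
}
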
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "11.1.5"
version = "11.1.6"

homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
124 changes: 30 additions & 94 deletions include/sisl/grpc/generic_service.hpp
@@ -28,8 +28,7 @@ namespace sisl {

class GenericRpcStaticInfo : public RpcStaticInfoBase {
public:
GenericRpcStaticInfo(GrpcServer* server, grpc::AsyncGenericService* service) :
m_server{server}, m_generic_service{service} {}
GenericRpcStaticInfo(GrpcServer* server, grpc::AsyncGenericService* service);

GrpcServer* m_server;
grpc::AsyncGenericService* m_generic_service;
@@ -43,58 +42,31 @@ using generic_rpc_ctx_ptr = std::unique_ptr< GenericRpcContextBase >;

class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcData > {
public:
static RpcDataAbstract* make(GenericRpcStaticInfo* rpc_info, size_t queue_idx) {
return new GenericRpcData(rpc_info, queue_idx);
}
static RpcDataAbstract* make(GenericRpcStaticInfo* rpc_info, size_t queue_idx);

RpcDataAbstract* create_new() override { return new GenericRpcData(m_rpc_info, m_queue_idx); }
void set_status(grpc::Status& status) { m_retstatus = status; }
RpcDataAbstract* create_new() override;
void set_status(grpc::Status& status);

~GenericRpcData() override {
if (m_request_blob_allocated) { m_request_blob.buf_free(); }
}
~GenericRpcData() override;

// There is only one generic static rpc data for all rpcs.
size_t get_rpc_idx() const override { return 0; }

const grpc::ByteBuffer& request() const { return m_request; }
sisl::io_blob& request_blob() {
if (m_request_blob.cbytes() == nullptr) {
grpc::Slice slice;
auto status = m_request.TrySingleSlice(&slice);
if (status.ok()) {
m_request_blob.set_bytes(slice.begin());
m_request_blob.set_size(slice.size());
} else if (status.error_code() == grpc::StatusCode::FAILED_PRECONDITION) {
// If the ByteBuffer is not made up of single slice, TrySingleSlice() will fail.
// DumpSingleSlice() should work in those cases but will incur a copy.
if (status = m_request.DumpToSingleSlice(&slice); status.ok()) {
m_request_blob.buf_alloc(slice.size());
m_request_blob_allocated = true;
std::memcpy(voidptr_cast(m_request_blob.bytes()), c_voidptr_cast(slice.begin()),
slice.size());
}
}
}
return m_request_blob;
}

grpc::ByteBuffer& response() { return m_response; }

void enqueue_call_request(::grpc::ServerCompletionQueue& cq) override {
m_rpc_info->m_generic_service->RequestCall(&m_ctx, &m_stream, &cq, &cq,
static_cast< void* >(m_request_received_tag.ref()));
}

void send_response() { m_stream.Write(m_response, static_cast< void* >(m_buf_write_tag.ref())); }

void set_context(generic_rpc_ctx_ptr ctx) { m_rpc_context = std::move(ctx); }
GenericRpcContextBase* get_context() { return m_rpc_context.get(); }

void set_comp_cb(generic_rpc_completed_cb_t const& comp_cb) { m_comp_cb = comp_cb; }

GenericRpcData(GenericRpcStaticInfo* rpc_info, size_t queue_idx) :
RpcDataAbstract{queue_idx}, m_rpc_info{rpc_info}, m_stream(&m_ctx) {}
size_t get_rpc_idx() const override;

const grpc::ByteBuffer& request() const;
sisl::io_blob& request_blob();

grpc::ByteBuffer& response();

void enqueue_call_request(::grpc::ServerCompletionQueue& cq) override;

void send_response();

void set_context(generic_rpc_ctx_ptr ctx);
GenericRpcContextBase* get_context();

void set_comp_cb(generic_rpc_completed_cb_t const& comp_cb);

GenericRpcData(GenericRpcStaticInfo* rpc_info, size_t queue_idx);

private:
GenericRpcStaticInfo* m_rpc_info;
@@ -111,54 +83,18 @@ class GenericRpcData : public RpcDataAbstract, sisl::ObjLifeCounter< GenericRpcD
generic_rpc_completed_cb_t m_comp_cb{nullptr};

private:
bool do_authorization() {
m_retstatus = RPCHelper::do_authorization(m_rpc_info->m_server, &m_ctx);
return m_retstatus.error_code() == grpc::StatusCode::OK;
}

RpcDataAbstract* on_request_received(bool ok) {
bool in_shutdown = RPCHelper::has_server_shutdown(m_rpc_info->m_server);

if (ok) {
if (!do_authorization()) {
m_stream.Finish(m_retstatus, static_cast< void* >(m_request_completed_tag.ref()));
} else {
m_stream.Read(&m_request, static_cast< void* >(m_buf_read_tag.ref()));
}
}

return in_shutdown ? nullptr : create_new();
}

RpcDataAbstract* on_buf_read(bool) {
auto this_rpc_data = boost::intrusive_ptr< GenericRpcData >{this};
// take a ref before the handler cb is called.
// unref is called in send_response which is handled by us (in case of sync calls)
// or by the handler (for async calls)
ref();
if (RPCHelper::run_generic_handler_cb(m_rpc_info->m_server, m_ctx.method(), this_rpc_data)) { send_response(); }
return nullptr;
}

RpcDataAbstract* on_buf_write(bool) {
m_stream.Finish(m_retstatus, static_cast< void* >(m_request_completed_tag.ref()));
unref();
return nullptr;
}

RpcDataAbstract* on_request_completed(bool) {
auto this_rpc_data = boost::intrusive_ptr< GenericRpcData >{this};
if (m_comp_cb) { m_comp_cb(this_rpc_data); }
return nullptr;
}
bool do_authorization();

RpcDataAbstract* on_request_received(bool ok);
RpcDataAbstract* on_buf_read(bool);
RpcDataAbstract* on_buf_write(bool);
RpcDataAbstract* on_request_completed(bool);

struct RpcTagImpl : public RpcTag {
using callback_type = RpcDataAbstract* (GenericRpcData::*)(bool);
RpcTagImpl(GenericRpcData* rpc, callback_type cb) : RpcTag{rpc}, m_callback{cb} {}
RpcTagImpl(GenericRpcData* rpc, callback_type cb);

RpcDataAbstract* do_process(bool ok) override {
return (static_cast< GenericRpcData* >(m_rpc_data)->*m_callback)(ok);
}
RpcDataAbstract* do_process(bool ok) override;

callback_type m_callback;
};
48 changes: 43 additions & 5 deletions include/sisl/grpc/rpc_client.hpp
@@ -33,6 +33,7 @@
#include <sisl/utility/obj_life_counter.hpp>
#include <sisl/utility/enum.hpp>
#include <sisl/auth_manager/token_client.hpp>
#include <sisl/fds/buffer.hpp>

#include <fmt/format.h>

@@ -81,8 +82,6 @@ using generic_req_builder_cb_t = req_builder_cb_t< grpc::ByteBuffer >;
using generic_unary_callback_t = unary_callback_t< grpc::ByteBuffer >;
using GenericClientRpcDataCallback = ClientRpcDataCallback< grpc::ByteBuffer, grpc::ByteBuffer >;
using GenericClientRpcDataFuture = ClientRpcDataFuture< grpc::ByteBuffer, grpc::ByteBuffer >;
using generic_result_t = Result< grpc::ByteBuffer >;
using generic_async_result_t = AsyncResult< grpc::ByteBuffer >;

/**
* The specialized 'ClientRpcDataInternal' per gRPC call,
@@ -164,6 +163,41 @@ class ClientRpcDataFuture : public ClientRpcDataInternal< ReqT, RespT > {
folly::Promise< Result< RespT > > m_promise;
};

class GenericClientResponse {
public:
GenericClientResponse() = default;
GenericClientResponse(grpc::ByteBuffer const& buf);

GenericClientResponse(GenericClientResponse&& other);
GenericClientResponse& operator=(GenericClientResponse&& other);
GenericClientResponse(GenericClientResponse const& other) = delete;
GenericClientResponse& operator=(GenericClientResponse const& other) = delete;
~GenericClientResponse();

io_blob& response_blob();
grpc::ByteBuffer response_buf();

private:
grpc::ByteBuffer m_response_buf;
io_blob m_response_blob;
bool m_response_blob_allocated{false};
};
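
// Illustration (not part of the diff): GenericClientResponse is move-only, so ownership of the
// underlying ByteBuffer is transferred rather than shared. Assuming only the interface declared
// above, with `buf` a placeholder grpc::ByteBuffer:
//
//     GenericClientResponse r1{buf};
//     GenericClientResponse r2{std::move(r1)};   // fine: move constructor transfers the buffer
//     // GenericClientResponse r3{r2};           // does not compile: copy constructor is deleted
//     io_blob& blob = r2.response_blob();        // io_blob view of the response payload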

/**
 * Futures version of ClientRpcDataInternal.
 * This class holds the promise end of the future returned to the caller and fulfils it with a
 * GenericClientResponse; the sisl::io_blob view of the response can then be accessed via that
 * object's response_blob() method.
*/
class GenericRpcDataFutureBlob : public ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer > {
public:
GenericRpcDataFutureBlob(folly::Promise< Result< GenericClientResponse > >&& promise);
virtual void handle_response([[maybe_unused]] bool ok = true) override;

private:
folly::Promise< Result< GenericClientResponse > > m_promise;
};

template < typename ReqT, typename RespT >
class ClientRpcData : public ClientRpcDataInternal< ReqT, RespT > {
public:
@@ -418,7 +452,8 @@ class GrpcAsyncClient : public GrpcBaseClient {
m_generic_stub(std::move(stub)), m_worker(worker), m_token_client(token_client) {}

void prepare_and_send_unary_generic(ClientRpcDataInternal< grpc::ByteBuffer, grpc::ByteBuffer >* data,
const grpc::ByteBuffer& request, const std::string& method, uint32_t deadline);
const grpc::ByteBuffer& request, const std::string& method,
uint32_t deadline);

void call_unary(const grpc::ByteBuffer& request, const std::string& method,
const generic_unary_callback_t& callback, uint32_t deadline);
@@ -427,8 +462,11 @@
const generic_rpc_comp_cb_t& done_cb, uint32_t deadline);

// futures version of call_unary
generic_async_result_t call_unary(const grpc::ByteBuffer& request, const std::string& method,
uint32_t deadline);
AsyncResult< grpc::ByteBuffer > call_unary(const grpc::ByteBuffer& request, const std::string& method,
uint32_t deadline);

AsyncResult< GenericClientResponse > call_unary(const io_blob_list_t& request, const std::string& method,
uint32_t deadline);

std::unique_ptr< grpc::GenericStub > m_generic_stub;
GrpcAsyncClientWorker* m_worker;
5 changes: 3 additions & 2 deletions src/grpc/CMakeLists.txt
@@ -4,8 +4,9 @@ include_directories(BEFORE ${CMAKE_CURRENT_BINARY_DIR}/../auth_manager)

add_library(sisl_grpc)
target_sources(sisl_grpc PRIVATE
rpc_server.cpp
rpc_client.cpp
rpc_server.cpp
rpc_client.cpp
generic_service.cpp
)
target_link_libraries(sisl_grpc PUBLIC
sisl_buffer
126 changes: 126 additions & 0 deletions src/grpc/generic_service.cpp
@@ -0,0 +1,126 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/

#include "sisl/grpc/generic_service.hpp"
#include "utils.hpp"

SISL_LOGGING_DECL(grpc_server)

namespace sisl {

GenericRpcStaticInfo::GenericRpcStaticInfo(GrpcServer* server, grpc::AsyncGenericService* service) :
m_server{server}, m_generic_service{service} {}

RpcDataAbstract* GenericRpcData::make(GenericRpcStaticInfo* rpc_info, size_t queue_idx) {
return new GenericRpcData(rpc_info, queue_idx);
}

RpcDataAbstract* GenericRpcData::create_new() { return new GenericRpcData(m_rpc_info, m_queue_idx); }

void GenericRpcData::set_status(grpc::Status& status) { m_retstatus = status; }

GenericRpcData::~GenericRpcData() {
if (m_request_blob_allocated) { m_request_blob.buf_free(); }
}

size_t GenericRpcData::get_rpc_idx() const { return 0; }

const grpc::ByteBuffer& GenericRpcData::request() const { return m_request; }

sisl::io_blob& GenericRpcData::request_blob() {
if (m_request_blob.cbytes() == nullptr) {
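        // Assumption based on the inline logic this replaces in generic_service.hpp: the "try"
        // helper attempts a zero-copy single-slice view and reports FAILED_PRECONDITION when the
        // ByteBuffer spans multiple slices; the fallback helper then copies the data into a newly
        // allocated blob, tracked via m_request_blob_allocated and freed in ~GenericRpcData().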
if (auto status = try_deserialize_from_byte_buffer(m_request, m_request_blob);
status.error_code() == grpc::StatusCode::FAILED_PRECONDITION) {
if (status = deserialize_from_byte_buffer(m_request, m_request_blob); status.ok()) {
m_request_blob_allocated = true;
} else {
LOGERRORMOD(grpc_server, "Failed to deserialize request: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
} else if (!status.ok()) {
LOGERRORMOD(grpc_server, "Failed to try deserialize request: code: {}. msg: {}",
static_cast< int >(status.error_code()), status.error_message());
}
}
return m_request_blob;
}

grpc::ByteBuffer& GenericRpcData::response() { return m_response; }

void GenericRpcData::enqueue_call_request(::grpc::ServerCompletionQueue& cq) {
m_rpc_info->m_generic_service->RequestCall(&m_ctx, &m_stream, &cq, &cq,
static_cast< void* >(m_request_received_tag.ref()));
}

void GenericRpcData::send_response() { m_stream.Write(m_response, static_cast< void* >(m_buf_write_tag.ref())); }

void GenericRpcData::set_context(generic_rpc_ctx_ptr ctx) { m_rpc_context = std::move(ctx); }

GenericRpcContextBase* GenericRpcData::get_context() { return m_rpc_context.get(); }

void GenericRpcData::set_comp_cb(generic_rpc_completed_cb_t const& comp_cb) { m_comp_cb = comp_cb; }

GenericRpcData::GenericRpcData(GenericRpcStaticInfo* rpc_info, size_t queue_idx) :
RpcDataAbstract{queue_idx}, m_rpc_info{rpc_info}, m_stream(&m_ctx) {}

bool GenericRpcData::do_authorization() {
m_retstatus = RPCHelper::do_authorization(m_rpc_info->m_server, &m_ctx);
return m_retstatus.error_code() == grpc::StatusCode::OK;
}

RpcDataAbstract* GenericRpcData::on_request_received(bool ok) {
bool in_shutdown = RPCHelper::has_server_shutdown(m_rpc_info->m_server);

if (ok) {
if (!do_authorization()) {
m_stream.Finish(m_retstatus, static_cast< void* >(m_request_completed_tag.ref()));
} else {
m_stream.Read(&m_request, static_cast< void* >(m_buf_read_tag.ref()));
}
}

return in_shutdown ? nullptr : create_new();
}

RpcDataAbstract* GenericRpcData::on_buf_read(bool) {
auto this_rpc_data = boost::intrusive_ptr< GenericRpcData >{this};
// take a ref before the handler cb is called.
// unref is called in send_response which is handled by us (in case of sync calls)
// or by the handler (for async calls)
ref();
if (RPCHelper::run_generic_handler_cb(m_rpc_info->m_server, m_ctx.method(), this_rpc_data)) { send_response(); }
return nullptr;
}

RpcDataAbstract* GenericRpcData::on_buf_write(bool) {
m_stream.Finish(m_retstatus, static_cast< void* >(m_request_completed_tag.ref()));
unref();
return nullptr;
}

RpcDataAbstract* GenericRpcData::on_request_completed(bool) {
auto this_rpc_data = boost::intrusive_ptr< GenericRpcData >{this};
if (m_comp_cb) { m_comp_cb(this_rpc_data); }
return nullptr;
}
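
// Completion-queue tag plumbing: each RpcTagImpl stores the owning rpc data plus the member
// callback to run, and do_process() simply forwards the completion-queue result ("ok") to that
// callback (one of on_request_received / on_buf_read / on_buf_write / on_request_completed above).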

using callback_type = RpcDataAbstract* (GenericRpcData::*)(bool);
GenericRpcData::RpcTagImpl::RpcTagImpl(GenericRpcData* rpc, callback_type cb) : RpcTag{rpc}, m_callback{cb} {}

RpcDataAbstract* GenericRpcData::RpcTagImpl::do_process(bool ok) {
return (static_cast< GenericRpcData* >(m_rpc_data)->*m_callback)(ok);
}

} // namespace sisl
