Skip to content

Commit

Permalink
sending now uses stack for serialization of small messages/request/re…
Browse files Browse the repository at this point in the history
…sponses, and PMR otherwise #verification #sonar #docs
  • Loading branch information
serges147 committed Dec 2, 2024
1 parent b333779 commit e146b6a
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 115 deletions.
4 changes: 2 additions & 2 deletions include/libcyphal/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
#define LIBCYPHAL_CONFIG_HPP_INCLUDED

#ifdef LIBCYPHAL_CONFIG
#include LIBCYPHAL_CONFIG
# include LIBCYPHAL_CONFIG
#else

#include <cstddef>
# include <cstddef>

namespace libcyphal
{
Expand Down
104 changes: 51 additions & 53 deletions include/libcyphal/presentation/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define LIBCYPHAL_PRESENTATION_CLIENT_HPP_INCLUDED

#include "client_impl.hpp"
#include "common_helpers.hpp"
#include "presentation_delegate.hpp"
#include "response_promise.hpp"

Expand All @@ -16,12 +17,9 @@

#include <cetl/cetl.hpp>
#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <nunavut/support/serialization.hpp>

#include <array>
#include <cstdint>
#include <type_traits>
#include <utility>

Expand Down Expand Up @@ -149,6 +147,11 @@ class ClientBase // NOSONAR cpp:S4963
shared_client_->retain();
}

cetl::pmr::memory_resource& memory() const noexcept
{
return shared_client_->memory();
}

SharedClient& getSharedClient() const noexcept
{
CETL_DEBUG_ASSERT(shared_client_ != nullptr, "");
Expand Down Expand Up @@ -195,6 +198,9 @@ class Client final : public detail::ClientBase

/// @brief Initiates a strong-typed request to the server, and returns a promise object to handle the response.
///
/// If `BufferSize` is less or equal to `config::presentation::SmallPayloadSize`,
/// the message will be serialized using a stack-allocated buffer; otherwise, PMR allocation will be used.
///
/// Issuing a new request involves the following steps:
/// 1. Serialize the request object to a raw payload buffer, which might fail with `nunavut::support::Error`.
/// 2. Allocation of the next transfer ID not in use, so that request and response can be paired. Depending on
Expand All @@ -208,64 +214,56 @@ class Client final : public detail::ClientBase
/// 4. Sending the raw request payload to the server, which might fail with a transport layer error.
/// If it does fail, then the response promise object will be destroyed, and the user will get the failure.
///
/// @param request_deadline The deadline for the request sending operation. Request will be dropped if not sent
/// before this deadline, which will inevitably timeout the response waiting deadline.
/// @tparam BufferSize The size of the buffer to serialize the request.
/// @param request_deadline The deadline for the request sending operation. The request will be dropped if not sent
/// before this deadline, which will inevitably time out the response waiting deadline.
/// @param request The request object to be serialized and then sent to the server.
/// @param response_deadline The deadline for the response receiving operation. If `nullopt` (or `{}`) then
/// `request_deadline` will be used for both request & response deadlines.
/// @return If request sending has succeeded then result will be a promise object to handle the response,
/// `request_deadline` will be used for both request and response deadlines.
/// @return If request sending has succeeded, then the result will be a promise object to handle the response,
/// which will be filled in the future with a received response. See `ResponsePromise` for details.
/// If request sending has failed then result will be a failure object, which will contain the reason.
/// If request sending has failed, then the result will be a failure object, which will contain the reason.
///
template <std::size_t BufferSize = Request::_traits_::SerializationBufferSizeBytes>
Expected<ResponsePromise<Response>, Failure> request(const TimePoint request_deadline,
const Request& request,
const cetl::optional<TimePoint> response_deadline = {}) const
{
using PayloadFragment = const cetl::span<const cetl::byte>;

// 1. Try to serialize the request to raw payload buffer.
//
// Next nolint b/c we use a buffer to serialize the message, so no need to zero it (and performance better).
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<std::uint8_t, Request::_traits_::SerializationBufferSizeBytes> buffer;
const auto buffer_size = serialize(request, buffer);
if (!buffer_size)
{
return buffer_size.error();
}
// Next nolint & NOSONAR are currently unavoidable.
// TODO: Eliminate `reinterpret_cast` when Nunavut supports `cetl::byte` at its `serialize`.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
PayloadFragment fragment{reinterpret_cast<cetl::byte*>(buffer.data()), // NOSONAR cpp:S3630
buffer_size.value()};
const std::array<PayloadFragment, 1> payload{fragment};

// 2. For request (and following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
}
const auto transfer_id = *opt_transfer_id;

// 3. Create and register a response promise object, which will be used to handle the response.
// Its done specifically before sending the request, so that we will be ready to handle a response
// immediately, even if it happens to be received in context (during) the request sending call.
//
ResponsePromise<Response> response_promise{&shared_client,
transfer_id,
response_deadline.value_or(request_deadline)};
//
const transport::TransferTxMetadata tx_metadata{{transfer_id, getPriority()}, request_deadline};
if (auto failure = shared_client.sendRequestPayload(tx_metadata, payload))
{
return libcyphal::detail::upcastVariant<Failure>(std::move(*failure));
}

return response_promise;
using Result = Expected<ResponsePromise<Response>, Failure>;
constexpr bool IsOnStack = BufferSize <= config::presentation::SmallPayloadSize;

return detail::tryPerformOnSerialized<Request, Result, BufferSize, IsOnStack>( //
request,
memory(),
[this, request_deadline, response_deadline](const auto serialized_fragments) -> Result {
//
// For request (and the following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
}
const auto transfer_id = *opt_transfer_id;

// Create and register a response promise object, which will be used to handle the response.
// Its done specifically before sending the request, so that we will be ready to handle a response
// immediately, even if it happens to be received in context (during) the request sending call.
//
ResponsePromise<Response> response_promise{&shared_client,
transfer_id,
response_deadline.value_or(request_deadline)};
//
const transport::TransferTxMetadata tx_metadata{{transfer_id, getPriority()}, request_deadline};
if (auto failure = shared_client.sendRequestPayload(tx_metadata, serialized_fragments))
{
return libcyphal::detail::upcastVariant<Failure>(std::move(*failure));
}

return response_promise;
});
}

private:
Expand Down
66 changes: 66 additions & 0 deletions include/libcyphal/presentation/common_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
#include "libcyphal/transport/scattered_buffer.hpp"

#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <nunavut/support/serialization.hpp>

#include <array>
#include <cstdint>
#include <memory>
#include <type_traits>

namespace libcyphal
{
Expand Down Expand Up @@ -73,6 +76,69 @@ static cetl::optional<DeserializationFailure> tryDeserializePayload(const transp
return result ? cetl::nullopt : cetl::optional<DeserializationFailure>(result.error());
}

template <typename Message, typename Result, std::size_t BufferSize, bool IsOnStack, typename Action>
static auto tryPerformOnSerialized(const Message& message,
cetl::pmr::memory_resource& memory,
Action&& action) -> std::enable_if_t<IsOnStack, Result>
{
// Not in use b/c we use stack buffer for small messages.
(void) memory;

// Try to serialize the message to raw payload buffer.
//
// Next nolint b/c we use a buffer to serialize the message, so no need to zero it (and performance better).
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<cetl::byte, BufferSize> buffer;
// TODO: Eliminate `reinterpret_cast` when Nunavut supports `cetl::byte` at its `serialize`.
const auto result_size = serialize(message,
// Next nolint & NOSONAR are currently unavoidable.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
{reinterpret_cast<std::uint8_t*>(buffer.data()), // NOSONAR cpp:S3630,
BufferSize});
if (!result_size)
{
return result_size.error();
}

const cetl::span<const cetl::byte> data_span{buffer.data(), result_size.value()};
const std::array<const cetl::span<const cetl::byte>, 1> fragments{data_span};

return std::forward<Action>(action)(fragments);
}

template <typename Message, typename Result, std::size_t BufferSize, bool IsOnStack, typename Action>
static auto tryPerformOnSerialized(const Message& message,
cetl::pmr::memory_resource& memory,
Action&& action) -> std::enable_if_t<!IsOnStack, Result>
{
const std::unique_ptr<cetl::byte[], PmrRawBytesDeleter> buffer //
{static_cast<cetl::byte*>(memory.allocate(BufferSize)), // NOSONAR cpp:S5356 cpp:S5357
{BufferSize, &memory}};
if (!buffer)
{
return MemoryError{};
}

// Try to serialize the message to raw payload buffer.
//
// Next nolint b/c we use a buffer to serialize the message, so no need to zero it (and performance better).
// TODO: Eliminate `reinterpret_cast` when Nunavut supports `cetl::byte` at its `serialize`.
const auto result_size = serialize(message,
// Next nolint & NOSONAR are currently unavoidable.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
{reinterpret_cast<std::uint8_t*>(buffer.get()), // NOSONAR cpp:S3630,
BufferSize});
if (!result_size)
{
return result_size.error();
}

const cetl::span<const cetl::byte> data_span{buffer.get(), result_size.value()};
const std::array<const cetl::span<const cetl::byte>, 1> fragments{data_span};

return std::forward<Action>(action)(fragments);
}

} // namespace detail
} // namespace presentation
} // namespace libcyphal
Expand Down
59 changes: 29 additions & 30 deletions include/libcyphal/presentation/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef LIBCYPHAL_PRESENTATION_PUBLISHER_HPP_INCLUDED
#define LIBCYPHAL_PRESENTATION_PUBLISHER_HPP_INCLUDED

#include "common_helpers.hpp"
#include "publisher_impl.hpp"

#include "libcyphal/transport/errors.hpp"
Expand All @@ -14,12 +15,9 @@

#include <cetl/cetl.hpp>
#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <nunavut/support/serialization.hpp>

#include <array>
#include <cstdint>
#include <utility>

namespace libcyphal
Expand Down Expand Up @@ -123,6 +121,11 @@ class PublisherBase // NOSONAR cpp:S4963
impl_->retain();
}

cetl::pmr::memory_resource& memory() const noexcept
{
return impl_->memory();
}

cetl::optional<Failure> publishRawData(const TimePoint deadline,
const transport::PayloadFragments payload_fragments) const
{
Expand All @@ -149,9 +152,9 @@ class PublisherBase // NOSONAR cpp:S4963
/// Otherwise, see below requirements for the `Message` type, as well as consult with
/// Nunavut's generated code (f.e. for the signatures of expected `serialize` function).
///
/// @tparam Message The message type of the publisher. This type has the following requirements:
/// - contains `_traits_::SerializationBufferSizeBytes` constant
/// - has freestanding `serialize` function under its namespace (so that ADL will find it)
/// @tparam Message_ The message type of the publisher. This type has the following requirements:
/// - contains `_traits_::SerializationBufferSizeBytes` constant
/// - has freestanding `serialize` function under its namespace (so that ADL will find it)
///
template <typename Message_>
class Publisher final : public detail::PublisherBase
Expand All @@ -161,7 +164,7 @@ class Publisher final : public detail::PublisherBase
///
using Message = Message_;

/// @brief Defines failure type for a strong-typed publisher operations.
/// @brief Defines a failure type for a strong-typed publisher operations.
///
/// The set of possible failures includes transport layer failures (inherited from the base publisher),
/// as well as serialization-related ones.
Expand All @@ -170,34 +173,30 @@ class Publisher final : public detail::PublisherBase

/// Publishes the message on libcyphal network.
///
/// If `BufferSize` is less or equal to `config::presentation::SmallPayloadSize`,
/// the message will be serialized using a stack-allocated buffer; otherwise, PMR allocation will be used.
///
/// @tparam BufferSize The size of the buffer to serialize the message.
/// @param deadline The latest time to send the message. Will be dropped if exceeded.
/// @param message The message to serialize and then send.
///
template <std::size_t BufferSize = Message::_traits_::SerializationBufferSizeBytes>
cetl::optional<Failure> publish(const TimePoint deadline, const Message& message) const
{
// Try to serialize the message to raw payload buffer.
//
// Next nolint b/c we use a buffer to serialize the message, so no need to zero it (and performance better).
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init)
std::array<std::uint8_t, Message::_traits_::SerializationBufferSizeBytes> buffer;
const auto result_size = serialize(message, buffer);
if (!result_size)
{
return result_size.error();
}

// Next nolint & NOSONAR are currently unavoidable.
// TODO: Eliminate `reinterpret_cast` when Nunavut supports `cetl::byte` at its `serialize`.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
const cetl::span<const cetl::byte> data{reinterpret_cast<cetl::byte*>(buffer.data()), // NOSONAR cpp:S3630
result_size.value()};
const std::array<const cetl::span<const cetl::byte>, 1> payload_fragments{data};
if (auto failure = publishRawData(deadline, payload_fragments))
{
return libcyphal::detail::upcastVariant<Failure>(std::move(*failure));
}

return cetl::nullopt;
using Result = cetl::optional<Failure>;
constexpr bool IsOnStack = BufferSize <= config::presentation::SmallPayloadSize;

return detail::tryPerformOnSerialized<Message, Result, BufferSize, IsOnStack>( //
message,
memory(),
[this, deadline](const auto serialized_fragments) -> Result {
//
if (auto failure = publishRawData(deadline, serialized_fragments))
{
return libcyphal::detail::upcastVariant<Failure>(std::move(*failure));
}
return cetl::nullopt;
});
}

private:
Expand Down
6 changes: 5 additions & 1 deletion include/libcyphal/presentation/publisher_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pf20/cetlpf.hpp>

#include <array>
#include <cstdint>
#include <utility>

Expand Down Expand Up @@ -48,6 +47,11 @@ class PublisherImpl final : public cavl::Node<PublisherImpl>, public SharedObjec
CETL_DEBUG_ASSERT(msg_tx_session_ != nullptr, "");
}

CETL_NODISCARD cetl::pmr::memory_resource& memory() const noexcept
{
return delegate_.memory();
}

CETL_NODISCARD std::int32_t compareBySubjectId(const transport::PortId subject_id) const
{
return static_cast<std::int32_t>(subject_id_) - static_cast<std::int32_t>(subject_id);
Expand Down
Loading

0 comments on commit e146b6a

Please sign in to comment.