Skip to content

Commit

Permalink
Fix for issue 416: Impossible to retain transfer ids (#420)
Browse files Browse the repository at this point in the history
- Introduced public `ITransferIdMap` and internal
`detail::ITransferIdStorage` interfaces.
- Presentation publisher and rpc client entities now...
- try initialize (at ctor) their transfer ids with the transfer id map
`getIdFor` method.
- try to store their next transfer ids into the map (using `setIdFor`).
- Extra unit tests and mocks to cover new transfer id map related
business logic.
  • Loading branch information
serges147 authored Jan 31, 2025
1 parent 06d654e commit d17f264
Show file tree
Hide file tree
Showing 17 changed files with 734 additions and 215 deletions.
2 changes: 1 addition & 1 deletion include/libcyphal/application/node/heartbeat_producer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class HeartbeatProducer final // NOSONAR cpp:S3624
void publishMessage(const TimePoint approx_now)
{
// Publishing of heartbeats makes sense only if the local node ID is known.
if (presentation_.transport().getLocalNodeId() == cetl::nullopt)
if (!presentation_.transport().getLocalNodeId().has_value())
{
return;
}
Expand Down
8 changes: 4 additions & 4 deletions include/libcyphal/presentation/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ class Client final : public detail::ClientBase
// For request (and the following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
auto& shared_client = getSharedClient();
const auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
Expand Down Expand Up @@ -325,8 +325,8 @@ class RawServiceClient final : public detail::ClientBase
// 1. For request (and following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
auto& shared_client = getSharedClient();
const auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
Expand Down
74 changes: 53 additions & 21 deletions include/libcyphal/presentation/client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "libcyphal/executor.hpp"
#include "libcyphal/transport/errors.hpp"
#include "libcyphal/transport/svc_sessions.hpp"
#include "libcyphal/transport/transfer_id_generators.hpp"
#include "libcyphal/transport/transfer_id_map.hpp"
#include "libcyphal/transport/types.hpp"
#include "libcyphal/types.hpp"

Expand All @@ -34,7 +34,9 @@ namespace presentation
namespace detail
{

class SharedClient : public common::cavl::Node<SharedClient>, public SharedObject
class SharedClient : public common::cavl::Node<SharedClient>,
public SharedObject,
protected transport::detail::ITransferIdStorage
{
public:
using Node::remove;
Expand Down Expand Up @@ -134,17 +136,24 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
, svc_request_tx_session_{std::move(svc_request_tx_session)}
, svc_response_rx_session_{std::move(svc_response_rx_session)}
, response_rx_params_{svc_response_rx_session_->getParams()}
, next_transfer_id_{0}
, nearest_deadline_{DistantFuture()}
{
CETL_DEBUG_ASSERT(svc_request_tx_session_ != nullptr, "");
CETL_DEBUG_ASSERT(svc_response_rx_session_ != nullptr, "");

if (const auto* const transfer_id_map = delegate.getTransferIdMap())
{
const SessionSpec session_spec{response_rx_params_.service_id, response_rx_params_.server_node_id};
next_transfer_id_ = transfer_id_map->getIdFor(session_spec);
}

// Override the default (2s) timeout value of the response session.
// This is done to allow multiple overlapping responses to be handled properly.
// Otherwise, the responses would be rejected (as "duplicates") if their transfer IDs are in order.
// Real duplicates (f.e. caused by redundant transports) won't cause any issues
// b/c shared RPC client expects/accepts only one response per transfer ID,
// and corresponding promise callback node will be removed after the first response.
// and the corresponding promise callback node will be removed after the first response.
svc_response_rx_session_->setTransferIdTimeout({});

svc_response_rx_session_->setOnReceiveCallback([this](const auto& arg) {
Expand Down Expand Up @@ -199,7 +208,7 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
{
if (timeout_node.isTimeoutLinked())
{
// Remove previous timeout node (if any),
// Remove the previous timeout node (if any),
// and then reinsert the node with updated/given new deadline time.
//
timeout_nodes_by_deadline_.remove(&timeout_node);
Expand Down Expand Up @@ -240,9 +249,27 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec

void destroy() noexcept override
{
if (auto* const transfer_id_map = delegate_.getTransferIdMap())
{
const SessionSpec session_spec{response_rx_params_.service_id, response_rx_params_.server_node_id};
transfer_id_map->setIdFor(session_spec, next_transfer_id_);
}

delegate_.forgetSharedClient(*this);
}

// MARK: ITransferIdStorage

transport::TransferId load() const noexcept override
{
return next_transfer_id_;
}

void save(const transport::TransferId transfer_id) noexcept override
{
next_transfer_id_ = transfer_id;
}

protected:
virtual void insertNewCallbackNode(CallbackNode& callback_node)
{
Expand Down Expand Up @@ -273,7 +300,8 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
}

private:
using Schedule = IExecutor::Callback::Schedule;
using Schedule = IExecutor::Callback::Schedule;
using SessionSpec = transport::ITransferIdMap::SessionSpec;

static constexpr TimePoint DistantFuture()
{
Expand Down Expand Up @@ -388,6 +416,7 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
const UniquePtr<transport::IRequestTxSession> svc_request_tx_session_;
const UniquePtr<transport::IResponseRxSession> svc_response_rx_session_;
const transport::ResponseRxParams response_rx_params_;
transport::TransferId next_transfer_id_;
common::cavl::Tree<CallbackNode> cb_nodes_by_transfer_id_;
TimePoint nearest_deadline_;
common::cavl::Tree<TimeoutNode> timeout_nodes_by_deadline_;
Expand All @@ -399,8 +428,8 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec

/// @brief Defines a shared client implementation that uses a generic transfer ID generator.
///
template <typename TransferIdGeneratorMixin>
class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
template <typename TransferIdGenerator>
class ClientImpl final : public SharedClient
{
public:
ClientImpl(IPresentationDelegate& delegate,
Expand All @@ -409,7 +438,7 @@ class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
UniquePtr<transport::IResponseRxSession> svc_response_rx_session,
const transport::TransferId transfer_id_modulo)
: SharedClient{delegate, executor, std::move(svc_request_tx_session), std::move(svc_response_rx_session)}
, TransferIdGeneratorMixin{transfer_id_modulo}
, transfer_id_generator_{transfer_id_modulo, *this}
{
}

Expand All @@ -424,40 +453,41 @@ class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
private:
using Base = SharedClient;

// MARK: SharedClient

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return TransferIdGeneratorMixin::nextTransferId();
}

void insertNewCallbackNode(CallbackNode& callback_node) override
{
SharedClient::insertNewCallbackNode(callback_node);
TransferIdGeneratorMixin::retainTransferId(callback_node.getTransferId());
transfer_id_generator_.retainTransferId(callback_node.getTransferId());
}

void removeCallbackNode(CallbackNode& callback_node) override
{
TransferIdGeneratorMixin::releaseTransferId(callback_node.getTransferId());
transfer_id_generator_.releaseTransferId(callback_node.getTransferId());
SharedClient::removeCallbackNode(callback_node);
}

// MARK: SharedClient

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return transfer_id_generator_.nextTransferId();
}

TransferIdGenerator transfer_id_generator_;

}; // ClientImpl<TransferIdGeneratorMixin>

/// @brief Defines a shared client specialization that uses a trivial transfer ID generator.
///
template <>
class ClientImpl<transport::detail::TrivialTransferIdGenerator> final
: public SharedClient,
private transport::detail::TrivialTransferIdGenerator
class ClientImpl<transport::detail::TrivialTransferIdGenerator> final : public SharedClient
{
public:
ClientImpl(IPresentationDelegate& delegate,
IExecutor& executor,
UniquePtr<transport::IRequestTxSession> svc_request_tx_session,
UniquePtr<transport::IResponseRxSession> svc_response_rx_session)
: Base{delegate, executor, std::move(svc_request_tx_session), std::move(svc_response_rx_session)}
, transfer_id_generator_{*this}
{
}

Expand All @@ -476,9 +506,11 @@ class ClientImpl<transport::detail::TrivialTransferIdGenerator> final

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return TrivialTransferIdGenerator::nextTransferId();
return transfer_id_generator_.nextTransferId();
}

transport::detail::TrivialTransferIdGenerator transfer_id_generator_;

}; // ClientImpl<TrivialTransferIdGenerator>

} // namespace detail
Expand Down
Loading

0 comments on commit d17f264

Please sign in to comment.