Skip to content

Commit

Permalink
Single copy TX pipeline (#405)
Browse files Browse the repository at this point in the history
- Introduced `libcyphal::transport::MediaPayload` - in use to pass
payload data between the transport layer and its media.
- Use lates latest Canard v4 api
  • Loading branch information
serges147 authored Nov 29, 2024
1 parent edded3f commit 5856066
Show file tree
Hide file tree
Showing 14 changed files with 422 additions and 59 deletions.
11 changes: 9 additions & 2 deletions docs/examples/platform/linux/can/can_media.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <libcyphal/executor.hpp>
#include <libcyphal/transport/can/media.hpp>
#include <libcyphal/transport/errors.hpp>
#include <libcyphal/transport/media_payload.hpp>
#include <libcyphal/types.hpp>

#include <algorithm>
Expand Down Expand Up @@ -219,16 +220,22 @@ class CanMedia final : public libcyphal::transport::can::IMedia

PushResult::Type push(const libcyphal::TimePoint /* deadline */,
const libcyphal::transport::can::CanId can_id,
const cetl::span<const cetl::byte> payload) noexcept override
libcyphal::transport::MediaPayload& payload) noexcept override
{
const CanardFrame canard_frame{can_id, {payload.size(), payload.data()}};
const CanardFrame canard_frame{can_id, {payload.getSpan().size(), payload.getSpan().data()}};
const std::int16_t result = ::socketcanPush(socket_can_tx_fd_, &canard_frame, 0);
if (result < 0)
{
return libcyphal::transport::PlatformError{posix::PosixPlatformError{-result}};
}

const bool is_accepted = result > 0;
if (is_accepted)
{
// Payload is not needed anymore, so return memory asap.
payload.reset();
}

return PushResult::Success{is_accepted};
}

Expand Down
1 change: 1 addition & 0 deletions docs/examples/platform/linux/can/socketcan.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# define _GNU_SOURCE
#endif

#include "canard.h"
#include "socketcan.h"

#ifdef __linux__
Expand Down
58 changes: 36 additions & 22 deletions include/libcyphal/transport/can/can_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
{
using LizardHelpers = libcyphal::transport::detail::LizardHelpers;

return LizardHelpers::makeMemoryResource<CanardMemoryResource>(media_interface.getTxMemoryResource());
// TX memory resource is used for raw bytes block allocations only.
// So it has no alignment requirements.
constexpr std::size_t Alignment = 1;

return LizardHelpers::makeMemoryResource<CanardMemoryResource, Alignment>(
media_interface.getTxMemoryResource());
}

const std::uint8_t index_;
Expand Down Expand Up @@ -182,7 +187,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //

for (Media& media : media_array_)
{
flushCanardTxQueue(media.canard_tx_queue());
flushCanardTxQueue(media.canard_tx_queue(), canard_instance());
}

CETL_DEBUG_ASSERT(total_msg_rx_ports_ == 0, //
Expand Down Expand Up @@ -514,16 +519,12 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
return media_array;
}

static void flushCanardTxQueue(CanardTxQueue& canard_tx_queue)
static void flushCanardTxQueue(CanardTxQueue& canard_tx_queue, const CanardInstance& canard_instance)
{
while (const CanardTxQueueItem* const maybe_item = ::canardTxPeek(&canard_tx_queue))
{
CanardTxQueueItem* const item = ::canardTxPop(&canard_tx_queue, maybe_item);

// No Sonar `cpp:S5356` b/c we need to free tx item allocated by libcanard as a raw memory.
canard_tx_queue.memory.deallocate(canard_tx_queue.memory.user_reference,
item->allocated_size,
item); // NOSONAR cpp:S5356
::canardTxFree(&canard_tx_queue, &canard_instance, item);
}
}

Expand Down Expand Up @@ -578,18 +579,20 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
///
void pushNextFrameToMedia(Media& media)
{
using PayloadFragment = cetl::span<const cetl::byte>;

TimePoint tx_deadline;
while (const CanardTxQueueItem* const tx_item = peekFirstValidTxItem(media.canard_tx_queue(), tx_deadline))
while (CanardTxQueueItem* const tx_item = peekFirstValidTxItem(media.canard_tx_queue(), tx_deadline))
{
// Move the payload from the frame to the media payload - `media.push` might take ownership of it.
// No Sonar `cpp:S5356` and `cpp:S5357` b/c we integrate here with C libcanard API.
const auto* const buffer =
static_cast<const cetl::byte*>(tx_item->frame.payload.data); // NOSONAR cpp:S5356 cpp:S5357
const PayloadFragment payload{buffer, tx_item->frame.payload.size};
//
auto& frame_payload = tx_item->frame.payload;
MediaPayload payload{frame_payload.size,
static_cast<cetl::byte*>(frame_payload.data), // NOSONAR cpp:S5356 cpp:S5357
frame_payload.allocated_size,
&media.interface().getTxMemoryResource()};
frame_payload = {0, nullptr, 0};

IMedia::PushResult::Type push_result =
media.interface().push(tx_deadline, tx_item->frame.extended_can_id, payload);
auto push_result = media.interface().push(tx_deadline, tx_item->frame.extended_can_id, payload);

// In case of media push error, we are going to drop this problematic frame
// (b/c it looks like media can't handle this frame),
Expand All @@ -603,7 +606,19 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
const auto push = cetl::get<IMedia::PushResult::Success>(push_result);
if (push.is_accepted)
{
popAndFreeCanardTxQueueItem(media.canard_tx_queue(), tx_item, false /* single frame */);
popAndFreeCanardTxQueueItem(media.canard_tx_queue(),
canard_instance(),
tx_item,
false /* single frame */);
}
else
{
// Media has not accepted the frame, so we need return original payload back to the item,
// so that in the future potential retry could try to push it again.
const auto org_payload = payload.release();
frame_payload.size = std::get<0>(org_payload);
frame_payload.data = std::get<1>(org_payload);
frame_payload.allocated_size = std::get<2>(org_payload);
}

// If needed schedule (recursively!) next frame to push.
Expand All @@ -622,7 +637,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
// Release whole problematic transfer from the TX queue,
// so that other transfers in TX queue have their chance.
// Otherwise, we would be stuck in an execution loop trying to send the same frame.
popAndFreeCanardTxQueueItem(media.canard_tx_queue(), tx_item, true /* whole transfer */);
popAndFreeCanardTxQueueItem(media.canard_tx_queue(), canard_instance(), tx_item, true /* whole transfer */);

using Report = TransientErrorReport::MediaPush;
tryHandleTransientMediaFailure<Report>(media, std::move(*push_failure));
Expand All @@ -638,12 +653,11 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
/// While searching, any of already expired TX items are pop from the queue and freed (aka dropped).
/// If there is no still valid TX items in the queue, returns `nullptr`.
///
CETL_NODISCARD const CanardTxQueueItem* peekFirstValidTxItem(CanardTxQueue& canard_tx,
TimePoint& out_deadline) const
CETL_NODISCARD CanardTxQueueItem* peekFirstValidTxItem(CanardTxQueue& canard_tx, TimePoint& out_deadline) const
{
const TimePoint now = executor_.now();

while (const CanardTxQueueItem* const tx_item = ::canardTxPeek(&canard_tx))
while (CanardTxQueueItem* const tx_item = ::canardTxPeek(&canard_tx))
{
// We are dropping any TX item that has expired.
// Otherwise, we would push it to the media interface.
Expand All @@ -657,7 +671,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
}

// Release whole expired transfer b/c possible next frames of the same transfer are also expired.
popAndFreeCanardTxQueueItem(canard_tx, tx_item, true /* whole transfer */);
popAndFreeCanardTxQueueItem(canard_tx, canard_instance(), tx_item, true /* whole transfer */);
}
return nullptr;
}
Expand Down
13 changes: 8 additions & 5 deletions include/libcyphal/transport/can/delegate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ class TransportDelegate
return canard_instance_;
}

CETL_NODISCARD const CanardInstance& canard_instance() const noexcept
{
return canard_instance_;
}

CETL_NODISCARD cetl::pmr::memory_resource& memory() const noexcept
{
return memory_;
Expand Down Expand Up @@ -271,21 +276,19 @@ class TransportDelegate
/// Pops and frees Canard TX queue item(s).
///
/// @param tx_queue The TX queue from which the item should be popped.
/// @param canard_instance The Canard instance to be used for the item deallocation.
/// @param tx_item The TX queue item to be popped and freed.
/// @param whole_transfer If `true` then whole transfer should be released from the queue.
///
static void popAndFreeCanardTxQueueItem(CanardTxQueue& tx_queue,
const CanardInstance& canard_instance,
const CanardTxQueueItem* tx_item,
const bool whole_transfer)
{
while (CanardTxQueueItem* const mut_tx_item = ::canardTxPop(&tx_queue, tx_item))
{
tx_item = tx_item->next_in_transfer;

// No Sonar `cpp:S5356` b/c we need to free tx item allocated by libcanard as a raw memory.
tx_queue.memory.deallocate(tx_queue.memory.user_reference,
mut_tx_item->allocated_size,
mut_tx_item); // NOSONAR cpp:S5356
::canardTxFree(&tx_queue, &canard_instance, mut_tx_item);

if (!whole_transfer)
{
Expand Down
12 changes: 9 additions & 3 deletions include/libcyphal/transport/can/media.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "libcyphal/executor.hpp"
#include "libcyphal/transport/errors.hpp"
#include "libcyphal/transport/media_payload.hpp"
#include "libcyphal/types.hpp"

#include <cetl/cetl.hpp>
Expand Down Expand Up @@ -66,9 +67,16 @@ class IMedia

/// @brief Schedules the frame for transmission asynchronously and return immediately.
///
/// Concrete media implementation has multiple options with how to handle `payload` buffer:
/// - just copy the buffer data byts (using `payload.getSpan` const method) and return without changing the payload;
/// - take ownership of the buffer (by moving the payload to another internal payload;
/// - calling `payload.reset()` immediately after it's not needed anymore.
/// In any case, the payload should not be changed (moved or reset) if it is not accepted.
///
/// @param deadline The deadline for the push operation. Media implementation should drop the payload
/// if the deadline is exceeded (aka `now > deadline`).
/// @param can_id The destination CAN ID of the frame.
/// @param payload The mutable payload of the frame. Should not be changed if payload is not accepted.
/// @return `true` if the frame is accepted or already timed out;
/// `false` to try again later (f.e. b/c output TX queue is currently full).
/// If any media failure occurred, the frame will be dropped by transport.
Expand All @@ -83,9 +91,7 @@ class IMedia

using Type = Expected<Success, Failure>;
};
virtual PushResult::Type push(const TimePoint deadline,
const CanId can_id,
const cetl::span<const cetl::byte> payload) noexcept = 0;
virtual PushResult::Type push(const TimePoint deadline, const CanId can_id, MediaPayload& payload) noexcept = 0;
///@}

/// @brief Takes the next payload fragment (aka CAN frame) from the reception queue unless it's empty.
Expand Down
10 changes: 6 additions & 4 deletions include/libcyphal/transport/lizard_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,31 @@ class LizardHelpers final

/// Constructs a lizard C memory resource.
///
template <typename MemoryResource>
template <typename MemoryResource, std::size_t Alignment = alignof(std::max_align_t)>
CETL_NODISCARD static MemoryResource makeMemoryResource(cetl::pmr::memory_resource& memory)
{
/// No Sonar `cpp:S5356` is unavoidable - integration with Lizard C memory management.
///
return {&memory, deallocateMemory, allocateMemory}; // NOSONAR cpp:S5356
return {&memory, deallocateMemory<Alignment>, allocateMemory<Alignment>}; // NOSONAR cpp:S5356
}

private:
/// No Sonar `cpp:S5008` is unavoidable - integration with Lizard C memory management.
///
template <std::size_t Alignment>
static void* allocateMemory(void* const user_reference, const std::size_t amount) // NOSONAR cpp:S5008
{
// No Sonar `cpp:S5357` "... isn't related to `void*`.
// B/c we integrate here with lizard C memory management.
auto* const memory = static_cast<cetl::pmr::memory_resource*>(user_reference); // NOSONAR cpp:S5357
CETL_DEBUG_ASSERT(nullptr != user_reference, "Expected PMR as non-null user reference.");

return memory->allocate(amount);
return memory->allocate(amount, Alignment);
}

/// No Sonar `cpp:S5008` is unavoidable - integration with Lizard C memory management.
///
template <std::size_t Alignment>
static void deallocateMemory(void* const user_reference, // NOSONAR cpp:S5008
const std::size_t amount,
void* const pointer) // NOSONAR cpp:S5008
Expand All @@ -59,7 +61,7 @@ class LizardHelpers final
auto* const memory = static_cast<cetl::pmr::memory_resource*>(user_reference); // NOSONAR cpp:S5357
CETL_DEBUG_ASSERT(nullptr != user_reference, "Expected PMR as non-null user reference.");

memory->deallocate(pointer, amount);
memory->deallocate(pointer, amount, Alignment);
}

}; // LizardHelpers
Expand Down
Loading

0 comments on commit 5856066

Please sign in to comment.