Skip to content

Commit

Permalink
Implemented UDP runMediaTransmit #verification #docs #sonar
Browse files Browse the repository at this point in the history
  • Loading branch information
serges147 committed Jun 20, 2024
1 parent 15cc8ca commit 855bc97
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 13 deletions.
4 changes: 2 additions & 2 deletions include/libcyphal/transport/can/can_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class TransportImpl final : private TransportDelegate, public ICanTransport //
// Verify input arguments:
// - At least one media interface must be provided, but no more than the maximum allowed (255).
//
const auto media_count =
static_cast<std::size_t>(std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) {
const auto media_count = static_cast<std::size_t>(
std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) -> bool {
return media_ptr != nullptr;
}));
if ((media_count == 0) || (media_count > std::numeric_limits<std::uint8_t>::max()))
Expand Down
12 changes: 11 additions & 1 deletion include/libcyphal/transport/udp/udp_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "libcyphal/transport/errors.hpp"
#include "libcyphal/transport/transport.hpp"
#include "media.hpp"
#include "tx_rx_sockets.hpp"

#include <cetl/pf17/cetlpf.hpp>
#include <cetl/pmr/function.hpp>
Expand Down Expand Up @@ -70,9 +71,18 @@ class IUdpTransport : public ITransport
IMedia& culprit;
};

/// @brief Error report about sending a frame to the media TX socket interface.
struct MediaTxSocketSend
{
AnyError error;
std::uint8_t media_index;
ITxSocket& culprit;
};

/// Defines variant of all possible transient error reports.
///
using Variant = cetl::variant<UdpardTxPublish, UdpardTxRequest, UdpardTxRespond, MediaMakeTxSocket>;
using Variant =
cetl::variant<UdpardTxPublish, UdpardTxRequest, UdpardTxRespond, MediaMakeTxSocket, MediaTxSocketSend>;

}; // TransientErrorReport

Expand Down
142 changes: 132 additions & 10 deletions include/libcyphal/transport/udp/udp_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <udpard.h>

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <limits>
Expand Down Expand Up @@ -126,8 +127,8 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //
// Verify input arguments:
// - At least one media interface must be provided, but no more than the maximum allowed (3).
//
const auto media_count =
static_cast<std::size_t>(std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) {
const auto media_count = static_cast<std::size_t>(
std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) -> bool {
return media_ptr != nullptr;
}));
if ((media_count == 0) || (media_count > UDPARD_NETWORK_INTERFACE_COUNT_MAX))
Expand Down Expand Up @@ -324,9 +325,19 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //

// MARK: IRunnable

CETL_NODISCARD IRunnable::MaybeError run(const TimePoint) override
CETL_NODISCARD IRunnable::MaybeError run(const TimePoint now) override
{
// TODO: Implement!
cetl::optional<AnyError> any_error{};

// We deliberately first run TX as much as possible, and only then running RX -
// transmission will release resources (like TX queue items) and make room for new incoming frames.
//
any_error = runMediaTransmit(now);
if (any_error.has_value())
{
return any_error.value();
}

return {};
}

Expand Down Expand Up @@ -430,16 +441,19 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //

}; // TxTransferHandler

template <typename Report, typename ErrorVariant>
CETL_NODISCARD cetl::optional<AnyError> tryHandleTransientMediaError(const Media& media, ErrorVariant&& error_var)
template <typename Report, typename ErrorVariant, typename Culprit>
CETL_NODISCARD cetl::optional<AnyError> tryHandleTransientMediaError(const Media& media,
ErrorVariant&& error_var,
Culprit&& culprit)
{
AnyError any_error = common::detail::anyErrorFromVariant(std::forward<ErrorVariant>(error_var));
if (!transient_error_handler_)
{
return any_error;
}

TransientErrorReport::Variant report_var{Report{std::move(any_error), media.index(), media.interface()}};
TransientErrorReport::Variant report_var{
Report{std::move(any_error), media.index(), std::forward<Culprit>(culprit)}};
return transient_error_handler_(report_var);
}

Expand Down Expand Up @@ -501,13 +515,15 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //
auto maybe_tx_socket = media.interface().makeTxSocket();
if (auto* media_error = cetl::get_if<cetl::variant<MemoryError, PlatformError>>(&maybe_tx_socket))
{
return tryHandleTransientMediaError<ErrorReport>(media, std::move(*media_error));
return tryHandleTransientMediaError<ErrorReport>(media, std::move(*media_error), media.interface());
}

media.tx_socket_ptr() = cetl::get<UniquePtr<ITxSocket>>(std::move(maybe_tx_socket));
if (!media.tx_socket_ptr())
{
return tryHandleTransientMediaError<ErrorReport, cetl::variant<MemoryError>>(media, MemoryError{});
return tryHandleTransientMediaError<ErrorReport, cetl::variant<MemoryError>>(media,
MemoryError{},
media.interface());
}
}

Expand All @@ -518,7 +534,8 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //
{
for (Media& media : media_array_)
{
auto any_error = withEnsureMediaTxSocket(media, [](auto&, auto&) { return cetl::nullopt; });
cetl::optional<AnyError> any_error =
withEnsureMediaTxSocket(media, [](auto&, auto&) -> cetl::nullopt_t { return cetl::nullopt; });
if (any_error.has_value())
{
return any_error;
Expand All @@ -537,6 +554,111 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport //
}
}

/// @brief Runs transmission loop for each redundant media interface.
///
CETL_NODISCARD cetl::optional<AnyError> runMediaTransmit(const TimePoint now)
{
cetl::optional<AnyError> opt_any_error{};

for (Media& media : media_array_)
{
opt_any_error =
withEnsureMediaTxSocket(media,
[this, now](Media& media, ITxSocket& tx_socket) -> cetl::optional<AnyError> {
return runSingleMediaTransmit(media, tx_socket, now);
});
if (opt_any_error.has_value())
{
break;
}
}

return opt_any_error;
}

/// @brief Runs transmission loop for a single media interface and its TX socket.
///
/// Transmits as much as possible frames that are ready to be sent by the media TX socket interface.
///
CETL_NODISCARD cetl::optional<AnyError> runSingleMediaTransmit(Media& media,
ITxSocket& tx_socket,
const TimePoint now)
{
using PayloadFragment = cetl::span<const cetl::byte>;

while (const UdpardTxItem* const tx_item = ::udpardTxPeek(&media.udpard_tx()))
{
// Prepare a lambda to pop and free the TX queue item, but not just yet.
// In case of socket not being ready to send this item, we will need to retry it on next `run`.
//
auto popAndFreeUdpardTxItem = [tx_queue = &media.udpard_tx(), tx_item]() {
::udpardTxFree(tx_queue->memory, ::udpardTxPop(tx_queue, tx_item));
};

// We are dropping any TX item that has expired.
// Otherwise, we would send it to the media TX socket interface.
// We use strictly `>=` (instead of `>`) to give this frame a chance (one extra 1us) at the socket.
//
const auto deadline = TimePoint{std::chrono::microseconds{tx_item->deadline_usec}};
if (now >= deadline)
{
popAndFreeUdpardTxItem();
continue;
}

const std::array<PayloadFragment, 1> single_payload_fragment{
PayloadFragment{static_cast<const cetl::byte*>(tx_item->datagram_payload.data),
tx_item->datagram_payload.size}};

Expected<bool, cetl::variant<PlatformError, ArgumentError>> maybe_sent =
tx_socket.send({tx_item->destination.ip_address, tx_item->destination.udp_port},
tx_item->dscp,
single_payload_fragment);

// In case of socket send error we are going to drop this problematic frame
// (b/c it looks like media TX socket can't handle this frame),
// but we will continue to process with other frames if transient error handler says so.
// Note that socket not being ready/able to send a frame just yet (aka temporary)
// is not reported as an error (see `is_sent` below).
//
if (auto* error = cetl::get_if<cetl::variant<PlatformError, ArgumentError>>(&maybe_sent))
{
// Release problematic frame from the TX queue, so that other frames in TX queue have their chance.
// Otherwise, we would be stuck in a run loop trying to send the same frame.
popAndFreeUdpardTxItem();

cetl::optional<AnyError> opt_any_error =
tryHandleTransientMediaError<TransientErrorReport::MediaTxSocketSend>(media,
std::move(*error),
tx_socket);
if (!opt_any_error.has_value())
{
// The handler (if any) just said that it's fine to continue with sending other frames
// and ignore such a transient media error (and don't propagate it outside).
continue;
}

return opt_any_error;
}
const auto is_sent = cetl::get<bool>(maybe_sent);
if (!is_sent)
{
// TX socket interface is busy, so we are done with this media for now,
// and will just try again with it later (on next `run`).
// Note, we are NOT releasing this item from the queue, so it will be retried on next `run`.
break;

// TODO: It seems that `Multiplexer` interface would be used here
// but it is not yet implemented, so for now just `break`.
}

popAndFreeUdpardTxItem();

} // for each frame

return cetl::nullopt;
}

// MARK: Data members:

MediaArray media_array_;
Expand Down

0 comments on commit 855bc97

Please sign in to comment.