diff --git a/include/libcyphal/transport/can/can_transport_impl.hpp b/include/libcyphal/transport/can/can_transport_impl.hpp index 9581e8b8c..3743395f1 100644 --- a/include/libcyphal/transport/can/can_transport_impl.hpp +++ b/include/libcyphal/transport/can/can_transport_impl.hpp @@ -112,8 +112,8 @@ class TransportImpl final : private TransportDelegate, public ICanTransport // // Verify input arguments: // - At least one media interface must be provided, but no more than the maximum allowed (255). // - const auto media_count = - static_cast(std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) { + const auto media_count = static_cast( + std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) -> bool { return media_ptr != nullptr; })); if ((media_count == 0) || (media_count > std::numeric_limits::max())) diff --git a/include/libcyphal/transport/udp/udp_transport.hpp b/include/libcyphal/transport/udp/udp_transport.hpp index e703fe4c1..d3a63626f 100644 --- a/include/libcyphal/transport/udp/udp_transport.hpp +++ b/include/libcyphal/transport/udp/udp_transport.hpp @@ -9,6 +9,7 @@ #include "libcyphal/transport/errors.hpp" #include "libcyphal/transport/transport.hpp" #include "media.hpp" +#include "tx_rx_sockets.hpp" #include #include @@ -70,9 +71,18 @@ class IUdpTransport : public ITransport IMedia& culprit; }; + /// @brief Error report about sending a frame to the media TX socket interface. + struct MediaTxSocketSend + { + AnyError error; + std::uint8_t media_index; + ITxSocket& culprit; + }; + /// Defines variant of all possible transient error reports. /// - using Variant = cetl::variant; + using Variant = + cetl::variant; }; // TransientErrorReport diff --git a/include/libcyphal/transport/udp/udp_transport_impl.hpp b/include/libcyphal/transport/udp/udp_transport_impl.hpp index 37856b527..103e32506 100644 --- a/include/libcyphal/transport/udp/udp_transport_impl.hpp +++ b/include/libcyphal/transport/udp/udp_transport_impl.hpp @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -126,8 +127,8 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // // Verify input arguments: // - At least one media interface must be provided, but no more than the maximum allowed (3). // - const auto media_count = - static_cast(std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) { + const auto media_count = static_cast( + std::count_if(media.begin(), media.end(), [](const IMedia* const media_ptr) -> bool { return media_ptr != nullptr; })); if ((media_count == 0) || (media_count > UDPARD_NETWORK_INTERFACE_COUNT_MAX)) @@ -324,9 +325,19 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // // MARK: IRunnable - CETL_NODISCARD IRunnable::MaybeError run(const TimePoint) override + CETL_NODISCARD IRunnable::MaybeError run(const TimePoint now) override { - // TODO: Implement! + cetl::optional any_error{}; + + // We deliberately first run TX as much as possible, and only then running RX - + // transmission will release resources (like TX queue items) and make room for new incoming frames. + // + any_error = runMediaTransmit(now); + if (any_error.has_value()) + { + return any_error.value(); + } + return {}; } @@ -430,8 +441,10 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // }; // TxTransferHandler - template - CETL_NODISCARD cetl::optional tryHandleTransientMediaError(const Media& media, ErrorVariant&& error_var) + template + CETL_NODISCARD cetl::optional tryHandleTransientMediaError(const Media& media, + ErrorVariant&& error_var, + Culprit&& culprit) { AnyError any_error = common::detail::anyErrorFromVariant(std::forward(error_var)); if (!transient_error_handler_) @@ -439,7 +452,8 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // return any_error; } - TransientErrorReport::Variant report_var{Report{std::move(any_error), media.index(), media.interface()}}; + TransientErrorReport::Variant report_var{ + Report{std::move(any_error), media.index(), std::forward(culprit)}}; return transient_error_handler_(report_var); } @@ -501,13 +515,15 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // auto maybe_tx_socket = media.interface().makeTxSocket(); if (auto* media_error = cetl::get_if>(&maybe_tx_socket)) { - return tryHandleTransientMediaError(media, std::move(*media_error)); + return tryHandleTransientMediaError(media, std::move(*media_error), media.interface()); } media.tx_socket_ptr() = cetl::get>(std::move(maybe_tx_socket)); if (!media.tx_socket_ptr()) { - return tryHandleTransientMediaError>(media, MemoryError{}); + return tryHandleTransientMediaError>(media, + MemoryError{}, + media.interface()); } } @@ -518,7 +534,8 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // { for (Media& media : media_array_) { - auto any_error = withEnsureMediaTxSocket(media, [](auto&, auto&) { return cetl::nullopt; }); + cetl::optional any_error = + withEnsureMediaTxSocket(media, [](auto&, auto&) -> cetl::nullopt_t { return cetl::nullopt; }); if (any_error.has_value()) { return any_error; @@ -537,6 +554,111 @@ class TransportImpl final : private TransportDelegate, public IUdpTransport // } } + /// @brief Runs transmission loop for each redundant media interface. + /// + CETL_NODISCARD cetl::optional runMediaTransmit(const TimePoint now) + { + cetl::optional opt_any_error{}; + + for (Media& media : media_array_) + { + opt_any_error = + withEnsureMediaTxSocket(media, + [this, now](Media& media, ITxSocket& tx_socket) -> cetl::optional { + return runSingleMediaTransmit(media, tx_socket, now); + }); + if (opt_any_error.has_value()) + { + break; + } + } + + return opt_any_error; + } + + /// @brief Runs transmission loop for a single media interface and its TX socket. + /// + /// Transmits as much as possible frames that are ready to be sent by the media TX socket interface. + /// + CETL_NODISCARD cetl::optional runSingleMediaTransmit(Media& media, + ITxSocket& tx_socket, + const TimePoint now) + { + using PayloadFragment = cetl::span; + + while (const UdpardTxItem* const tx_item = ::udpardTxPeek(&media.udpard_tx())) + { + // Prepare a lambda to pop and free the TX queue item, but not just yet. + // In case of socket not being ready to send this item, we will need to retry it on next `run`. + // + auto popAndFreeUdpardTxItem = [tx_queue = &media.udpard_tx(), tx_item]() { + ::udpardTxFree(tx_queue->memory, ::udpardTxPop(tx_queue, tx_item)); + }; + + // We are dropping any TX item that has expired. + // Otherwise, we would send it to the media TX socket interface. + // We use strictly `>=` (instead of `>`) to give this frame a chance (one extra 1us) at the socket. + // + const auto deadline = TimePoint{std::chrono::microseconds{tx_item->deadline_usec}}; + if (now >= deadline) + { + popAndFreeUdpardTxItem(); + continue; + } + + const std::array single_payload_fragment{ + PayloadFragment{static_cast(tx_item->datagram_payload.data), + tx_item->datagram_payload.size}}; + + Expected> maybe_sent = + tx_socket.send({tx_item->destination.ip_address, tx_item->destination.udp_port}, + tx_item->dscp, + single_payload_fragment); + + // In case of socket send error we are going to drop this problematic frame + // (b/c it looks like media TX socket can't handle this frame), + // but we will continue to process with other frames if transient error handler says so. + // Note that socket not being ready/able to send a frame just yet (aka temporary) + // is not reported as an error (see `is_sent` below). + // + if (auto* error = cetl::get_if>(&maybe_sent)) + { + // Release problematic frame from the TX queue, so that other frames in TX queue have their chance. + // Otherwise, we would be stuck in a run loop trying to send the same frame. + popAndFreeUdpardTxItem(); + + cetl::optional opt_any_error = + tryHandleTransientMediaError(media, + std::move(*error), + tx_socket); + if (!opt_any_error.has_value()) + { + // The handler (if any) just said that it's fine to continue with sending other frames + // and ignore such a transient media error (and don't propagate it outside). + continue; + } + + return opt_any_error; + } + const auto is_sent = cetl::get(maybe_sent); + if (!is_sent) + { + // TX socket interface is busy, so we are done with this media for now, + // and will just try again with it later (on next `run`). + // Note, we are NOT releasing this item from the queue, so it will be retried on next `run`. + break; + + // TODO: It seems that `Multiplexer` interface would be used here + // but it is not yet implemented, so for now just `break`. + } + + popAndFreeUdpardTxItem(); + + } // for each frame + + return cetl::nullopt; + } + // MARK: Data members: MediaArray media_array_;