Skip to content

Commit

Permalink
Use canardTxPoll; MediaPayload::Ownership (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
serges147 authored Dec 11, 2024
1 parent 914d85d commit f66b8a6
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 159 deletions.
2 changes: 1 addition & 1 deletion include/libcyphal/application/registry/register.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ enum class SetError : std::uint8_t

/// Defines interface for a register.
///
class IRegister : public cavl::Node<IRegister>
class IRegister : public common::cavl::Node<IRegister>
{
// 1AD1885B-954B-48CF-BAC4-FA0A251D3FC0
// clang-format off
Expand Down
4 changes: 2 additions & 2 deletions include/libcyphal/application/registry/registry_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class Registry final : public IIntrospectableRegistry
[key = IRegister::Key{name}](const IRegister& other) { return other.compareBy(key); });
}

cetl::pmr::memory_resource& memory_;
cavl::Tree<IRegister> registers_tree_;
cetl::pmr::memory_resource& memory_;
common::cavl::Tree<IRegister> registers_tree_;

}; // Registry

Expand Down
8 changes: 7 additions & 1 deletion include/libcyphal/common/cavl/cavl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

// NOLINTBEGIN(cppcoreguidelines-pro-bounds-constant-array-index)

namespace libcyphal
{
namespace common
{
namespace cavl
{
template <typename Derived>
Expand Down Expand Up @@ -788,7 +792,7 @@ class Tree final // NOSONAR cpp:S3624
{
public:
/// Helper alias of the compatible node type.
using NodeType = ::cavl::Node<Derived>;
using NodeType = Node<Derived>;
using DerivedType = Derived;

Tree() = default;
Expand Down Expand Up @@ -998,5 +1002,7 @@ class Tree final // NOSONAR cpp:S3624
};

} // namespace cavl
} // namespace common
} // namespace libcyphal

// NOLINTEND(cppcoreguidelines-pro-bounds-constant-array-index)
4 changes: 2 additions & 2 deletions include/libcyphal/platform/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class SingleThreadedExecutor : public IExecutor
}

protected:
class CallbackNode : public cavl::Node<CallbackNode>, public Callback::Interface
class CallbackNode : public libcyphal::common::cavl::Node<CallbackNode>, public Callback::Interface
{
public:
CallbackNode(SingleThreadedExecutor& executor, Callback::Function&& function)
Expand Down Expand Up @@ -272,7 +272,7 @@ class SingleThreadedExecutor : public IExecutor
// MARK: - Data members:

/// Holds AVL tree of registered callback node, sorted by the next execution time.
cavl::Tree<CallbackNode> callback_nodes_;
common::cavl::Tree<CallbackNode> callback_nodes_;

}; // SingleThreadedExecutor

Expand Down
6 changes: 3 additions & 3 deletions include/libcyphal/presentation/client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace presentation
namespace detail
{

class SharedClient : public cavl::Node<SharedClient>, public SharedObject
class SharedClient : public common::cavl::Node<SharedClient>, public SharedObject
{
public:
using Node::remove;
Expand Down Expand Up @@ -388,9 +388,9 @@ class SharedClient : public cavl::Node<SharedClient>, public SharedObject
const UniquePtr<transport::IRequestTxSession> svc_request_tx_session_;
const UniquePtr<transport::IResponseRxSession> svc_response_rx_session_;
const transport::ResponseRxParams response_rx_params_;
cavl::Tree<CallbackNode> cb_nodes_by_transfer_id_;
common::cavl::Tree<CallbackNode> cb_nodes_by_transfer_id_;
TimePoint nearest_deadline_;
cavl::Tree<TimeoutNode> timeout_nodes_by_deadline_;
common::cavl::Tree<TimeoutNode> timeout_nodes_by_deadline_;
IExecutor::Callback::Any nearest_deadline_callback_;

}; // SharedClient
Expand Down
16 changes: 8 additions & 8 deletions include/libcyphal/presentation/presentation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,14 +727,14 @@ class Presentation final : private detail::IPresentationDelegate

// MARK: Data members:

cetl::pmr::memory_resource& memory_;
IExecutor& executor_;
transport::ITransport& transport_;
cavl::Tree<detail::SharedClient> shared_client_nodes_;
cavl::Tree<detail::PublisherImpl> publisher_impl_nodes_;
cavl::Tree<detail::SubscriberImpl> subscriber_impl_nodes_;
detail::UnRefNode unreferenced_nodes_;
IExecutor::Callback::Any unref_nodes_deleter_callback_;
cetl::pmr::memory_resource& memory_;
IExecutor& executor_;
transport::ITransport& transport_;
common::cavl::Tree<detail::SharedClient> shared_client_nodes_;
common::cavl::Tree<detail::PublisherImpl> publisher_impl_nodes_;
common::cavl::Tree<detail::SubscriberImpl> subscriber_impl_nodes_;
detail::UnRefNode unreferenced_nodes_;
IExecutor::Callback::Any unref_nodes_deleter_callback_;

}; // Presentation

Expand Down
2 changes: 1 addition & 1 deletion include/libcyphal/presentation/publisher_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace presentation
namespace detail
{

class PublisherImpl final : public cavl::Node<PublisherImpl>, public SharedObject
class PublisherImpl final : public common::cavl::Node<PublisherImpl>, public SharedObject
{
public:
using Node::remove;
Expand Down
4 changes: 2 additions & 2 deletions include/libcyphal/presentation/subscriber_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace presentation
namespace detail
{

class SubscriberImpl final : public cavl::Node<SubscriberImpl>, public SharedObject
class SubscriberImpl final : public common::cavl::Node<SubscriberImpl>, public SharedObject
{
public:
using Node::remove;
Expand Down Expand Up @@ -304,7 +304,7 @@ class SubscriberImpl final : public cavl::Node<SubscriberImpl>, public SharedObj
ITimeProvider& time_provider_;
const UniquePtr<transport::IMessageRxSession> msg_rx_session_;
const transport::PortId subject_id_;
cavl::Tree<CallbackNode> callback_nodes_;
common::cavl::Tree<CallbackNode> callback_nodes_;
CallbackNode* next_cb_node_;

}; // SubscriberImpl
Expand Down
137 changes: 73 additions & 64 deletions include/libcyphal/transport/can/can_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
return MemoryError{};
}

const auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(executor_.now().time_since_epoch());
const auto deadline_us = std::chrono::duration_cast<std::chrono::microseconds>(deadline.time_since_epoch());

for (Media& media : media_array_)
Expand All @@ -335,7 +336,8 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
&canardInstance(),
static_cast<CanardMicrosecond>(deadline_us.count()),
&metadata,
{payload.size(), payload.data()}); // NOSONAR cpp:S5356
{payload.size(), payload.data()}, // NOSONAR cpp:S5356
static_cast<CanardMicrosecond>(now_us.count()));

cetl::optional<AnyFailure> failure =
tryHandleTransientCanardResult<TransientErrorReport::CanardTxPush>(media, result);
Expand Down Expand Up @@ -572,77 +574,84 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
}
}

/// @brief Tries to push next frame from TX queue to media.
///
void pushNextFrameToMedia(Media& media)
std::int8_t handleMediaTxFrame(Media& media, const CanardMicrosecond deadline, CanardMutableFrame& frame)
{
TimePoint tx_deadline;
while (CanardTxQueueItem* const tx_item = peekFirstValidTxItem(media.canard_tx_queue(), tx_deadline))
//
// Move the payload from the frame to the media payload - `media.push` might take ownership of it.
// No Sonar `cpp:S5356` and `cpp:S5357` b/c we integrate here with C libcanard API.
//
MediaPayload payload{frame.payload.size,
static_cast<cetl::byte*>(frame.payload.data), // NOSONAR cpp:S5356 cpp:S5357
frame.payload.allocated_size,
&media.interface().getTxMemoryResource()};
frame.payload = {0, nullptr, 0};

auto push_result = media.interface().push(TimePoint{std::chrono::microseconds{deadline}}, //
frame.extended_can_id,
payload);

if (const auto* const push = cetl::get_if<IMedia::PushResult::Success>(&push_result))
{
// Move the payload from the frame to the media payload - `media.push` might take ownership of it.
// No Sonar `cpp:S5356` and `cpp:S5357` b/c we integrate here with C libcanard API.
//
auto& frame_payload = tx_item->frame.payload;
MediaPayload payload{frame_payload.size,
static_cast<cetl::byte*>(frame_payload.data), // NOSONAR cpp:S5356 cpp:S5357
frame_payload.allocated_size,
&media.interface().getTxMemoryResource()};
frame_payload = {0, nullptr, 0};

auto push_result = media.interface().push(tx_deadline, tx_item->frame.extended_can_id, payload);

// In case of media push error, we are going to drop this problematic frame
// (b/c it looks like media can't handle this frame),
// but we will continue to process with another transfer frame.
// Note that media not being ready/able to push a frame just yet (aka temporary)
// is not reported as an error (see `is_pushed` below).
//
auto* const push_failure = cetl::get_if<IMedia::PushResult::Failure>(&push_result);
if (nullptr == push_failure)
if (!push->is_accepted)
{
const auto push = cetl::get<IMedia::PushResult::Success>(push_result);
if (push.is_accepted)
{
popAndFreeCanardTxQueueItem(media.canard_tx_queue(),
canardInstance(),
tx_item,
false /* single frame */);
}
else
{
// Media has not accepted the frame, so we need return original payload back to the item,
// so that in the future potential retry could try to push it again.
const auto org_payload = payload.release();
frame_payload.size = std::get<0>(org_payload);
frame_payload.data = std::get<1>(org_payload);
frame_payload.allocated_size = std::get<2>(org_payload);
}

// If needed schedule (recursively!) next frame to push.
// Already existing callback will be called by executor when media TX is ready to push more.
//
if (!media.tx_callback())
{
media.tx_callback() = media.interface().registerPushCallback([this, &media](const auto&) {
//
pushNextFrameToMedia(media);
});
}
return;
// Media has not accepted the frame, so we need return original payload back to the item,
// so that in the future potential retry could try to push it again.
// No Sonar `cpp:S5356` b/c we need to pass payload as a raw data to the libcanard.
const auto org_payload = payload.release();
frame.payload.size = org_payload.size;
frame.payload.data = org_payload.data; // NOSONAR cpp:S5356
frame.payload.allocated_size = org_payload.allocated_size;
}

// Release whole problematic transfer from the TX queue,
// so that other transfers in TX queue have their chance.
// Otherwise, we would be stuck in an execution loop trying to send the same frame.
popAndFreeCanardTxQueueItem(media.canard_tx_queue(), canardInstance(), tx_item, true /* whole transfer */);
// If needed schedule (recursively!) next frame to push.
// Already existing callback will be called by executor when media TX is ready to push more.
//
if (!media.tx_callback())
{
media.tx_callback() = media.interface().registerPushCallback([this, &media](const auto&) {
//
pushNextFrameToMedia(media);
});
}
return push->is_accepted ? 1 : 0;
}

using Report = TransientErrorReport::MediaPush;
tryHandleTransientMediaFailure<Report>(media, std::move(*push_failure));
using Report = TransientErrorReport::MediaPush;
tryHandleTransientMediaFailure<Report>(media, cetl::get<IMedia::PushResult::Failure>(std::move(push_result)));
return -1;
}

} // for a valid tx item
/// @brief Tries to push next frame from TX queue to media.
///
void pushNextFrameToMedia(Media& media)
{
auto frame_handler = [this, &media](const CanardMicrosecond deadline,
CanardMutableFrame& frame) -> std::int8_t {
//
return handleMediaTxFrame(media, deadline, frame);
};

// There is nothing to send anymore, so we are done with this media TX - no more callbacks for now.
media.tx_callback().reset();
// In case of a media failure we gonna try to push another frame from the next transfer in the queue, so
// that at least (and at most) one new frame will be succesfully attempted to be pushed in the end.
// Everytime we poll the queue, its size surely decrements (when `result != 0`),
// so there is no risk of infinite loop here.
//
std::int8_t result = -1;
while (result < 0)
{
// No Sonar `cpp:S5356` & `cpp:S5356` b/c we integrate with Canard C api.
result = ::canardTxPoll( //
&media.canard_tx_queue(),
&canardInstance(),
static_cast<CanardMicrosecond>(executor_.now().time_since_epoch().count()),
&frame_handler, // NOSONAR cpp:S5356
[](auto* const user_reference, const auto deadline, auto* frame) {
//
auto* const frame_handler_ptr =
static_cast<decltype(frame_handler)*>(user_reference); // NOSONAR cpp:S5356, cpp:S5357
return (*frame_handler_ptr)(deadline, *frame);
});
}
}

/// @brief Tries to peek the first TX item from the media TX queue which is not expired.
Expand Down
Loading

0 comments on commit f66b8a6

Please sign in to comment.