Skip to content

Commit

Permalink
try poll #verification #sonar #docs
Browse files Browse the repository at this point in the history
  • Loading branch information
serges147 committed Dec 5, 2024
1 parent 9974086 commit 1b7345b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmake/modules/Findlibcanard.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
include(FetchContent)

set(libcanard_GIT_REPOSITORY "https://github.com/OpenCyphal/libcanard.git")
set(libcanard_GIT_TAG "v4")
set(libcanard_GIT_TAG "sshirokov/v4_tx_poll")

FetchContent_Declare(
libcanard
Expand Down
78 changes: 75 additions & 3 deletions include/libcyphal/transport/can/can_transport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
return MemoryError{};
}

const auto now_us = std::chrono::duration_cast<std::chrono::microseconds>(executor_.now().time_since_epoch());
const auto deadline_us = std::chrono::duration_cast<std::chrono::microseconds>(deadline.time_since_epoch());

for (Media& media : media_array_)
Expand All @@ -335,7 +336,8 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
&canardInstance(),
static_cast<CanardMicrosecond>(deadline_us.count()),
&metadata,
{payload.size(), payload.data()}); // NOSONAR cpp:S5356
{payload.size(), payload.data()}, // NOSONAR cpp:S5356
static_cast<CanardMicrosecond>(now_us.count()));

cetl::optional<AnyFailure> failure =
tryHandleTransientCanardResult<TransientErrorReport::CanardTxPush>(media, result);
Expand Down Expand Up @@ -574,7 +576,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport

/// @brief Tries to push next frame from TX queue to media.
///
void pushNextFrameToMedia(Media& media)
void pushNextFrameToMedia_old(Media& media)
{
TimePoint tx_deadline;
while (CanardTxQueueItem* const tx_item = peekFirstValidTxItem(media.canard_tx_queue(), tx_deadline))
Expand Down Expand Up @@ -625,7 +627,7 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
{
media.tx_callback() = media.interface().registerPushCallback([this, &media](const auto&) {
//
pushNextFrameToMedia(media);
pushNextFrameToMedia_old(media);
});
}
return;
Expand All @@ -645,6 +647,76 @@ class TransportImpl final : private TransportDelegate, public ICanTransport
media.tx_callback().reset();
}

/// @brief Tries to push next frame from TX queue to media.
///
void pushNextFrameToMedia(Media& media)
{
auto frame_handler = [this, &media](const CanardMicrosecond deadline,
CanardMutableFrame& frame) -> std::int8_t {
//
// Move the payload from the frame to the media payload - `media.push` might take ownership of it.
// No Sonar `cpp:S5356` and `cpp:S5357` b/c we integrate here with C libcanard API.
//
MediaPayload payload{frame.payload.size,
static_cast<cetl::byte*>(frame.payload.data), // NOSONAR cpp:S5356 cpp:S5357
frame.payload.allocated_size,
&media.interface().getTxMemoryResource()};
frame.payload = {0, nullptr, 0};

auto push_result = media.interface().push(TimePoint{std::chrono::microseconds{deadline}}, //
frame.extended_can_id,
payload);

if (const auto* const push = cetl::get_if<IMedia::PushResult::Success>(&push_result))
{
if (!push->is_accepted)
{
// Media has not accepted the frame, so we need return original payload back to the item,
// so that in the future potential retry could try to push it again.
const auto org_payload = payload.release();
frame.payload.size = std::get<0>(org_payload);
frame.payload.data = std::get<1>(org_payload);
frame.payload.allocated_size = std::get<2>(org_payload);
}

// If needed schedule (recursively!) next frame to push.
// Already existing callback will be called by executor when media TX is ready to push more.
//
if (!media.tx_callback())
{
media.tx_callback() = media.interface().registerPushCallback([this, &media](const auto&) {
//
pushNextFrameToMedia(media);
});
}
return push->is_accepted ? 1 : 0;
}

using Report = TransientErrorReport::MediaPush;
tryHandleTransientMediaFailure<Report>(media,
cetl::get<IMedia::PushResult::Failure>(std::move(push_result)));
return -1;
};

while (true)
{
const auto result =
::canardTxPoll(&media.canard_tx_queue(),
&canardInstance(),
static_cast<CanardMicrosecond>(executor_.now().time_since_epoch().count()),
&frame_handler,
[](auto* const user_reference, const auto deadline, auto* frame) {
//
auto* frame_handler_ptr = static_cast<decltype(frame_handler)*>(user_reference);
return (*frame_handler_ptr)(deadline, *frame);
});
if (result >= 0)
{
return;
}
}
}

/// @brief Tries to peek the first TX item from the media TX queue which is not expired.
///
/// While searching, any of already expired TX items are pop from the queue and freed (aka dropped).
Expand Down
12 changes: 6 additions & 6 deletions test/unittest/transport/can/test_can_msg_tx_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ TEST_F(TestCanMsgTxSession, send_empty_expired_payload)

scheduler_.scheduleAt(1s, [&](const auto&) {
//
// Emulate that media became ready on the very edge of the default 1s timeout (exactly at the deadline).
// Emulate that media became ready on the very edge of the default 1s timeout (+1us after the deadline).
EXPECT_CALL(media_mock_, push(_, _, _)) //
.WillOnce(Return(IMedia::PushResult::Success{false /* is_accepted */}));
EXPECT_CALL(media_mock_, registerPushCallback(_)) //
.WillOnce(Invoke([&](auto function) { //
return scheduler_.registerAndScheduleNamedCallback("", now() + timeout, std::move(function));
return scheduler_.registerAndScheduleNamedCallback("", now() + timeout + 1us, std::move(function));
}));

metadata.deadline = now() + timeout;
Expand All @@ -248,23 +248,23 @@ TEST_F(TestCanMsgTxSession, send_7bytes_payload_with_500ms_timeout)

scheduler_.scheduleAt(1s, [&](const auto&) {
//
// Emulate that socket became ready on the very edge of the 500ms timeout (just 1us before the deadline).
// Emulate that socket became ready on the very edge of the 500ms timeout (exactly at the deadline).
EXPECT_CALL(media_mock_, push(_, _, _)) //
.WillOnce(Return(IMedia::PushResult::Success{false /* is_accepted */}));
EXPECT_CALL(media_mock_, registerPushCallback(_)) //
.WillOnce(Invoke([&](auto function) { //
return scheduler_.registerAndScheduleNamedCallback("", now() + timeout - 1us, std::move(function));
return scheduler_.registerAndScheduleNamedCallback("", now() + timeout, std::move(function));
}));

metadata.deadline = now() + timeout;
auto failure = session->send(metadata, makeSpansFrom(payload));
EXPECT_THAT(failure, Eq(cetl::nullopt));
});
scheduler_.scheduleAt(1s + timeout - 1us, [&](const auto&) {
scheduler_.scheduleAt(1s + timeout, [&](const auto&) {
//
EXPECT_CALL(media_mock_, push(_, _, _)) //
.WillOnce([&](auto, auto can_id, auto& pld) {
EXPECT_THAT(now(), metadata.deadline - 1us);
EXPECT_THAT(now(), metadata.deadline);
EXPECT_THAT(can_id, SubjectOfCanIdEq(17));
EXPECT_THAT(can_id, AllOf(PriorityOfCanIdEq(metadata.base.priority), IsMessageCanId()));

Expand Down

0 comments on commit 1b7345b

Please sign in to comment.