
feat(s2n-quic-dc): implement recv path packet pool #2483

Open · wants to merge 1 commit into main from camshaft/dc-packet-pool
Conversation

camshaft (Contributor) commented on Feb 24, 2025:

Description of changes:

The current recv buffer code relies on the s2n_quic_dc::msg::recv::Message module. This works well for an implementation that owns all of the messages arriving on a single socket. However, once we start multiplexing multiple receivers on a single socket, the Message struct doesn't really enable that use case, at least not efficiently.

We also run into issues if we ever want to support AF_XDP (since the XDP UMEM wants a contiguous region of packets), GRO (since we can't easily split Messages across packet boundaries without a copy), or dedicated recv tasks/threads that dispatch to the registered channels.

This implementation adds a new socket::recv::pool module, which enables all of these use cases. It works by passing around Descriptor pointers that point back into a region of memory (which can easily support AF_XDP). On drop, descriptors are put back into a free list to make sure we don't leak segments. Additionally, the implementation returns a Segments iterator that cheaply splits received packets into GRO segments; these can be sent to distinct tasks/threads without worrying about synchronizing those regions.

I've also included an example implementation of a worker that loops, receives packets from a blocking UDP socket, and dispatches them using the new Router trait. The final implementation will probably be more involved (it needs to support shutting down, busy polling, etc.), but it's a starting point for running tests.

Speaking of tests, I've got a test in a branch that sends and receives packets in a loop using this new allocator. Over localhost, I was able to do about 450 Gbps with GSO/GRO. The descriptor-freeing code accounts for about 1% of CPU in this example, so it could possibly be improved slightly, but I think this is an OK start.

[flamegraph image]

Testing:

I've included the model_test, which was run under miri and under bolero with ASAN to try to catch any safety violations. I did initially run into a few issues with leaks and stacked-borrows violations, but all of those are addressed in this change. The model test covers all of the operations that can happen with the pool, though it does not currently check for atomic-ordering issues, since we don't have loom set up in the s2n-quic-dc crate. That said, everything that deals with atomics links to the std::sync::Arc code to justify why it's there.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@camshaft camshaft force-pushed the camshaft/dc-packet-pool branch 2 times, most recently from 32464a7 to e8932f5 Compare February 24, 2025 22:59
@camshaft camshaft marked this pull request as ready for review February 24, 2025 23:29
@camshaft camshaft force-pushed the camshaft/dc-packet-pool branch from e8932f5 to a1e1b25 Compare February 24, 2025 23:32
Mark-Simulacrum (Collaborator) commented:
> Over localhost, I was able to do about 450 Gbps with GSO/GRO.

That bandwidth feels very close to (main) memory bandwidth (probably ~100 GB/second theoretical) -- you're hitting ~56 GB/second. So that seems good, though a little surprising to me... historically a single UDP socket has peaked at ~300k packets/second in my testing, which is around 300_000 * (2**16) * 8 / 1e9 = 157.3 Gbps assuming "perfect" 64 KiB packets.

camshaft (Contributor, Author) replied:

> historically a single UDP socket has peaked at ~300k packets/second

That's probably the difference here - I had a socket per core and reuse port on the receiver. I should have mentioned that in my description.

Mark-Simulacrum (Collaborator) left a review:

Will come back to this in a bit, but some initial questions/thoughts...

```rust
debug_assert_ne!(mem_refs, 0, "reference count underflow");

// if the free_list is still active (the allocator hasn't dropped) then just push the id
// TODO Weak::upgrade is a bit expensive since it clones the `Arc`, only to drop it again
```
Mark-Simulacrum (Collaborator):
This seems unavoidable with a reference count and no separate sync mechanism (e.g., crossbeam-epoch/RCU, global lock, etc.) -- you're fundamentally acquiring and then freeing a lock here on the memory.

```rust
address: NonNull<Addr>,
data: NonNull<u8>,

references: AtomicUsize,
```
Mark-Simulacrum (Collaborator):
Can we put a comment somewhere on why this can't be something like `type Descriptor = Arc<DescriptorInner>`? That is a bit larger (an extra usize for Weak support), but that doesn't seem likely to be hugely important, and if we care there are crates without the weak counts and/or we can have our own Arc that doesn't have weaks...


```rust
/// An unfilled packet
pub struct Unfilled {
    desc: Option<Descriptor>,
```
Mark-Simulacrum (Collaborator):
Just to check, this Option is just because you can't move out of the field given the Drop impl?

```rust
let inner = desc.inner();
let addr = unsafe { &mut *inner.address.as_ptr() };
let capacity = inner.capacity() as usize;
let data = unsafe { core::slice::from_raw_parts_mut(inner.data.as_ptr(), capacity) };
```
Mark-Simulacrum (Collaborator):
Are we assuming the buffer is always fully initialized upfront?

```rust
/// [`Filled`] contains elements `[0, at)`.
///
/// This is an `O(1)` operation that just increases the reference count and
/// sets a few indices.
```
Mark-Simulacrum (Collaborator):
Stale comment? I don't see reference count updates.
