Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[catpowder] Feature: Add cohosting mode to XDP backend #1470

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions scripts/config/azure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ demikernel:
raw_socket:
linux_interface_name: "abcde"
xdp_interface_index: 0
xdp_cohost_mode: false
# Enable the following for XDP cohosting mode, or override in environment:
# xdp_tcp_ports: [80, 443]
# xdp_udp_ports: [53]
# Enable the following line if you have a VF interface
# xdp_vf_interface_index: 0
dpdk:
eal_init: ["-c", "0xff", "-n", "4", "-a", "WW:WW.W", "--proc-type=auto", "--vdev=net_vdev_netvsc0,iface=abcde"]
tcp_socket_options:
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ demikernel:
raw_socket:
linux_interface_name: "abcde"
xdp_interface_index: 0
xdp_cohost_mode: false
# Enable the following for XDP cohosting mode, or override in environment:
# xdp_tcp_ports: [80, 443]
# xdp_udp_ports: [53]
# Enable the following line if you have a VF interface
# xdp_vf_interface_index: 0
dpdk:
Expand Down
23 changes: 22 additions & 1 deletion src/rust/catpowder/win/ring/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use crate::{
catpowder::win::{ring::rule::params::XdpRedirectParams, socket::XdpSocket},
inetstack::protocols::Protocol,
runtime::libxdp,
};
use ::std::mem;
Expand All @@ -24,7 +25,8 @@ pub struct XdpRule(libxdp::XDP_RULE);
//======================================================================================================================

impl XdpRule {
/// Creates a new XDP rule for the target socket.
/// Creates a new XDP rule for the target socket which filters all traffic.
#[allow(dead_code)]
pub fn new(socket: &XdpSocket) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
Expand All @@ -39,4 +41,23 @@ impl XdpRule {
};
Self(rule)
}

/// Creates a new XDP rule for the target socket which filters for a specific (protocol, port) combination.
pub fn new_for_dest(socket: &XdpSocket, protocol: Protocol, port: u16) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
let mut rule: libxdp::XDP_RULE = std::mem::zeroed();
rule.Match = match protocol {
Protocol::Udp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST,
Protocol::Tcp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST,
};
rule.Action = libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT;
*rule.Pattern.Port.as_mut() = port;
// Perform bitwise copy from redirect to rule, as this is a union field.
*rule.__bindgen_anon_1.Redirect.as_mut() = mem::transmute_copy(redirect.as_ref());

rule
};
Self(rule)
}
}
83 changes: 65 additions & 18 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
socket::XdpSocket,
},
inetstack::protocols::Protocol,
runtime::{fail::Fail, libxdp, limits},
};
use ::std::{cell::RefCell, rc::Rc};
Expand All @@ -26,16 +27,20 @@ use ::std::{cell::RefCell, rc::Rc};

/// A ring for receiving packets.
pub struct RxRing {
/// Index of the interface for the ring.
ifindex: u32,
/// Index of the queue for the ring.
queueid: u32,
/// A user memory region where receive buffers are stored.
mem: Rc<RefCell<UmemReg>>,
/// A ring for receiving packets.
rx_ring: XdpRing,
/// A ring for returning receive buffers to the kernel.
rx_fill_ring: XdpRing,
/// Underlying XDP socket.
_socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
/// Underlying XDP program.
_program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped.
_program: Option<XdpProgram>, // NOTE: we keep this here to prevent the program from being dropped.
}

//======================================================================================================================
Expand All @@ -44,7 +49,7 @@ pub struct RxRing {

impl RxRing {
/// Creates a new ring for receiving packets.
pub fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
// Create an XDP socket.
trace!("creating xdp socket");
let mut socket: XdpSocket = XdpSocket::create(api)?;
Expand All @@ -63,7 +68,7 @@ impl RxRing {
)?;

// Set rx ring size.
trace!("setting rx ring size");
trace!("setting rx ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_RING_SIZE,
Expand All @@ -72,7 +77,7 @@ impl RxRing {
)?;

// Set rx fill ring size.
trace!("setting rx fill ring size");
trace!("setting rx fill ring size: {}", length);
socket.setsockopt(
api,
libxdp::XSK_SOCKOPT_RX_FILL_RING_SIZE,
Expand All @@ -81,7 +86,7 @@ impl RxRing {
)?;

// Bind the rx queue.
trace!("binding rx queue");
trace!("binding rx queue for interface {}, queue {}", ifindex, queueid);
socket.bind(api, ifindex, queueid, libxdp::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?;

// Activate socket to enable packet reception.
Expand Down Expand Up @@ -111,25 +116,67 @@ impl RxRing {
unsafe { *b = 0 };
rx_fill_ring.producer_submit(length);

// Create XDP program.
trace!("creating xdp program");
Ok(Self {
ifindex,
queueid,
mem,
rx_ring,
rx_fill_ring,
socket: socket,
_program: None,
})
}

/// Create a new RxRing which redirects all traffic on the (if, queue) pair.
pub fn new_redirect_all(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
let mut ring: Self = Self::new(api, length, ifindex, queueid)?;
let rules: [XdpRule; 1] = [XdpRule::new(&ring.socket)];
ring.reprogram(api, &rules)?;
Ok(ring)
}

/// Create a new RxRing which redirects only specific TCP/UDP ports on the (if, queue) pair.
pub fn new_cohost(
api: &mut XdpApi,
length: u32,
ifindex: u32,
queueid: u32,
tcp_ports: &[u16],
udp_ports: &[u16],
) -> Result<Self, Fail> {
let mut ring: Self = Self::new(api, length, ifindex, queueid)?;

let rules: Vec<XdpRule> = tcp_ports
.iter()
.map(|port: &u16| XdpRule::new_for_dest(&ring.socket, Protocol::Tcp, *port))
.chain(
udp_ports
.iter()
.map(|port: &u16| XdpRule::new_for_dest(&ring.socket, Protocol::Udp, *port)),
)
.collect::<Vec<XdpRule>>();

ring.reprogram(api, rules.as_slice())?;
Ok(ring)
}

/// Update the RxRing to use the specified rules for filtering.
fn reprogram(&mut self, api: &mut XdpApi, rules: &[XdpRule]) -> Result<(), Fail> {
const XDP_INSPECT_RX: libxdp::XDP_HOOK_ID = libxdp::XDP_HOOK_ID {
Layer: libxdp::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: libxdp::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
SubLayer: libxdp::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT,
};
let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;

trace!("xdp program created");
let program: XdpProgram = XdpProgram::new(api, &rules, self.ifindex, &XDP_INSPECT_RX, self.queueid, 0)?;
trace!(
"xdp program created for interface {}, queue {}",
self.ifindex,
self.queueid
);

Ok(Self {
mem,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: program,
})
self._program = Some(program);
Ok(())
}

/// Reserves a consumer slot in the rx ring.
Expand Down
58 changes: 39 additions & 19 deletions src/rust/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,48 +63,68 @@ impl SharedCatpowderRuntime {
/// Instantiates a new XDP runtime.
pub fn new(config: &Config) -> Result<Self, Fail> {
let ifindex: u32 = config.local_interface_index()?;
let vf_if_index = config.local_vf_interface_index(); // N.B. this one is optional

trace!("Creating XDP runtime.");
let mut api: XdpApi = XdpApi::new()?;

// Open TX and RX rings
let tx: TxRing = TxRing::new(&mut api, Self::RING_LENGTH, ifindex, 0)?;

let cohost_mode = config.xdp_cohost_mode()?;
let (tcp_ports, udp_ports) = if cohost_mode {
trace!("XDP cohost mode enabled.");
config.xdp_cohost_ports()?
} else {
trace!("XDP not cohosted; will redirect all traffic");
(vec![], vec![])
};

let make_ring = |api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32| -> Result<RxRing, Fail> {
if cohost_mode {
RxRing::new_cohost(
api,
length,
ifindex,
queueid,
tcp_ports.as_slice(),
udp_ports.as_slice(),
)
} else {
RxRing::new_redirect_all(api, length, ifindex, queueid)
}
};

let queue_count: u32 = deduce_rss_settings(&mut api, ifindex)?;
let mut rx_rings: Vec<RxRing> = Vec::with_capacity(queue_count as usize);
for queueid in 0..queue_count {
rx_rings.push(RxRing::new(&mut api, Self::RING_LENGTH, ifindex, queueid as u32)?);
rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, ifindex, queueid as u32)?);
}
trace!("Created {} RX rings on interface {}", rx_rings.len(), ifindex);

if let Ok(vf_if_index) = vf_if_index {
let vf_rx_rings: Vec<RxRing> = if let Ok(vf_if_index) = config.local_vf_interface_index() {
// Optionally create VF RX rings
let vf_queue_count = deduce_rss_settings(&mut api, vf_if_index)?;
let mut vf_rx_rings = Vec::with_capacity(vf_queue_count as usize);
let vf_queue_count: u32 = deduce_rss_settings(&mut api, vf_if_index)?;
let mut vf_rx_rings: Vec<RxRing> = Vec::with_capacity(vf_queue_count as usize);
for queueid in 0..vf_queue_count {
vf_rx_rings.push(RxRing::new(&mut api, Self::RING_LENGTH, vf_if_index, queueid as u32)?);
vf_rx_rings.push(make_ring(&mut api, Self::RING_LENGTH, vf_if_index, queueid as u32)?);
}
trace!(
"Created {} RX rings on VF interface {}.",
vf_rx_rings.len(),
vf_if_index
);

Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings,
})))
vf_rx_rings
} else {
Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings: Vec::new(),
})))
}
vec![]
};

Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
vf_rx_rings,
})))
}
}

Expand Down
Loading
Loading