Skip to content

Commit d2af1e7

Browse files
committed
Added sendmmsg and recvmmsg (#574) (#575)
1 parent dd97053 commit d2af1e7

File tree

4 files changed

+445
-1
lines changed

4 files changed

+445
-1
lines changed

src/lib.rs

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,3 +699,202 @@ impl<'name, 'bufs, 'control> fmt::Debug for MsgHdrMut<'name, 'bufs, 'control> {
699699
"MsgHdrMut".fmt(fmt)
700700
}
701701
}
702+
703+
/// Configuration of a `sendmmsg(2)` system call.
704+
///
705+
/// This wraps `mmsghdr` on Unix. Also see [`MMsgHdrMut`] for the variant used by `recvmmsg(2)`.
706+
/// This API is not available on Windows.
707+
#[cfg(any(
708+
target_os = "aix",
709+
target_os = "android",
710+
target_os = "freebsd",
711+
target_os = "linux",
712+
target_os = "netbsd",
713+
target_os = "openbsd",
714+
))]
715+
pub struct MMsgHdr<'addr, 'bufs, 'control> {
716+
inner: sys::mmsghdr,
717+
#[allow(clippy::type_complexity)]
718+
_lifetimes: PhantomData<(&'addr SockAddr, &'bufs IoSlice<'bufs>, &'control [u8])>,
719+
}
720+
721+
#[cfg(any(
722+
target_os = "aix",
723+
target_os = "android",
724+
target_os = "freebsd",
725+
target_os = "linux",
726+
target_os = "netbsd",
727+
target_os = "openbsd",
728+
))]
729+
impl<'addr, 'bufs, 'control> MMsgHdr<'addr, 'bufs, 'control> {
730+
/// Create a new `MMsgHdr` with all empty/zero fields.
731+
#[allow(clippy::new_without_default)]
732+
pub fn new() -> MMsgHdr<'addr, 'bufs, 'control> {
733+
// SAFETY: all zero is valid for `mmsghdr`.
734+
MMsgHdr {
735+
inner: unsafe { mem::zeroed() },
736+
_lifetimes: PhantomData,
737+
}
738+
}
739+
740+
/// Set the address (name) of the message.
741+
///
742+
/// Corresponds to setting `msg_name` and `msg_namelen` on Unix.
743+
pub fn with_addr(mut self, addr: &'addr SockAddr) -> Self {
744+
sys::set_msghdr_name(&mut self.inner.msg_hdr, addr);
745+
self
746+
}
747+
748+
/// Set the buffer(s) of the message.
749+
///
750+
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix.
751+
pub fn with_buffers(mut self, bufs: &'bufs [IoSlice<'_>]) -> Self {
752+
let ptr = bufs.as_ptr() as *mut _;
753+
sys::set_msghdr_iov(&mut self.inner.msg_hdr, ptr, bufs.len());
754+
self
755+
}
756+
757+
/// Set the control buffer of the message.
758+
///
759+
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix.
760+
pub fn with_control(mut self, buf: &'control [u8]) -> Self {
761+
let ptr = buf.as_ptr() as *mut _;
762+
sys::set_msghdr_control(&mut self.inner.msg_hdr, ptr, buf.len());
763+
self
764+
}
765+
766+
/// Set the flags of the message.
767+
///
768+
/// Corresponds to setting `msg_flags` on Unix.
769+
pub fn with_flags(mut self, flags: sys::c_int) -> Self {
770+
sys::set_msghdr_flags(&mut self.inner.msg_hdr, flags);
771+
self
772+
}
773+
774+
/// Gets the number of sent bytes.
775+
///
776+
/// Corresponds to `msg_len` on Unix.
777+
pub fn data_len(&self) -> usize {
778+
self.inner.msg_len as usize
779+
}
780+
}
781+
782+
#[cfg(any(
783+
target_os = "aix",
784+
target_os = "android",
785+
target_os = "freebsd",
786+
target_os = "linux",
787+
target_os = "netbsd",
788+
target_os = "openbsd",
789+
))]
790+
impl<'name, 'bufs, 'control> fmt::Debug for MMsgHdr<'name, 'bufs, 'control> {
791+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
792+
"MMsgHdr".fmt(fmt)
793+
}
794+
}
795+
796+
/// Configuration of a `recvmmsg(2)` system call.
797+
///
798+
/// This wraps `mmsghdr` on Unix. Also see [`MMsgHdr`] for the variant used by `sendmmsg(2)`.
799+
/// This API is not available on Windows.
800+
#[cfg(any(
801+
target_os = "aix",
802+
target_os = "android",
803+
target_os = "freebsd",
804+
target_os = "linux",
805+
target_os = "netbsd",
806+
target_os = "openbsd",
807+
))]
808+
pub struct MMsgHdrMut<'addr, 'bufs, 'control> {
809+
inner: sys::mmsghdr,
810+
#[allow(clippy::type_complexity)]
811+
_lifetimes: PhantomData<(
812+
&'addr mut SockAddr,
813+
&'bufs mut MaybeUninitSlice<'bufs>,
814+
&'control mut [u8],
815+
)>,
816+
}
817+
818+
#[cfg(any(
819+
target_os = "aix",
820+
target_os = "android",
821+
target_os = "freebsd",
822+
target_os = "linux",
823+
target_os = "netbsd",
824+
target_os = "openbsd",
825+
))]
826+
impl<'addr, 'bufs, 'control> MMsgHdrMut<'addr, 'bufs, 'control> {
827+
/// Create a new `MMsgHdrMut` with all empty/zero fields.
828+
#[allow(clippy::new_without_default)]
829+
pub fn new() -> MMsgHdrMut<'addr, 'bufs, 'control> {
830+
// SAFETY: all zero is valid for `mmsghdr`.
831+
MMsgHdrMut {
832+
inner: unsafe { mem::zeroed() },
833+
_lifetimes: PhantomData,
834+
}
835+
}
836+
837+
/// Set the mutable address (name) of the message.
838+
///
839+
/// Corresponds to setting `msg_name` and `msg_namelen` on Unix.
840+
#[allow(clippy::needless_pass_by_ref_mut)]
841+
pub fn with_addr(mut self, addr: &'addr mut SockAddr) -> Self {
842+
sys::set_msghdr_name(&mut self.inner.msg_hdr, addr);
843+
self
844+
}
845+
846+
/// Set the mutable buffer(s) of the message.
847+
///
848+
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix.
849+
pub fn with_buffers(mut self, bufs: &'bufs mut [MaybeUninitSlice<'_>]) -> Self {
850+
sys::set_msghdr_iov(
851+
&mut self.inner.msg_hdr,
852+
bufs.as_mut_ptr().cast(),
853+
bufs.len(),
854+
);
855+
self
856+
}
857+
858+
/// Set the mutable control buffer of the message.
859+
///
860+
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix.
861+
pub fn with_control(mut self, buf: &'control mut [MaybeUninit<u8>]) -> Self {
862+
sys::set_msghdr_control(&mut self.inner.msg_hdr, buf.as_mut_ptr().cast(), buf.len());
863+
self
864+
}
865+
866+
/// Returns the flags of the message.
867+
pub fn flags(&self) -> RecvFlags {
868+
sys::msghdr_flags(&self.inner.msg_hdr)
869+
}
870+
871+
/// Gets the length of the control buffer.
872+
///
873+
/// Can be used to determine how much, if any, of the control buffer was filled by `recvmsg`.
874+
///
875+
/// Corresponds to `msg_controllen` on Unix.
876+
pub fn control_len(&self) -> usize {
877+
sys::msghdr_control_len(&self.inner.msg_hdr)
878+
}
879+
880+
/// Gets the number of received bytes.
881+
///
882+
/// Corresponds to `msg_len` on Unix.
883+
pub fn data_len(&self) -> usize {
884+
self.inner.msg_len as usize
885+
}
886+
}
887+
888+
#[cfg(any(
889+
target_os = "aix",
890+
target_os = "android",
891+
target_os = "freebsd",
892+
target_os = "linux",
893+
target_os = "netbsd",
894+
target_os = "openbsd",
895+
))]
896+
impl<'name, 'bufs, 'control> fmt::Debug for MMsgHdrMut<'name, 'bufs, 'control> {
897+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
898+
"MMsgHdrMut".fmt(fmt)
899+
}
900+
}

src/socket.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ use crate::sys::{self, c_int, getsockopt, setsockopt, Bool};
2424
#[cfg(all(unix, not(target_os = "redox")))]
2525
use crate::MsgHdrMut;
2626
use crate::{Domain, Protocol, SockAddr, TcpKeepalive, Type};
27+
#[cfg(any(
28+
target_os = "aix",
29+
target_os = "android",
30+
target_os = "freebsd",
31+
target_os = "linux",
32+
target_os = "netbsd",
33+
target_os = "openbsd",
34+
))]
35+
use crate::{MMsgHdr, MMsgHdrMut};
2736
#[cfg(not(target_os = "redox"))]
2837
use crate::{MaybeUninitSlice, MsgHdr, RecvFlags};
2938

@@ -648,6 +657,26 @@ impl Socket {
648657
sys::recvmsg(self.as_raw(), msg, flags)
649658
}
650659

660+
/// Receive a list of messages on a socket using a message structure.
661+
/// Note that the timeout is buggy on Linux, see BUGS section in the Linux manual page.
662+
#[doc = man_links!(recvmmsg(2))]
663+
#[cfg(any(
664+
target_os = "aix",
665+
target_os = "android",
666+
target_os = "freebsd",
667+
target_os = "linux",
668+
target_os = "netbsd",
669+
target_os = "openbsd",
670+
))]
671+
pub fn recvmmsg(
672+
&self,
673+
msgs: &mut [MMsgHdrMut<'_, '_, '_>],
674+
flags: sys::c_int,
675+
timeout: Option<Duration>,
676+
) -> io::Result<usize> {
677+
sys::recvmmsg(self.as_raw(), msgs, flags, timeout)
678+
}
679+
651680
/// Sends data on the socket to a connected peer.
652681
///
653682
/// This is typically used on TCP sockets or datagram sockets which have
@@ -749,6 +778,24 @@ impl Socket {
749778
pub fn sendmsg(&self, msg: &MsgHdr<'_, '_, '_>, flags: sys::c_int) -> io::Result<usize> {
750779
sys::sendmsg(self.as_raw(), msg, flags)
751780
}
781+
782+
/// Send a list of messages on a socket using a message structure.
783+
#[doc = man_links!(sendmmsg(2))]
784+
#[cfg(any(
785+
target_os = "aix",
786+
target_os = "android",
787+
target_os = "freebsd",
788+
target_os = "linux",
789+
target_os = "netbsd",
790+
target_os = "openbsd",
791+
))]
792+
pub fn sendmmsg(
793+
&self,
794+
msgs: &mut [MMsgHdr<'_, '_, '_>],
795+
flags: sys::c_int,
796+
) -> io::Result<usize> {
797+
sys::sendmmsg(self.as_raw(), msgs, flags)
798+
}
752799
}
753800

754801
/// Set `SOCK_CLOEXEC` and `NO_HANDLE_INHERIT` on the `ty`pe on platforms that

src/sys/unix.rs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,15 @@ use libc::ssize_t;
8080
use libc::{in6_addr, in_addr};
8181

8282
use crate::{Domain, Protocol, SockAddr, SockAddrStorage, TcpKeepalive, Type};
83+
#[cfg(any(
84+
target_os = "aix",
85+
target_os = "android",
86+
target_os = "freebsd",
87+
target_os = "linux",
88+
target_os = "netbsd",
89+
target_os = "openbsd",
90+
))]
91+
use crate::{MMsgHdr, MMsgHdrMut};
8392
#[cfg(not(target_os = "redox"))]
8493
use crate::{MsgHdr, MsgHdrMut, RecvFlags};
8594

@@ -739,6 +748,17 @@ pub(crate) fn msghdr_control_len(msg: &msghdr) -> usize {
739748
msg.msg_controllen as _
740749
}
741750

751+
// Used in `MMsgHdr`.
752+
#[cfg(any(
753+
target_os = "aix",
754+
target_os = "android",
755+
target_os = "freebsd",
756+
target_os = "linux",
757+
target_os = "netbsd",
758+
target_os = "openbsd",
759+
))]
760+
pub(crate) use libc::mmsghdr;
761+
742762
/// Unix only API.
743763
impl SockAddr {
744764
/// Constructs a `SockAddr` with the family `AF_VSOCK` and the provided CID/port.
@@ -1093,6 +1113,57 @@ pub(crate) fn recvmsg(
10931113
syscall!(recvmsg(fd, &mut msg.inner, flags)).map(|n| n as usize)
10941114
}
10951115

1116+
fn into_timespec(duration: Duration) -> libc::timespec {
1117+
// https://github.com/rust-lang/libc/issues/1848
1118+
#[cfg_attr(any(target_env = "musl", target_env = "ohos"), allow(deprecated))]
1119+
libc::timespec {
1120+
tv_sec: min(duration.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
1121+
tv_nsec: duration.subsec_nanos() as libc::c_long,
1122+
}
1123+
}
1124+
1125+
// type of the parameter specifying the number of mmsghdr elements in sendmmsg/recvmmsg syscalls
1126+
#[cfg(any(
1127+
target_os = "aix",
1128+
target_os = "android",
1129+
target_os = "linux",
1130+
target_os = "netbsd",
1131+
target_os = "openbsd",
1132+
))]
1133+
type MMsgHdrLen = libc::c_uint;
1134+
#[cfg(target_os = "freebsd")]
1135+
type MMsgHdrLen = usize;
1136+
1137+
#[cfg(any(
1138+
target_os = "aix",
1139+
target_os = "android",
1140+
target_os = "freebsd",
1141+
target_os = "linux",
1142+
target_os = "netbsd",
1143+
target_os = "openbsd",
1144+
))]
1145+
pub(crate) fn recvmmsg(
1146+
fd: Socket,
1147+
msgs: &mut [MMsgHdrMut<'_, '_, '_>],
1148+
flags: c_int,
1149+
timeout: Option<Duration>,
1150+
) -> io::Result<usize> {
1151+
let mut ts: libc::timespec;
1152+
let tp = match timeout {
1153+
Some(d) => {
1154+
ts = into_timespec(d);
1155+
&mut ts
1156+
}
1157+
None => std::ptr::null_mut(),
1158+
};
1159+
// MMsgHdrMut only contains libc::mmsghdr and PhantomData
1160+
let mp = msgs.as_mut_ptr() as *mut libc::mmsghdr;
1161+
// flags is unsigned in musl and ohos libc
1162+
#[cfg(any(target_env = "musl", target_env = "ohos"))]
1163+
let flags = flags.cast_unsigned();
1164+
syscall!(recvmmsg(fd, mp, msgs.len() as MMsgHdrLen, flags, tp)).map(|n| n as usize)
1165+
}
1166+
10961167
pub(crate) fn send(fd: Socket, buf: &[u8], flags: c_int) -> io::Result<usize> {
10971168
syscall!(send(
10981169
fd,
@@ -1137,6 +1208,27 @@ pub(crate) fn sendmsg(fd: Socket, msg: &MsgHdr<'_, '_, '_>, flags: c_int) -> io:
11371208
syscall!(sendmsg(fd, &msg.inner, flags)).map(|n| n as usize)
11381209
}
11391210

1211+
#[cfg(any(
1212+
target_os = "aix",
1213+
target_os = "android",
1214+
target_os = "freebsd",
1215+
target_os = "linux",
1216+
target_os = "netbsd",
1217+
target_os = "openbsd",
1218+
))]
1219+
pub(crate) fn sendmmsg(
1220+
fd: Socket,
1221+
msgs: &mut [MMsgHdr<'_, '_, '_>],
1222+
flags: c_int,
1223+
) -> io::Result<usize> {
1224+
// MMsgHdr only contains libc::mmsghdr and PhantomData
1225+
let mp = msgs.as_mut_ptr() as *mut libc::mmsghdr;
1226+
// flags is unsigned in musl and ohos libc
1227+
#[cfg(any(target_env = "musl", target_env = "ohos"))]
1228+
let flags = flags.cast_unsigned();
1229+
syscall!(sendmmsg(fd, mp, msgs.len() as MMsgHdrLen, flags)).map(|n| n as usize)
1230+
}
1231+
11401232
/// Wrapper around `getsockopt` to deal with platform specific timeouts.
11411233
pub(crate) fn timeout_opt(fd: Socket, opt: c_int, val: c_int) -> io::Result<Option<Duration>> {
11421234
unsafe { getsockopt(fd, opt, val).map(from_timeval) }
@@ -1166,7 +1258,7 @@ pub(crate) fn set_timeout_opt(
11661258
fn into_timeval(duration: Option<Duration>) -> libc::timeval {
11671259
match duration {
11681260
// https://github.com/rust-lang/libc/issues/1848
1169-
#[cfg_attr(target_env = "musl", allow(deprecated))]
1261+
#[cfg_attr(any(target_env = "musl", target_env = "ohos"), allow(deprecated))]
11701262
Some(duration) => libc::timeval {
11711263
tv_sec: min(duration.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
11721264
tv_usec: duration.subsec_micros() as libc::suseconds_t,

0 commit comments

Comments
 (0)