Skip to content

Commit c826b52

Browse files
Merge #1341
1341: Fix recvmmsg(2) implementation r=asomers a=codeslinger There were two problems discovered with the `recvmmsg(2)` implementation that this changeset attempts to fix: 1. As mentioned in /issues/1325, `recvmmsg(2)` can return fewer messages than requested, and 2. Passing the return value of `recvmmsg(2)` as the number of bytes in the messages received is incorrect. This changeset incorporates the proposed fix from /issues/1325, as well as passing the correct value (`mmsghdr.msg_len`) for the number of bytes in a given message. Co-authored-by: Toby DiPasquale <[email protected]>
2 parents 1794a47 + aef3068 commit c826b52

File tree

2 files changed

+73
-2
lines changed

2 files changed

+73
-2
lines changed

src/sys/socket/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1217,17 +1217,18 @@ pub fn recvmmsg<'a, I>(
12171217

12181218
let ret = unsafe { libc::recvmmsg(fd, output.as_mut_ptr(), output.len() as _, flags.bits() as _, timeout) };
12191219

1220-
let r = Errno::result(ret)?;
1220+
let _ = Errno::result(ret)?;
12211221

12221222
Ok(output
12231223
.into_iter()
1224+
.take(ret as usize)
12241225
.zip(addresses.iter().map(|addr| unsafe{addr.assume_init()}))
12251226
.zip(results.into_iter())
12261227
.map(|((mmsghdr, address), (msg_controllen, cmsg_buffer))| {
12271228
unsafe {
12281229
read_mhdr(
12291230
mmsghdr.msg_hdr,
1230-
r as isize,
1231+
mmsghdr.msg_len as isize,
12311232
msg_controllen,
12321233
address,
12331234
cmsg_buffer

test/sys/test_socket.rs

+70
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,76 @@ mod recvfrom {
437437

438438
send_thread.join().unwrap();
439439
}
440+
441+
#[cfg(any(
442+
target_os = "linux",
443+
target_os = "android",
444+
target_os = "freebsd",
445+
target_os = "netbsd",
446+
))]
447+
#[test]
448+
pub fn udp_recvmmsg_dontwait_short_read() {
449+
use nix::sys::uio::IoVec;
450+
use nix::sys::socket::{MsgFlags, recvmmsg};
451+
452+
const NUM_MESSAGES_SENT: usize = 2;
453+
const DATA: [u8; 4] = [1,2,3,4];
454+
455+
let std_sa = SocketAddr::from_str("127.0.0.1:6799").unwrap();
456+
let inet_addr = InetAddr::from_std(&std_sa);
457+
let sock_addr = SockAddr::new_inet(inet_addr);
458+
459+
let rsock = socket(AddressFamily::Inet,
460+
SockType::Datagram,
461+
SockFlag::empty(),
462+
None
463+
).unwrap();
464+
bind(rsock, &sock_addr).unwrap();
465+
let ssock = socket(
466+
AddressFamily::Inet,
467+
SockType::Datagram,
468+
SockFlag::empty(),
469+
None,
470+
).expect("send socket failed");
471+
472+
let send_thread = thread::spawn(move || {
473+
for _ in 0..NUM_MESSAGES_SENT {
474+
sendto(ssock, &DATA[..], &sock_addr, MsgFlags::empty()).unwrap();
475+
}
476+
});
477+
// Ensure we've sent all the messages before continuing so `recvmmsg`
478+
// will return right away
479+
send_thread.join().unwrap();
480+
481+
let mut msgs = std::collections::LinkedList::new();
482+
483+
// Buffers to receive >`NUM_MESSAGES_SENT` messages to ensure `recvmmsg`
484+
// will return when there are fewer than requested messages in the
485+
// kernel buffers when using `MSG_DONTWAIT`.
486+
let mut receive_buffers = [[0u8; 32]; NUM_MESSAGES_SENT + 2];
487+
let iovs: Vec<_> = receive_buffers.iter_mut().map(|buf| {
488+
[IoVec::from_mut_slice(&mut buf[..])]
489+
}).collect();
490+
491+
for iov in &iovs {
492+
msgs.push_back(RecvMmsgData {
493+
iov: iov,
494+
cmsg_buffer: None,
495+
})
496+
};
497+
498+
let res = recvmmsg(rsock, &mut msgs, MsgFlags::MSG_DONTWAIT, None).expect("recvmmsg");
499+
assert_eq!(res.len(), NUM_MESSAGES_SENT);
500+
501+
for RecvMsg { address, bytes, .. } in res.into_iter() {
502+
assert_eq!(AddressFamily::Inet, address.unwrap().family());
503+
assert_eq!(DATA.len(), bytes);
504+
}
505+
506+
for buf in &receive_buffers[..NUM_MESSAGES_SENT] {
507+
assert_eq!(&buf[..DATA.len()], DATA);
508+
}
509+
}
440510
}
441511

442512
// Test error handling of our recvmsg wrapper

0 commit comments

Comments
 (0)