[runtime] Feature: memory segments and batching #1550
base: dev
Pull Request Overview
This PR implements memory segments and batching support across the networking stack by replacing single-buffer operations with batch processing using ArrayVec. Key changes include:
- Updating TCP, UDP, and lower-layer protocols to process batches of packets.
- Replacing the old RECEIVE_BATCH_SIZE constant with MAX_BATCH_SIZE_NUM_PACKETS.
- Updating related runtime and OS modules (demikernel, catnap, catnip, etc.) to support the new batching logic.
Reviewed Changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
src/inetstack/protocols/layer4/tcp/peer.rs | Updated push/pop functions to accept and return ArrayVec batches. |
src/inetstack/protocols/layer4/tcp/established/sender.rs | Modified push function to iterate over a batch of buffers. |
src/inetstack/protocols/layer4/tcp/established/receiver.rs | Adjusted pop function logic for handling batched packet retrieval. |
src/inetstack/protocols/layer4/established/mod.rs | Updated push/pop interfaces to work with packet batches. |
src/inetstack/protocols/layer4/mod.rs | Replaced single-buffer push/pop with batch-based operations for UDP. |
src/inetstack/protocols/layer3/mod.rs, layer2/mod.rs | Updated receiver and transmitter to use MAX_BATCH_SIZE_NUM_PACKETS. |
src/inetstack/consts.rs | Deprecated RECEIVE_BATCH_SIZE in favor of MAX_BATCH_SIZE_NUM_PACKETS. |
src/demikernel/libos/network/queue.rs, libos.rs | Adapted network queue operations to work with buffer batches. |
src/catpowder and src/catnip runtimes, catnap/linux modules | Updated physical layer and transport implementations to support batching. |
74af05a to fc32d02
for mut pkt in pkts.iter_mut() {
    eth2_header.serialize_and_attach(&mut pkt);
}
`pkt` is already a mutable reference, so there is no need to borrow it again. The loop can be written as follows:
- for mut pkt in pkts.iter_mut() {
-     eth2_header.serialize_and_attach(&mut pkt);
- }
+ for pkt in pkts.iter_mut() {
+     eth2_header.serialize_and_attach(pkt);
+ }
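A minimal sketch of the suggested loop shape, using hypothetical stand-ins (`Packet`, `attach_header`, `attach_to_all`) rather than the project's own buffer and header types. It only illustrates that `iter_mut()` already yields `&mut` items, so each item can be passed through without `mut` or a second borrow.

```rust
/// Hypothetical stand-in for a packet buffer; not the project's DemiBuffer.
#[derive(Debug, PartialEq)]
pub struct Packet {
    pub bytes: Vec<u8>,
}

/// Hypothetical stand-in for `serialize_and_attach`: prepends a header to the packet.
pub fn attach_header(header: &[u8], pkt: &mut Packet) {
    let mut framed: Vec<u8> = header.to_vec();
    framed.extend_from_slice(&pkt.bytes);
    pkt.bytes = framed;
}

/// Attaches the same header to every packet in the batch.
pub fn attach_to_all(header: &[u8], pkts: &mut [Packet]) {
    // `iter_mut()` yields `&mut Packet`, so `pkt` is passed straight through.
    for pkt in pkts.iter_mut() {
        attach_header(header, pkt);
    }
}
```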
let cause: String = format!("cannot allocate a zero element scatter-gather array");
error!("into_sgarray(): {}", cause);
return Err(Fail::new(libc::EINVAL, &cause));
A string literal can be used here instead of `format!` to make it more efficient and readable. With that change there is also no need to take a reference to `cause`. The block can be written like this:
- let cause: String = format!("cannot allocate a zero element scatter-gather array");
- error!("into_sgarray(): {}", cause);
- return Err(Fail::new(libc::EINVAL, &cause));
+ let cause: &'static str = "cannot allocate a zero element scatter-gather array";
+ error!("into_sgarray(): {}", cause);
+ return Err(Fail::new(libc::EINVAL, cause));
Note: this applies to every other place in this change that passes a plain string literal to `format!`.
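As a self-contained illustration of the literal-based form, the sketch below uses a hypothetical `Fail` type whose constructor takes `&str`, a local `EINVAL` constant standing in for `libc::EINVAL` (22 on Linux), and a hypothetical `check_segment_count` helper. None of these are the project's actual definitions; the point is only that a `&'static str` needs no allocation and no extra borrow at the call site.

```rust
/// Hypothetical error type; assumes a constructor that accepts `&str`.
#[derive(Debug, PartialEq)]
pub struct Fail {
    pub errno: i32,
    pub cause: String,
}

impl Fail {
    pub fn new(errno: i32, cause: &str) -> Self {
        Fail { errno, cause: cause.to_string() }
    }
}

/// Stand-in for `libc::EINVAL` (assumed Linux value).
pub const EINVAL: i32 = 22;

/// Hypothetical helper: rejects a zero-element scatter-gather array.
pub fn check_segment_count(num_segments: usize) -> Result<(), Fail> {
    if num_segments == 0 {
        // A literal is already `&'static str`: no `format!` and no `&cause` needed.
        let cause: &'static str = "cannot allocate a zero element scatter-gather array";
        return Err(Fail::new(EINVAL, cause));
    }
    Ok(())
}
```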
    let remaining_buf: DemiBuffer = buf.split_front(*size)?;
    self.pop_queue.push_front(remaining_buf);
}
*size = *size - buf.len();
This can be shortened to:
- *size = *size - buf.len();
+ *size -= buf.len();
}
bufs.push(buf);
match size {
    Some(size) if size == 0 => break,
You can match on the literal directly, which avoids the guard condition:
- Some(size) if size == 0 => break,
+ Some(0) => break,
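The two small suggestions above (`*size -= ...` and `Some(0) => break`) can be seen together in a sketch of a budgeted pop loop. This uses `Vec<u8>` and `VecDeque` as stand-ins for the project's buffer and queue types, and the hypothetical `pop_up_to` is not the receiver's actual `pop` implementation; `Vec::split_off` is used here only to model splitting a buffer at the byte budget.

```rust
use std::collections::VecDeque;

/// Pops buffers until the queue is empty or the optional byte budget is used up.
pub fn pop_up_to(queue: &mut VecDeque<Vec<u8>>, mut size: Option<usize>) -> Vec<Vec<u8>> {
    let mut bufs: Vec<Vec<u8>> = Vec::new();
    while let Some(mut buf) = queue.pop_front() {
        if let Some(size) = size.as_mut() {
            if buf.len() > *size {
                // Keep the first `*size` bytes and put the remainder back on the queue.
                let remaining: Vec<u8> = buf.split_off(*size);
                queue.push_front(remaining);
            }
            // Compound assignment instead of `*size = *size - buf.len()`.
            *size -= buf.len();
        }
        bufs.push(buf);
        match size {
            // Literal pattern instead of `Some(size) if size == 0`.
            Some(0) => break,
            _ => continue,
        }
    }
    bufs
}
```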
    addr: Option<SocketAddr>,
) -> Result<(), Fail> {
    self.layer4_endpoint.push(sd, buf, addr).await
    debug_assert!(DEMI_SGARRAY_MAXLEN >= MAX_BATCH_SIZE_NUM_PACKETS);
Can this be done outside the critical path (even though it's only compiled in for debug mode)? Both operands are consts, so this should be checked just once, if possible.
Yes, let me check whether there is a compile-time assert that I can use.
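One possible way to take the check off the critical path, assuming a toolchain of Rust 1.57 or newer (where `assert!` can be used in const contexts), is an anonymous constant that is evaluated once at compile time. The value 20 for `DEMI_SGARRAY_MAXLEN` is an assumption based on the sizing discussion in this review; this is a sketch, not the change that was made in the PR.

```rust
/// Assumed value for illustration; the real constant lives in the runtime types.
pub const DEMI_SGARRAY_MAXLEN: usize = 20;

/// Max batch size of packets, as given in this PR.
pub const MAX_BATCH_SIZE_NUM_PACKETS: usize = 20;

// Evaluated at compile time: the build fails if the invariant does not hold,
// so nothing is checked at runtime, in debug or release builds.
const _: () = assert!(DEMI_SGARRAY_MAXLEN >= MAX_BATCH_SIZE_NUM_PACKETS);
```

Because both operands are constants, a violation surfaces as a build error rather than a runtime panic on the push path.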
    addr: Option<SocketAddr>,
) -> Result<(), Fail> {
    self.layer4_endpoint.push(sd, buf, addr).await
    debug_assert!(DEMI_SGARRAY_MAXLEN >= MAX_BATCH_SIZE_NUM_PACKETS);
    let push_bufs: ArrayVec<DemiBuffer, MAX_BATCH_SIZE_NUM_PACKETS> = bufs.into();
This `.into()` is not needed because the source and target types are the same. That raises the question: do we need `push_bufs` at all, or can we just pass `bufs` into `layer4_endpoint.push()`?
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
    self.data_from_sd(sd).pop(size).await
) -> Result<(Option<SocketAddr>, ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>), Fail> {
    let total_size: usize = 0;
What's the role of `total_size` in the while loop? Its value never changes.
let cause: String = String::from("zero-length buffer");
let bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = clone_sgarray(sga)?;
if bufs.is_empty() {
    let cause: String = String::from("zero-length list of buffers");
`String::from` can be replaced with a string literal, and the references to `cause` can then be dropped where it is passed on.
Note: This comment applies to all other places in this change where `String::from` is used.
/// TODO: This Should be Generic
pub const RECEIVE_BATCH_SIZE: usize = 4;
/// Max batch size of packets for both transmit and receive up and down the stack.
pub const MAX_BATCH_SIZE_NUM_PACKETS: usize = 20;
Why is this number bumped up 5x?
I bumped it to match the new sgarray max so that we could potentially get up to one sgarray worth of buffers in a single poll. Otherwise, we will have to poll 4 times to fill a single batch that we return to the application. The max sgarray size was chosen because Kyle said that their RPCs grow up to 20 MTUs in size.
That makes sense! It would be good to record this thought process in the form of a comment near the const.
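A possible shape for such a comment, written as a doc comment on the constant. The wording is only a suggestion that restates the rationale given in this thread; it is not text from the merged change.

```rust
/// Max batch size of packets for both transmit and receive up and down the stack.
///
/// Set to match the maximum scatter-gather array length so that a single poll can
/// return up to one full sgarray worth of buffers to the application, instead of
/// needing several polls to fill one batch. The sgarray maximum was sized for
/// RPCs that grow up to 20 MTUs.
pub const MAX_BATCH_SIZE_NUM_PACKETS: usize = 20;
```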
Socket::Tcp(socket) => self.tcp.push(socket, bufs).await,
Socket::Udp(socket) => {
    for buf in bufs {
        trace!("pushing");
If we need a trace here, then we should add slightly more information in this log statement.
@@ -16,6 +18,8 @@ pub const DEMI_SGARRAY_MAXLEN: usize = 1;
#[repr(C, packed)]
#[derive(Copy, Clone)]
pub struct demi_sgaseg_t {
    /// Reserved for DemiBuffer metadata.
    pub sgaseg_md: *mut libc::c_void,
We should evaluate renaming these fields to avoid the redundant `sgaseg` prefix. That will also give us more space to write meaningful names.
@@ -97,7 +97,7 @@ mod test {
// Size of a demi_qresult_t structure.
crate::ensure_eq!(
    mem::size_of::<demi_qresult_t>(),
    QR_OPCODE_SIZE + QR_QD_SIZE + QR_QT_SIZE + QR_RET_SIZE + QR_VALUE_SIZE
    QR_OPCODE_SIZE + QR_QD_SIZE + QR_QT_SIZE + QR_RET_SIZE + QR_VALUE_SIZE + 4,
It would be good to give a name to this magic const `4`.
The overall approach looks good! Left a few comments.
This PR closes #88