From 140909fba2e24ece949de393c1a95d66d354199a Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 15:14:24 -0600 Subject: [PATCH 1/9] Introduce API to initialize MaybeUninit --- sdk/packet/src/lib.rs | 122 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 2 deletions(-) diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index 0f29fe7d327011..ba5d915bcbdc90 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -9,8 +9,9 @@ use solana_frozen_abi_macro::AbiExample; use { bitflags::bitflags, std::{ - fmt, + fmt, io, mem, net::{IpAddr, Ipv4Addr, SocketAddr}, + ptr, slice::SliceIndex, }, }; @@ -158,6 +159,37 @@ impl Packet { &mut self.meta } + #[cfg(feature = "bincode")] + /// Initializes a std::mem::MaybeUninit such that the Packet can + /// be safely extracted via methods such as MaybeUninit::assume_init() + pub fn init_packet( + packet: &mut mem::MaybeUninit, + data: &T, + addr: Option<&SocketAddr>, + ) -> Result<()> { + let mut writer = PacketWriter::new_from_uninit_packet(packet); + bincode::serialize_into(&mut writer, data)?; + + let serialized_size = writer.position(); + let (ip, port) = if let Some(addr) = addr { + (addr.ip(), addr.port()) + } else { + (IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + }; + // SAFETY: Access the field by pointer as creating a reference to + // and/or within the uninitialized Packet is undefined behavior + unsafe { + ptr::addr_of_mut!((*packet.as_mut_ptr()).meta).write(Meta { + size: serialized_size, + addr: ip, + port, + flags: PacketFlags::empty(), + }); + } + + Ok(()) + } + #[cfg(feature = "bincode")] pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { let mut packet = Self::default(); @@ -224,6 +256,61 @@ impl PartialEq for Packet { } } +/// A custom implementation of io::Write to facilitate safe (non-UB) +/// initialization of a MaybeUninit +struct PacketWriter { + // A pointer to the current write position + position: *mut u8, + // The number of remaining bytes that can be written to + spare_capacity: usize, +} + +impl PacketWriter { + fn new_from_uninit_packet(packet: &mut mem::MaybeUninit) -> Self { + // SAFETY: Access the field by pointer as creating a reference to + // and/or within the uninitialized Packet is undefined behavior + let position = unsafe { ptr::addr_of_mut!((*packet.as_mut_ptr()).buffer) as *mut u8 }; + let spare_capacity = PACKET_DATA_SIZE; + + Self { + position, + spare_capacity, + } + } + + /// The offset of the write pointer within the buffer, which is also the + /// number of bytes that have been written + fn position(&self) -> usize { + PACKET_DATA_SIZE - self.spare_capacity + } +} + +impl io::Write for PacketWriter { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + if buf.len() > self.spare_capacity { + return Err(io::Error::from(io::ErrorKind::WriteZero)); + } + + // SAFETY: We previously verifed that buf.len() <= self.spare_capacity + // so this write will not push us past the end of the buffer + unsafe { + ptr::copy(buf.as_ptr(), self.position, buf.len()); + // Update position and spare_capacity for the next call to write() + self.position = self.position.add(buf.len()); + // Unchecked math since we know buf.len() <= spare_capacity + self.spare_capacity -= buf.len(); + } + + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + impl Meta { pub fn socket_addr(&self) -> SocketAddr { SocketAddr::new(self.addr, self.port) @@ -309,7 +396,7 @@ impl Default for Meta { #[cfg(test)] mod tests { - use super::*; + use {super::*, std::io::Write}; #[test] fn test_deserialize_slice() { @@ -349,4 +436,35 @@ mod tests { Err("the size limit has been reached".to_string()), ); } + + #[test] + fn test_packet_buffer_writer() { + let mut packet = mem::MaybeUninit::::uninit(); + let mut writer = PacketWriter::new_from_uninit_packet(&mut packet); + let total_capacity = writer.spare_capacity; + assert_eq!(total_capacity, PACKET_DATA_SIZE); + let payload: [u8; PACKET_DATA_SIZE] = std::array::from_fn(|i| i as u8 % 255); + + // Write 1200 bytes (1200 total) + let num_to_write = PACKET_DATA_SIZE - 32; + assert_eq!( + num_to_write, + writer.write(&payload[..num_to_write]).unwrap() + ); + assert_eq!(num_to_write, writer.position()); + // Write 28 bytes (1228 total) + assert_eq!(28, writer.write(&payload[1200..1200 + 28]).unwrap()); + assert_eq!(1200 + 28, writer.position()); + // Attempt to write 5 bytes (1233 total) which exceeds buffer capacity + assert!(writer + .write(&payload[1200 + 28 - 1..PACKET_DATA_SIZE]) + .is_err()); + // writer.position() remains unchanged + assert_eq!(1200 + 28, writer.position()); + // Write 4 bytes (1232 total) to fill buffer + assert_eq!(4, writer.write(&payload[1200 + 28..]).unwrap()); + assert_eq!(PACKET_DATA_SIZE, writer.position()); + // Writing any amount of bytes will fail on the already full buffer + assert!(writer.write(&[0]).is_err()); + } } From ad3760fa6861c66ccaf0833baa767cc160142df7 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 15:24:45 -0600 Subject: [PATCH 2/9] Rewrite Packet::from_data() in terms of Packet::init_packet() --- sdk/packet/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index ba5d915bcbdc90..e23f741b222b5e 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -192,9 +192,10 @@ impl Packet { #[cfg(feature = "bincode")] pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { - let mut packet = Self::default(); - Self::populate_packet(&mut packet, dest, &data)?; - Ok(packet) + let mut packet = mem::MaybeUninit::uninit(); + Self::init_packet(&mut packet, &data, dest)?; + // SAFETY: init_packet_from_data() just initialized the packet + unsafe { Ok(packet.assume_init()) } } #[cfg(feature = "bincode")] From 792f1544f7dabced9e2edf273ebead53d800325f Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 15:41:21 -0600 Subject: [PATCH 3/9] Update PacketBatch to support .spare_capacity_mut() --- perf/src/cuda_runtime.rs | 5 +++++ perf/src/packet.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index 1aa3b95153816d..d01df0c8496728 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -14,6 +14,7 @@ use { rayon::prelude::*, serde::{Deserialize, Serialize}, std::{ + mem::MaybeUninit, ops::{Index, IndexMut}, os::raw::c_int, slice::{Iter, IterMut, SliceIndex}, @@ -129,6 +130,10 @@ impl PinnedVec { self.x.iter_mut() } + pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit] { + self.x.spare_capacity_mut() + } + pub fn capacity(&self) -> usize { self.x.capacity() } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 64bc4366ea431e..e5b8348131592a 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -7,6 +7,7 @@ use { serde::{de::DeserializeOwned, Deserialize, Serialize}, std::{ io::Read, + mem::MaybeUninit, net::SocketAddr, ops::{Index, IndexMut}, slice::{Iter, IterMut, SliceIndex}, @@ -152,6 +153,10 @@ impl PacketBatch { self.packets.iter_mut() } + pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit] { + self.packets.spare_capacity_mut() + } + /// See Vector::set_len() for more details /// /// # Safety From 29814b20a014f421343f951a4b31f0ec0cd10c2f Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 15:44:26 -0600 Subject: [PATCH 4/9] Update entry.rs to use Packet MaybeUninit interface This avoids potential UB by calling .set_len() on the PacketBatch before the items have properly been initialized --- entry/src/entry.rs | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 5b3d1bfd3eddd4..f504ad9b966d0c 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -24,7 +24,6 @@ use { solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ hash::Hash, - packet::Meta, transaction::{ Result, SanitizedTransaction, Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction, @@ -548,26 +547,22 @@ fn start_verify_transactions_gpu( num_transactions, "entry-sig-verify", ); - // We use set_len here instead of resize(num_txs, Packet::default()), to save - // memory bandwidth and avoid writing a large amount of data that will be overwritten - // soon afterwards. As well, Packet::default() actually leaves the packet data - // uninitialized, so the initialization would simply write junk into - // the vector anyway. - unsafe { - packet_batch.set_len(num_transactions); - } + + let uninitialized_packets = packet_batch.spare_capacity_mut().iter_mut(); let transaction_iter = transaction_chunk .iter() .map(|tx| tx.to_versioned_transaction()); - let res = packet_batch - .iter_mut() - .zip(transaction_iter) - .all(|(packet, tx)| { - *packet.meta_mut() = Meta::default(); - Packet::populate_packet(packet, None, &tx).is_ok() - }); - if res { + let all_packets_initialized = + uninitialized_packets + .zip(transaction_iter) + .all(|(uninit_packet, tx)| { + Packet::init_packet(uninit_packet, &tx, None).is_ok() + }); + + if all_packets_initialized { + // SAFETY: All packets have been successfully initialized + unsafe { packet_batch.set_len(num_transactions) }; Ok(packet_batch) } else { Err(TransactionError::SanitizeFailure) From 111a86b6afc2e6666598a87a40321a47fc1e2dee Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 16:34:20 -0600 Subject: [PATCH 5/9] Add another helper to init packet from [&u8] --- entry/src/entry.rs | 2 +- sdk/packet/src/lib.rs | 53 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index f504ad9b966d0c..897b0457339f44 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -557,7 +557,7 @@ fn start_verify_transactions_gpu( uninitialized_packets .zip(transaction_iter) .all(|(uninit_packet, tx)| { - Packet::init_packet(uninit_packet, &tx, None).is_ok() + Packet::init_packet_from_data(uninit_packet, &tx, None).is_ok() }); if all_packets_initialized { diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index e23f741b222b5e..c08603dee34ad8 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -9,7 +9,9 @@ use solana_frozen_abi_macro::AbiExample; use { bitflags::bitflags, std::{ - fmt, io, mem, + fmt, + io::{self, Write}, + mem, net::{IpAddr, Ipv4Addr, SocketAddr}, ptr, slice::SliceIndex, @@ -162,7 +164,7 @@ impl Packet { #[cfg(feature = "bincode")] /// Initializes a std::mem::MaybeUninit such that the Packet can /// be safely extracted via methods such as MaybeUninit::assume_init() - pub fn init_packet( + pub fn init_packet_from_data( packet: &mut mem::MaybeUninit, data: &T, addr: Option<&SocketAddr>, @@ -176,24 +178,57 @@ impl Packet { } else { (IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) }; - // SAFETY: Access the field by pointer as creating a reference to - // and/or within the uninitialized Packet is undefined behavior - unsafe { - ptr::addr_of_mut!((*packet.as_mut_ptr()).meta).write(Meta { + Self::init_packet_meta( + packet, + Meta { size: serialized_size, addr: ip, port, flags: PacketFlags::empty(), - }); - } + }, + ); + + Ok(()) + } + + pub fn init_packet_from_bytes( + packet: &mut mem::MaybeUninit, + bytes: &[u8], + addr: Option<&SocketAddr>, + ) -> io::Result<()> { + let mut writer = PacketWriter::new_from_uninit_packet(packet); + let num_bytes_written = writer.write(bytes)?; + debug_assert_eq!(bytes.len(), num_bytes_written); + + let size = writer.position(); + let (ip, port) = if let Some(addr) = addr { + (addr.ip(), addr.port()) + } else { + (IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + }; + Self::init_packet_meta( + packet, + Meta { + size, + addr: ip, + port, + flags: PacketFlags::empty(), + }, + ); Ok(()) } + fn init_packet_meta(packet: &mut mem::MaybeUninit, meta: Meta) { + // SAFETY: Access the field by pointer as creating a reference to + // and/or within the uninitialized Packet is undefined behavior + unsafe { ptr::addr_of_mut!((*packet.as_mut_ptr()).meta).write(meta) }; + } + #[cfg(feature = "bincode")] pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { let mut packet = mem::MaybeUninit::uninit(); - Self::init_packet(&mut packet, &data, dest)?; + Self::init_packet_from_data(&mut packet, &data, dest)?; // SAFETY: init_packet_from_data() just initialized the packet unsafe { Ok(packet.assume_init()) } } From adda120a458c030cfb7dbd7476a28721df619c40 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 16:50:09 -0600 Subject: [PATCH 6/9] Update ShredFetchStage to use Packet MaybeUninit interface --- core/src/shred_fetch_stage.rs | 20 +++++++------------- sdk/packet/src/lib.rs | 5 +++++ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 6f3e8889423bf4..434d580732e8f5 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -14,7 +14,7 @@ use { clock::{Slot, DEFAULT_MS_PER_SLOT}, epoch_schedule::EpochSchedule, genesis_config::ClusterType, - packet::{Meta, PACKET_DATA_SIZE}, + packet::{Packet, PACKET_DATA_SIZE}, pubkey::Pubkey, }, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, @@ -350,9 +350,6 @@ pub(crate) fn receive_quic_datagrams( }; let mut packet_batch = PacketBatch::new_with_recycler(&recycler, PACKETS_PER_BATCH, "receive_quic_datagrams"); - unsafe { - packet_batch.set_len(PACKETS_PER_BATCH); - }; let deadline = Instant::now() + PACKET_COALESCE_DURATION; let entries = std::iter::once(entry).chain( std::iter::repeat_with(|| quic_datagrams_receiver.recv_deadline(deadline).ok()) @@ -360,19 +357,16 @@ pub(crate) fn receive_quic_datagrams( ); let size = entries .filter(|(_, _, bytes)| bytes.len() <= PACKET_DATA_SIZE) - .zip(packet_batch.iter_mut()) + .zip(packet_batch.spare_capacity_mut().iter_mut()) .map(|((_pubkey, addr, bytes), packet)| { - *packet.meta_mut() = Meta { - size: bytes.len(), - addr: addr.ip(), - port: addr.port(), - flags, - }; - packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); + Packet::init_packet_from_bytes(packet, &bytes, Some(&addr)).unwrap(); + // SAFETY: Packet::init_packet_from_bytes() just initialized the packet + unsafe { packet.assume_init_mut().meta_mut().set_flags(flags) }; }) .count(); if size > 0 { - packet_batch.truncate(size); + // SAFETY: By now, size packets have been initialized + unsafe { packet_batch.set_len(size) }; if sender.send(packet_batch).is_err() { return; // The receiver end of the channel is disconnected. } diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index c08603dee34ad8..4d66fb8efc96ab 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -362,6 +362,11 @@ impl Meta { .set(PacketFlags::FROM_STAKED_NODE, from_staked_node); } + #[inline] + pub fn set_flags(&mut self, flags: PacketFlags) { + self.flags = flags; + } + #[inline] pub fn discard(&self) -> bool { self.flags.contains(PacketFlags::DISCARD) From bf903684011ed545e2d9c2e5073a97d385bf9060 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 17:33:42 -0600 Subject: [PATCH 7/9] Use saturating math --- sdk/packet/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index 4d66fb8efc96ab..d4a238af8c1f2b 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -317,7 +317,7 @@ impl PacketWriter { /// The offset of the write pointer within the buffer, which is also the /// number of bytes that have been written fn position(&self) -> usize { - PACKET_DATA_SIZE - self.spare_capacity + PACKET_DATA_SIZE.saturating_sub(self.spare_capacity) } } @@ -329,13 +329,13 @@ impl io::Write for PacketWriter { } // SAFETY: We previously verifed that buf.len() <= self.spare_capacity - // so this write will not push us past the end of the buffer + // so this write will not push us past the end of the buffer. Likewise, + // we can update self.spare_capacity without fear of overflow unsafe { ptr::copy(buf.as_ptr(), self.position, buf.len()); // Update position and spare_capacity for the next call to write() self.position = self.position.add(buf.len()); - // Unchecked math since we know buf.len() <= spare_capacity - self.spare_capacity -= buf.len(); + self.spare_capacity = self.spare_capacity.saturating_sub(buf.len()); } Ok(buf.len()) From c904a9b8dc3374e0acdd3ff8dd8cf7e59245ec52 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 7 Nov 2024 17:36:10 -0600 Subject: [PATCH 8/9] Run test_packet_buffer_writer through miri --- ci/test-miri.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/test-miri.sh b/ci/test-miri.sh index 3163ba500e2290..e3415462144b51 100755 --- a/ci/test-miri.sh +++ b/ci/test-miri.sh @@ -6,8 +6,8 @@ source ci/_ source ci/rust-version.sh nightly # miri is very slow; so only run very few of selective tests! +_ cargo "+${rust_nightly}" miri test -p solana-packet -- test_packet_buffer_writer _ cargo "+${rust_nightly}" miri test -p solana-program -- hash:: account_info:: - _ cargo "+${rust_nightly}" miri test -p solana-unified-scheduler-logic # run intentionally-#[ignored] ub triggering tests for each to make sure they fail From 085626429446011ceae15dc9f9ab392fbd23b08d Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 8 Nov 2024 10:27:40 -0600 Subject: [PATCH 9/9] Use ptr::copy_nonoverlapping instead of ptr::copy --- sdk/packet/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/packet/src/lib.rs b/sdk/packet/src/lib.rs index d4a238af8c1f2b..aa5270f4a3ada4 100644 --- a/sdk/packet/src/lib.rs +++ b/sdk/packet/src/lib.rs @@ -332,7 +332,7 @@ impl io::Write for PacketWriter { // so this write will not push us past the end of the buffer. Likewise, // we can update self.spare_capacity without fear of overflow unsafe { - ptr::copy(buf.as_ptr(), self.position, buf.len()); + ptr::copy_nonoverlapping(buf.as_ptr(), self.position, buf.len()); // Update position and spare_capacity for the next call to write() self.position = self.position.add(buf.len()); self.spare_capacity = self.spare_capacity.saturating_sub(buf.len());