Skip to content

Commit

Permalink
bypasses shreds deserialization when recovering Vec<Entry> (#4582)
Browse files Browse the repository at this point in the history
The commit reads only the few necessary fields from the payload,
bypassing a full deserialization.
  • Loading branch information
behzadnouri authored Jan 28, 2025
1 parent 85e8f86 commit fe4b325
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 75 deletions.
3 changes: 2 additions & 1 deletion core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ fn bench_deshredder(bencher: &mut Bencher) {
&mut ProcessShredsStats::default(),
);
bencher.iter(|| {
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
let data_shreds = data_shreds.iter().map(Shred::payload);
let raw = &mut Shredder::deshred(data_shreds).unwrap();
assert_ne!(raw.len(), 0);
})
}
Expand Down
9 changes: 2 additions & 7 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3736,14 +3736,9 @@ impl Blockstore {
.multi_get_bytes(&keys)
.zip(indices)
.map(|(shred, index)| {
let Some(shred) = shred? else {
shred?.ok_or_else(|| {
maybe_panic(index);
return Err(BlockstoreError::MissingShred(slot, index));
};
Shred::new_from_serialized_shred(shred).map_err(|err| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!("Could not reconstruct shred from shred payload: {err:?}"),
)))
BlockstoreError::MissingShred(slot, index)
})
});
completed_ranges
Expand Down
97 changes: 67 additions & 30 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,8 @@ const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADERS: usize = 88;
const SIZE_OF_CODING_SHRED_HEADERS: usize = 89;
const SIZE_OF_SIGNATURE: usize = SIGNATURE_BYTES;
const SIZE_OF_SHRED_VARIANT: usize = 1;
const SIZE_OF_SHRED_SLOT: usize = 8;

const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;

// Shreds are uniformly split into erasure batches with a "target" number of
// data shreds per each batch as below. The actual number of data shreds in
Expand Down Expand Up @@ -464,13 +460,6 @@ impl Shred {
self.common_header().index
}

pub(crate) fn data(&self) -> Result<&[u8], Error> {
match self {
Self::ShredCode(_) => Err(Error::InvalidShredType),
Self::ShredData(shred) => shred.data(),
}
}

// Possibly trimmed payload;
// Should only be used when storing shreds to blockstore.
pub(crate) fn bytes_to_store(&self) -> &[u8] {
Expand Down Expand Up @@ -651,17 +640,17 @@ pub mod layout {
shred.get(..SIZE_OF_COMMON_SHRED_HEADER)
}

#[inline]
pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> {
    // The Ed25519 signature occupies the first 64 bytes of the payload.
    // Returns None if the payload is shorter than a signature.
    // The unwrap is safe: shred.get(..64) always yields exactly 64 bytes.
    let bytes = <[u8; 64]>::try_from(shred.get(..64)?).unwrap();
    Some(Signature::from(bytes))
}

// Byte range of the leading signature within the shred payload.
pub(crate) const fn get_signature_range() -> Range<usize> {
0..SIZE_OF_SIGNATURE
}

#[inline]
pub(super) fn get_shred_variant(shred: &[u8]) -> Result<ShredVariant, Error> {
let Some(&shred_variant) = shred.get(OFFSET_OF_SHRED_VARIANT) else {
return Err(Error::InvalidPayloadSize(shred.len()));
Expand All @@ -671,36 +660,79 @@ pub mod layout {

#[inline]
pub(super) fn get_shred_type(shred: &[u8]) -> Result<ShredType, Error> {
    // ShredType (Data vs Code) is derived from the shred-variant byte.
    get_shred_variant(shred).map(ShredType::from)
}

#[inline]
pub fn get_slot(shred: &[u8]) -> Option<Slot> {
    // ShredCommonHeader.slot: little-endian u64 at byte offset 65
    // (signature [0..64] + shred-variant byte [64]).
    // Returns None if the payload is too short; the unwrap is safe because
    // shred.get(65..73) always yields exactly 8 bytes.
    let bytes = <[u8; 8]>::try_from(shred.get(65..65 + 8)?).unwrap();
    Some(Slot::from_le_bytes(bytes))
}

#[inline]
pub(crate) fn get_index(shred: &[u8]) -> Option<u32> {
    // ShredCommonHeader.index: little-endian u32 at byte offset 73
    // (after signature, shred-variant byte, and slot).
    // Returns None if the payload is too short; the unwrap is safe because
    // shred.get(73..77) always yields exactly 4 bytes.
    let bytes = <[u8; 4]>::try_from(shred.get(73..73 + 4)?).unwrap();
    Some(u32::from_le_bytes(bytes))
}

#[inline]
pub fn get_version(shred: &[u8]) -> Option<u16> {
    // ShredCommonHeader.version: little-endian u16 at byte offset 77
    // (after signature, shred-variant byte, slot, and index).
    // Returns None if the payload is too short; the unwrap is safe because
    // shred.get(77..79) always yields exactly 2 bytes.
    let bytes = <[u8; 2]>::try_from(shred.get(77..77 + 2)?).unwrap();
    Some(u16::from_le_bytes(bytes))
}

// Returns DataShredHeader.parent_offset.
// The caller should verify first that the shred is data and not code!
#[inline]
pub(super) fn get_parent_offset(shred: &[u8]) -> Option<u16> {
    debug_assert_eq!(get_shred_type(shred).unwrap(), ShredType::Data);
    // parent_offset: little-endian u16 at byte offset 83, right after the
    // common header; only meaningful for data shreds (hence the assert).
    let bytes = <[u8; 2]>::try_from(shred.get(83..83 + 2)?).unwrap();
    Some(u16::from_le_bytes(bytes))
}

// Returns DataShredHeader.flags.
// Errors with InvalidShredType for coding shreds, which carry no flags.
#[inline]
pub(crate) fn get_flags(shred: &[u8]) -> Result<ShredFlags, Error> {
match get_shred_type(shred)? {
ShredType::Code => Err(Error::InvalidShredType),
ShredType::Data => {
// Flags are a single byte at offset 85 of the data-shred header.
let Some(flags) = shred.get(85).copied() else {
return Err(Error::InvalidPayloadSize(shred.len()));
};
// Reject byte values with bits outside the defined ShredFlags set.
ShredFlags::from_bits(flags).ok_or(Error::InvalidShredFlags(flags))
}
}
}

// Returns DataShredHeader.size for data shreds.
// The caller should verify first that the shred is data and not code!
#[inline]
fn get_data_size(shred: &[u8]) -> Result<u16, Error> {
debug_assert_eq!(get_shred_type(shred).unwrap(), ShredType::Data);
// DataShredHeader.size: little-endian u16 at byte offset 86 (after the
// common header, parent_offset, and flags byte).
let Some(bytes) = shred.get(86..86 + 2) else {
return Err(Error::InvalidPayloadSize(shred.len()));
};
// The unwrap is safe: the slice above is exactly 2 bytes long.
let bytes = <[u8; 2]>::try_from(bytes).unwrap();
Ok(u16::from_le_bytes(bytes))
}

#[inline]
pub(crate) fn get_data(shred: &[u8]) -> Result<&[u8], Error> {
match get_shred_variant(shred)? {
ShredVariant::LegacyCode => Err(Error::InvalidShredType),
ShredVariant::MerkleCode { .. } => Err(Error::InvalidShredType),
ShredVariant::LegacyData => legacy::ShredData::get_data(shred, get_data_size(shred)?),
ShredVariant::MerkleData {
proof_size,
chained,
resigned,
} => merkle::ShredData::get_data(
shred,
proof_size,
chained,
resigned,
get_data_size(shred)?,
),
}
}

#[inline]
Expand Down Expand Up @@ -1345,6 +1377,11 @@ mod tests {
};

// Sizes (in bytes) of the fixed-width common-header fields, used by tests
// to locate and corrupt fields directly in raw shred payloads.
const SIZE_OF_SHRED_INDEX: usize = 4;
const SIZE_OF_SHRED_SLOT: usize = 8;
const SIZE_OF_SHRED_VARIANT: usize = 1;

// Byte offsets of the slot and index fields within the common shred header.
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;

fn bs58_decode<T: AsRef<[u8]>>(data: T) -> Vec<u8> {
bs58::decode(data).into_vec().unwrap()
Expand Down
34 changes: 22 additions & 12 deletions ledger/src/shred/legacy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use {
crate::shred::{
self,
common::impl_shred_common,
shred_code, shred_data,
traits::{Shred, ShredCode as ShredCodeTrait, ShredData as ShredDataTrait},
CodingShredHeader, DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant,
SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADERS,
SIZE_OF_SIGNATURE,
},
assert_matches::debug_assert_matches,
solana_perf::packet::deserialize_from_with_limit,
solana_sdk::{clock::Slot, signature::Signature},
static_assertions::const_assert_eq,
Expand Down Expand Up @@ -187,19 +189,9 @@ impl ShredDataTrait for ShredData {
&self.data_header
}

#[inline]
fn data(&self) -> Result<&[u8], Error> {
    // Delegate to the payload-level helper so the bounds checks live in
    // one place, shared with shred::layout::get_data.
    Self::get_data(&self.payload, self.data_header.size)
}
}

Expand Down Expand Up @@ -260,6 +252,24 @@ impl ShredData {
}
}

// Given shred payload and DataShredHeader.size, returns the slice storing
// ledger entries in the shred.
pub(super) fn get_data(shred: &[u8], size: u16) -> Result<&[u8], Error> {
debug_assert_matches!(
shred::layout::get_shred_variant(shred),
Ok(ShredVariant::LegacyData)
);
let size = usize::from(size);
// `size` counts the headers too, so it must fall within
// [SIZE_OF_HEADERS, SIZE_OF_HEADERS + CAPACITY]; shred.get(..) additionally
// guards against a payload shorter than `size`.
(Self::SIZE_OF_HEADERS..=Self::SIZE_OF_HEADERS + Self::CAPACITY)
.contains(&size)
.then(|| shred.get(Self::SIZE_OF_HEADERS..size))
.flatten()
.ok_or_else(|| Error::InvalidDataSize {
// Cast is lossless here: size originated from a u16 argument.
size: size as u16,
payload: shred.len(),
})
}

pub(super) fn bytes_to_store(&self) -> &[u8] {
// Payload will be padded out to Self::SIZE_OF_PAYLOAD.
// But only need to store the bytes within data_header.size.
Expand Down
57 changes: 44 additions & 13 deletions ledger/src/shred/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@ impl ShredData {
// Offset into the payload where the erasure coded slice begins.
const ERASURE_SHARD_START_OFFSET: usize = SIZE_OF_SIGNATURE;

// Given shred payload, ShredVariant{..} and DataShredHeader.size, returns
// the slice storing ledger entries in the shred.
pub(super) fn get_data(
shred: &[u8],
proof_size: u8,
chained: bool,
resigned: bool,
size: u16, // DataShredHeader.size
) -> Result<&[u8], Error> {
let size = usize::from(size);
// Capacity of the data buffer depends on the merkle-variant parameters
// (proof size, chained/resigned), unlike the fixed legacy capacity.
let data_buffer_size = Self::capacity(proof_size, chained, resigned)?;
// `size` counts the headers too, so it must fall within
// [SIZE_OF_HEADERS, SIZE_OF_HEADERS + data_buffer_size]; shred.get(..)
// additionally guards against a payload shorter than `size`.
(Self::SIZE_OF_HEADERS..=Self::SIZE_OF_HEADERS + data_buffer_size)
.contains(&size)
.then(|| shred.get(Self::SIZE_OF_HEADERS..size))
.flatten()
.ok_or_else(|| Error::InvalidDataSize {
// Cast is lossless here: size originated from a u16 argument.
size: size as u16,
payload: shred.len(),
})
}

pub(super) fn get_merkle_root(
shred: &[u8],
proof_size: u8,
Expand Down Expand Up @@ -592,6 +613,7 @@ impl ShredDataTrait for ShredData {
&self.data_header
}

#[inline]
fn data(&self) -> Result<&[u8], Error> {
let ShredVariant::MerkleData {
proof_size,
Expand All @@ -601,18 +623,13 @@ impl ShredDataTrait for ShredData {
else {
return Err(Error::InvalidShredVariant);
};
let data_buffer_size = Self::capacity(proof_size, chained, resigned)?;
let size = usize::from(self.data_header.size);
if size > self.payload.len()
|| size < Self::SIZE_OF_HEADERS
|| size > Self::SIZE_OF_HEADERS + data_buffer_size
{
return Err(Error::InvalidDataSize {
size: self.data_header.size,
payload: self.payload.len(),
});
}
Ok(&self.payload[Self::SIZE_OF_HEADERS..size])
Self::get_data(
&self.payload,
proof_size,
chained,
resigned,
self.data_header.size,
)
}
}

Expand Down Expand Up @@ -1919,7 +1936,7 @@ mod test {
assert_eq!(common_header.version, shred_version);
let proof_size = shred.proof_size().unwrap();
match shred {
Shred::ShredCode(_) => {
Shred::ShredCode(shred) => {
assert_eq!(common_header.index, next_code_index + num_coding_shreds);
assert_eq!(
common_header.shred_variant,
Expand All @@ -1930,6 +1947,12 @@ mod test {
}
);
num_coding_shreds += 1;
let shred = shred.payload();
assert_matches!(
shred::layout::get_flags(shred),
Err(Error::InvalidShredType)
);
assert_matches!(shred::layout::get_data(shred), Err(Error::InvalidShredType));
}
Shred::ShredData(shred) => {
assert_eq!(common_header.index, next_shred_index + num_data_shreds);
Expand All @@ -1950,11 +1973,19 @@ mod test {
(shred.data_header.flags & ShredFlags::SHRED_TICK_REFERENCE_MASK).bits(),
reference_tick,
);
let data_header = shred.data_header;
let data = shred.data().unwrap();
let shred = shred.payload();
assert_eq!(
shred::layout::get_parent_offset(shred),
Some(u16::try_from(slot - parent_slot).unwrap()),
);
assert_eq!(
shred::layout::get_parent_offset(shred).unwrap(),
data_header.parent_offset
);
assert_eq!(shred::layout::get_flags(shred).unwrap(), data_header.flags);
assert_eq!(shred::layout::get_data(shred).unwrap(), data);
assert_eq!(
shred::layout::get_reference_tick(shred).unwrap(),
reference_tick
Expand Down
1 change: 0 additions & 1 deletion ledger/src/shred/shred_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ impl ShredData {
dispatch!(fn data_header(&self) -> &DataShredHeader);

dispatch!(pub(super) fn common_header(&self) -> &ShredCommonHeader);
dispatch!(pub(super) fn data(&self) -> Result<&[u8], Error>);
dispatch!(pub(super) fn erasure_shard(self) -> Result<Vec<u8>, Error>);
dispatch!(pub(super) fn erasure_shard_as_slice(&self) -> Result<&[u8], Error>);
dispatch!(pub(super) fn erasure_shard_index(&self) -> Result<usize, Error>);
Expand Down
Loading

0 comments on commit fe4b325

Please sign in to comment.