Add ObjectFetcher tests
teor2345 committed Jan 31, 2025
1 parent 4562e28 commit da8c06b
Showing 5 changed files with 100 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

14 changes: 12 additions & 2 deletions crates/subspace-core-primitives/src/pieces.rs
@@ -624,11 +624,21 @@ impl Record {
     pub fn to_raw_record_chunks(
         &self,
     ) -> impl Iterator<Item = &'_ [u8; ScalarBytes::SAFE_BYTES]> + '_ {
-        // We have zero byte padding from [`ScalarBytes::SAFE_BYTES`] to [`ScalarBytes::FULL_BYTES`] that we need
-        // to skip
+        // We have zero byte padding from [`ScalarBytes::SAFE_BYTES`] to
+        // [`ScalarBytes::FULL_BYTES`] that we need to skip
         self.iter()
             .map(|bytes| bytes[1..].try_into().expect("Correct length; qed"))
     }

+    /// Convert from a record to mutable raw bytes; assumes a source record that only stores
+    /// safe bytes in its chunks.
+    #[inline]
+    pub fn to_mut_raw_record_chunks(
+        &mut self,
+    ) -> impl Iterator<Item = &'_ mut [u8; ScalarBytes::SAFE_BYTES]> + '_ {
+        self.iter_mut()
+            .map(|bytes| (&mut bytes[1..]).try_into().expect("Correct length; qed"))
+    }
 }

 /// Record commitment contained within a piece.
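For orientation (an editor's sketch, not part of the commit): both iterators skip the first byte of each chunk because every full-length chunk keeps a leading zero so it stays a valid field element, and only the trailing safe bytes carry data. A minimal standalone illustration, with the constants inlined (assumed to mirror `ScalarBytes::FULL_BYTES` and `ScalarBytes::SAFE_BYTES` in subspace-core-primitives):

    // Illustrative constants; the real values live on `ScalarBytes`.
    const FULL_BYTES: usize = 32;
    const SAFE_BYTES: usize = FULL_BYTES - 1;

    fn main() {
        // A chunk keeps a leading zero byte so it always fits in a field element.
        let mut chunk = [0u8; FULL_BYTES];

        // The mutable raw view skips that padding byte, exactly like
        // `to_mut_raw_record_chunks` does with `&mut bytes[1..]`.
        let raw: &mut [u8; SAFE_BYTES] = (&mut chunk[1..]).try_into().unwrap();
        raw.fill(0xAB);

        // Only the safe bytes changed; the padding byte is still zero.
        assert_eq!(chunk[0], 0);
        assert!(chunk[1..].iter().all(|&b| b == 0xAB));
    }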
2 changes: 2 additions & 0 deletions shared/subspace-data-retrieval/Cargo.toml
@@ -26,6 +26,8 @@ tracing = "0.1.40"

 [dev-dependencies]
 subspace-runtime-primitives = { version = "0.1.0", path = "../../crates/subspace-runtime-primitives" }
+rand = { version = "0.8.5", features = ["min_const_gen"] }
+tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros"] }

 [features]
 parallel = [
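A quick note on these dev-dependencies (an editor's reading of the diff, not stated in it): tokio's `macros` and `rt-multi-thread` features are what the `#[tokio::test(flavor = "multi_thread")]` attribute in the new test needs, and rand supplies the random piece data. A hypothetical minimal test exercising just these two additions:

    use rand::{thread_rng, RngCore};

    // Needs the tokio "macros" feature (for the attribute) and
    // "rt-multi-thread" (for the multi_thread flavor).
    #[tokio::test(flavor = "multi_thread")]
    async fn random_bytes_smoke_test() {
        let mut buf = [0u8; 32];
        thread_rng().fill_bytes(&mut buf);
        // 32 random bytes are all-zero with probability 2^-256.
        assert_ne!(buf, [0u8; 32]);
    }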
63 changes: 63 additions & 0 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
@@ -1215,6 +1215,7 @@ fn decode_data_length(
 mod test {
     use super::*;
     use parity_scale_codec::{Compact, CompactLen, Encode};
+    use rand::{thread_rng, RngCore};
     use subspace_core_primitives::hashes::Blake3Hash;
     use subspace_core_primitives::segments::{
         ArchivedBlockProgress, LastArchivedBlock, SegmentCommitment, SegmentHeader,

@@ -1268,4 +1269,66 @@ mod test {
         };
         assert_eq!(segment_header.encoded_size(), MAX_SEGMENT_HEADER_SIZE);
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn get_single_piece_object() {
+        // TODO:
+        // We need to cover 6 known good cases:
+        // - start of segment, offset already excludes segment header
+        // - middle of segment
+        // - end of segment, no padding
+        // - end of segment, end of object goes into padding (but not into the next segment)
+        // - end of segment, end of object length overlaps start of padding (but object does not cross into the next segment)
+        // - end of segment, start of object length is in padding (but object does not cross into the next segment)
+        //
+        // For multiple pieces, we need to cover 5 known good cases:
+        // - end of segment, end of object goes into padding (but not into the next segment)
+        // - end of segment, end of object goes into padding, and one piece into the next segment
+        // - end of segment, end of object goes into padding, and multiple pieces into the next segment
+        // - end of segment, end of object length overlaps start of padding, and one piece into the next segment
+        // - end of segment, end of object length overlaps start of padding, and multiple pieces into the next segment
+
+        // Generate random piece data
+        let mut piece_data = vec![0u8; Piece::SIZE];
+        thread_rng().fill_bytes(piece_data.as_mut_slice());
+        let mut piece = Piece::try_from(piece_data).unwrap();

+        // Encode the length of the object at the offset
+        let object_len = 100;
+        let object_len_encoded = Compact(object_len as u32).encode();
+        let offset = MAX_SEGMENT_HEADER_SIZE + 1;
+
+        let raw_data = piece.record_mut().to_mut_raw_record_chunks().flatten();
+        raw_data
+            .skip(offset)
+            .zip(object_len_encoded.iter())
+            .for_each(|(raw_data_byte, len_byte)| {
+                *raw_data_byte = *len_byte;
+            });
+
+        // Set up the mapping
+        let piece_index = PieceIndex::from(0);
+        let object_data = piece
+            .record()
+            .to_raw_record_chunks()
+            .flatten()
+            .skip(offset + object_len_encoded.len())
+            .take(object_len)
+            .copied()
+            .collect::<Vec<u8>>();
+
+        let mapping = GlobalObject {
+            piece_index,
+            offset: offset as u32,
+            hash: blake3_hash(&object_data),
+        };
+
+        // Set up the object fetcher
+        let piece_getter = vec![(piece_index, piece)];
+        let object_fetcher = ObjectFetcher::new(Arc::new(piece_getter), object_len);
+
+        // Now get the object back
+        let fetched_data = object_fetcher.fetch_object(mapping).await.unwrap();
+        assert_eq!(fetched_data, object_data);
+    }
 }
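Stepping back (an editor's sketch inferred from the test's use of `Compact`, not an authoritative description of the object format): the test stores an object in the record's raw bytes as a SCALE compact-encoded length followed by the payload, with the mapping's `offset` pointing at the length prefix. The framing round-trips like this:

    // Standalone sketch of the length framing written into the piece above.
    // Assumes only parity-scale-codec; names here are illustrative.
    use parity_scale_codec::{Compact, Decode, Encode};

    fn main() {
        let object = vec![0x42u8; 100];

        // Frame = compact-encoded length, then the payload bytes.
        let mut framed = Compact(object.len() as u32).encode();
        framed.extend_from_slice(&object);

        // A reader positioned at the mapping offset first decodes the length,
        // then takes that many bytes as the object.
        let mut input = framed.as_slice();
        let len = Compact::<u32>::decode(&mut input).unwrap().0 as usize;
        assert_eq!(&input[..len], object.as_slice());
    }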
22 changes: 22 additions & 0 deletions shared/subspace-data-retrieval/src/piece_getter.rs
@@ -96,6 +96,28 @@ impl PieceGetter for (PieceIndex, Piece) {
     }
 }

+#[async_trait]
+impl PieceGetter for Vec<(PieceIndex, Piece)> {
+    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
+        Ok(self.iter().find_map(|(index, piece)| {
+            if *index == piece_index {
+                Some(piece.clone())
+            } else {
+                None
+            }
+        }))
+    }
+
+    async fn get_pieces<'a>(
+        &'a self,
+        piece_indices: Vec<PieceIndex>,
+    ) -> anyhow::Result<
+        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
+    > {
+        get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices)
+    }
+}
+
 /// A default implementation which gets each piece individually, using the `get_piece` async
 /// function.
 ///
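For completeness, a simplified synchronous stand-in for the lookup the new `Vec<(PieceIndex, Piece)>` impl performs (plain types here for illustration; the real method is async and returns `anyhow::Result<Option<Piece>>`): a linear scan that clones the first matching piece, which is cheap enough for small test fixtures.

    // Simplified stand-in for `Vec<(PieceIndex, Piece)>::get_piece`:
    // linear scan, clone on first match, None if absent.
    fn get_piece(pairs: &[(u64, Vec<u8>)], wanted: u64) -> Option<Vec<u8>> {
        pairs
            .iter()
            .find_map(|(index, piece)| (*index == wanted).then(|| piece.clone()))
    }

    fn main() {
        let pairs = vec![(0u64, vec![1, 2, 3]), (5, vec![4, 5, 6])];
        assert_eq!(get_piece(&pairs, 5), Some(vec![4, 5, 6]));
        assert_eq!(get_piece(&pairs, 7), None);
    }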
