From cb31ed522e407608a0c1e1921927c7871a935733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 3 Jan 2024 12:46:03 +0100 Subject: [PATCH 1/6] feat: Implement streaming --- Cargo.lock | 95 ++++-- car-mirror/Cargo.toml | 2 - car-mirror/src/common.rs | 318 +++++++++++++-------- car-mirror/src/incremental_verification.rs | 45 +++ 4 files changed, 306 insertions(+), 154 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 679fab7..cab4c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,28 +180,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "async-stream" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.28", -] - [[package]] name = "async-task" version = "4.4.0" @@ -389,7 +367,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "async-stream", "async-trait", "bytes", "car-mirror", @@ -403,6 +380,7 @@ dependencies = [ "roaring-graphs", "serde", "serde_ipld_dagcbor", + "test-log", "test-strategy", "testresult", "thiserror", @@ -1205,6 +1183,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.5.0" @@ -1565,6 +1552,15 @@ dependencies = [ "regex-syntax 0.7.2", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + [[package]] name = "regex-syntax" version = "0.6.29" @@ -1749,9 +1745,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -1858,6 +1854,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "test-log" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6159ab4116165c99fc88cce31f99fa2c9dbe08d3691cb38da02fc3b45f357d2b" +dependencies = [ + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba277e77219e9eea169e8508942db1bf5d8a41ff2db9b20aab5a5aadc9fa25d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "test-strategy" version = "0.3.1" @@ -1978,27 +1995,45 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "lazy_static", "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", "tracing-core", ] [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "parking_lot", + "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 510354e..127321f 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -24,7 +24,6 @@ doc = true [dependencies] anyhow = "1.0" -async-stream = "0.3.5" async-trait = "0.1.73" bytes = "1.4" deterministic-bloom = "0.1" @@ -40,7 +39,6 @@ serde_ipld_dagcbor = "0.4" thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" -tracing-subscriber = "0.3" wnfs-common = "0.1.26" [dev-dependencies] diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index ffbd2be..c6dd370 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -2,13 +2,16 @@ use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; -use futures::TryStreamExt; +use futures::{future, TryStreamExt}; use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; use std::io::Cursor; use tracing::{debug, instrument, trace, warn}; -use wnfs_common::BlockStore; +use wnfs_common::{ + utils::{BoxStream, CondSend}, + BlockStore, +}; use crate::{ dag_walk::DagWalk, @@ -25,16 +28,12 @@ use crate::{ /// Configuration values (such as byte limits) for the CAR mirror protocol #[derive(Clone, Debug)] pub struct Config { - /// A client will try to send at least `send_minimum` bytes of block data - /// in each request, except if close to the end of the protocol (when there's) - /// not that much data left. - pub send_minimum: usize, - /// The maximum number of bytes per request that the server accepts. 
+ /// The maximum number of bytes per request that a recipient should accept. pub receive_maximum: usize, - /// The maximum number of roots per request that the server will send to the client, - /// and that the client will consume. + /// The maximum number of roots per request that will be requested by the recipient + /// to be sent by the sender. pub max_roots_per_round: usize, - /// The target false positive rate for the bloom filter that the server sends. + /// The target false positive rate for the bloom filter that the recipient sends. pub bloom_fpr: fn(u64) -> f64, } @@ -56,6 +55,9 @@ pub struct CarFile { pub bytes: Bytes, } +/// TODO(matheus23): Docs +pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>; + //-------------------------------------------------------------------------------------------------- // Functions //-------------------------------------------------------------------------------------------------- @@ -75,8 +77,38 @@ pub async fn block_send( store: &impl BlockStore, cache: &impl Cache, ) -> Result { + let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; + + let bytes = + write_blocks_into_car(Vec::new(), &mut block_stream, Some(config.receive_maximum)).await?; + + Ok(CarFile { + bytes: bytes.into(), + }) +} + +/// TODO(matheus23): Docs +#[instrument(skip_all, fields(root, last_state))] +pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( + root: Cid, + last_state: Option, + stream: W, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result { + let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; + write_blocks_into_car(stream, &mut block_stream, None).await +} + +/// TODO(matheus23): Docs +pub async fn block_send_block_stream<'a>( + root: Cid, + last_state: Option, + store: &'a impl BlockStore, + cache: &'a impl Cache, +) -> Result, Error> { let ReceiverState { - ref missing_subgraph_roots, + missing_subgraph_roots, 
have_cids_bloom, } = last_state.unwrap_or(ReceiverState { missing_subgraph_roots: vec![root], @@ -85,37 +117,13 @@ pub async fn block_send( // Verify that all missing subgraph roots are in the relevant DAG: let subgraph_roots = - verify_missing_subgraph_roots(root, missing_subgraph_roots, store, cache).await?; + verify_missing_subgraph_roots(root, &missing_subgraph_roots, store, cache).await?; let bloom = handle_missing_bloom(have_cids_bloom); - let mut writer = CarWriter::new( - CarHeader::new_v1( - // https://github.com/wnfs-wg/car-mirror-spec/issues/6 - // CAR files *must* have at least one CID in them, and all of them - // need to appear as a block in the payload. - // It would probably make most sense to just write all subgraph roots into this, - // but we don't know how many of the subgraph roots fit into this round yet, - // so we're simply writing the first one in here, since we know - // at least one block will be written (and it'll be that one). - subgraph_roots.iter().take(1).cloned().collect(), - ), - Vec::new(), - ); - - write_blocks_into_car( - &mut writer, - subgraph_roots, - &bloom, - config.send_minimum, - store, - cache, - ) - .await?; + let stream = stream_blocks_from_roots(subgraph_roots, bloom, store, cache); - Ok(CarFile { - bytes: writer.finish().await?.into(), - }) + Ok(Box::pin(stream)) } /// This function is run on the block receiving end of the protocol. 
@@ -149,51 +157,70 @@ pub async fn block_receive( .await?; } - let missing_subgraph_roots = dag_verification - .want_cids - .iter() - .take(config.max_roots_per_round) - .cloned() - .collect(); + let mut receiver_state = dag_verification.into_receiver_state(config.bloom_fpr); - let bloom_capacity = dag_verification.have_cids.len() as u64; + receiver_state + .missing_subgraph_roots + .truncate(config.max_roots_per_round); - if bloom_capacity == 0 { - return Ok(ReceiverState { - missing_subgraph_roots, - have_cids_bloom: None, - }); - } + Ok(receiver_state) +} - if missing_subgraph_roots.is_empty() { - // We're done. No need to compute a bloom. - return Ok(ReceiverState { - missing_subgraph_roots, - have_cids_bloom: None, - }); - } +/// TODO(matheus23): Docs +#[instrument(skip_all, fields(root))] +pub async fn block_receive_car_stream( + root: Cid, + reader: R, + config: &Config, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result { + let reader = CarReader::new(reader).await?; - let mut bloom = - BloomFilter::new_from_fpr_po2(bloom_capacity, (config.bloom_fpr)(bloom_capacity)); - - dag_verification - .have_cids - .iter() - .for_each(|cid| bloom.insert(&cid.to_bytes())); - - debug!( - inserted_elements = bloom_capacity, - size_bits = bloom.as_bytes().len() * 8, - hash_count = bloom.hash_count(), - ones_count = bloom.count_ones(), - estimated_fpr = bloom.current_false_positive_rate(), - "built 'have cids' bloom", + let mut stream: BlockStream<'_> = Box::pin( + reader + .stream() + .map_ok(|(cid, bytes)| (cid, Bytes::from(bytes))) + .map_err(Error::CarFileError), ); - Ok(ReceiverState { - missing_subgraph_roots, - have_cids_bloom: Some(bloom), - }) + block_receive_block_stream(root, &mut stream, config, store, cache).await +} + +/// Consumes a stream of blocks, verifying their integrity and +/// making sure all blocks are part of the DAG. 
+pub async fn block_receive_block_stream( + root: Cid, + stream: &mut BlockStream<'_>, + config: &Config, + store: &impl BlockStore, + cache: &impl Cache, +) -> Result { + let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; + + while let Some(block) = stream.try_next().await? { + match read_and_verify_block(&mut dag_verification, block, store, cache).await? { + BlockState::Have => { + // This can happen because we've just discovered a subgraph we already have. + // Let's update the endpoint with our new receiver state. + break; + } + BlockState::Unexpected => { + // We received a block out-of-order. This is weird, but can + // happen due to bloom filter false positives. + // Essentially, the sender could've skipped a block that was + // important for us to verify that further blocks are connected + // to the root. + // We should update the endpoint about the skipped block. + break; + } + BlockState::Want => { + // Perfect, we're just getting what we want. Let's continue! + } + } + } + + Ok(dag_verification.into_receiver_state(config.bloom_fpr)) } /// Find all CIDs that a block references. 
@@ -216,7 +243,7 @@ pub fn references>( } //-------------------------------------------------------------------------------------------------- -// Private Functions +// Private //-------------------------------------------------------------------------------------------------- async fn verify_missing_subgraph_roots( @@ -264,48 +291,77 @@ fn handle_missing_bloom(have_cids_bloom: Option) -> BloomFilter { have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))) // An empty bloom that contains nothing } -async fn write_blocks_into_car( - writer: &mut CarWriter, +fn stream_blocks_from_roots<'a>( subgraph_roots: Vec, - bloom: &BloomFilter, - send_minimum: usize, - store: &impl BlockStore, - cache: &impl Cache, -) -> Result<(), Error> { + bloom: BloomFilter, + store: &'a impl BlockStore, + cache: &'a impl Cache, +) -> BlockStream<'a> { + Box::pin( + DagWalk::breadth_first(subgraph_roots.clone()) + .stream(store, cache) + .try_filter(move |cid| { + future::ready(should_block_be_skipped(cid, &bloom, &subgraph_roots)) + }) + .and_then(move |cid| async move { + let bytes = store + .get_block(&cid) + .await + .map_err(Error::BlockStoreError)?; + Ok((cid, bytes)) + }), + ) +} + +async fn write_blocks_into_car( + write: W, + blocks: &mut BlockStream<'_>, + receive_limit: Option, +) -> Result { let mut block_bytes = 0; - let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); - while let Some(cid) = dag_walk.next(store, cache).await? { - let block = store - .get_block(&cid) - .await - .map_err(Error::BlockStoreError)?; + // https://github.com/wnfs-wg/car-mirror-spec/issues/6 + // CAR files *must* have at least one CID in them, and all of them + // need to appear as a block in the payload. 
+ // It would probably make most sense to just write all subgraph roots into this, + // but we don't know how many of the subgraph roots fit into this round yet, + // so we're simply writing the first one in here, since we know + // at least one block will be written (and it'll be that one). + let Some((cid, block)) = blocks.try_next().await? else { + debug!("No blocks to write."); + return Ok(write); + }; - if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) { - debug!( - cid = %cid, - bloom_contains = bloom.contains(&cid.to_bytes()), - subgraph_roots_contains = subgraph_roots.contains(&cid), - "skipped writing block" - ); - continue; - } + let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), write); + + block_bytes += writer.write(cid, block).await?; + while let Some((cid, block)) = blocks.try_next().await? { debug!( cid = %cid, num_bytes = block.len(), - frontier_size = dag_walk.frontier.len(), "writing block to CAR", ); - block_bytes += writer.write(cid, &block).await?; + // Let's be conservative, assume a 64-byte CID (usually ~40 byte) + // and a 4-byte frame size varint (3 byte would be enough for an 8MiB frame). + let added_bytes = 64 + 4 + block.len(); - if block_bytes > send_minimum { - break; + if let Some(receive_limit) = receive_limit { + if block_bytes + added_bytes > receive_limit { + debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit"); + break; + } } + + block_bytes += writer.write(cid, &block).await?; } - Ok(()) + Ok(writer.finish().await?) 
+} + +fn should_block_be_skipped(cid: &Cid, bloom: &BloomFilter, subgraph_roots: &[Cid]) -> bool { + bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(cid) } async fn read_and_verify_blocks( @@ -333,26 +389,45 @@ async fn read_and_verify_blocks( }); } - match dag_verification.block_state(cid) { - BlockState::Have => continue, - BlockState::Unexpected => { - trace!( - cid = %cid, - "received block out of order (possibly due to bloom false positive)" - ); - break; - } - BlockState::Want => { - dag_verification - .verify_and_store_block((cid, block), store, cache) - .await?; - } + let block_state = + read_and_verify_block(dag_verification, (cid, block), store, cache).await?; + + if matches!(block_state, BlockState::Unexpected) { + break; } } Ok(()) } +/// Returns whether to continue receiving blocks. +/// +/// Only returns false when a block was received out of order +/// (or perhaps because a block was skipped due to a bloom filter false positive). +async fn read_and_verify_block( + dag_verification: &mut IncrementalDagVerification, + (cid, block): (Cid, Bytes), + store: &impl BlockStore, + cache: &impl Cache, +) -> Result { + match dag_verification.block_state(cid) { + BlockState::Have => Ok(BlockState::Have), + BlockState::Unexpected => { + trace!( + cid = %cid, + "received block out of order (possibly due to bloom false positive)" + ); + Ok(BlockState::Unexpected) + } + BlockState::Want => { + dag_verification + .verify_and_store_block((cid, block), store, cache) + .await?; + Ok(BlockState::Want) + } + } +} + //-------------------------------------------------------------------------------------------------- // Implementations //-------------------------------------------------------------------------------------------------- @@ -443,9 +518,8 @@ impl ReceiverState { impl Default for Config { fn default() -> Self { Self { - send_minimum: 128 * 1024, // 128KiB - receive_maximum: 512 * 1024, // 512KiB - max_roots_per_round: 1000, // max. 
~41KB of CIDs + receive_maximum: 2_000_000, // 2 MB + max_roots_per_round: 1000, // max. ~41KB of CIDs bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64), } } diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index 215887b..ffcb122 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -1,11 +1,13 @@ #![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet. use crate::{ + common::ReceiverState, dag_walk::DagWalk, error::{Error, IncrementalVerificationError}, traits::Cache, }; use bytes::Bytes; +use deterministic_bloom::runtime_size::BloomFilter; use libipld_core::{ cid::Cid, multihash::{Code, MultihashDigest}, @@ -197,4 +199,47 @@ impl IncrementalDagVerification { Ok(()) } + + /// Computes the receiver state for the current incremental dag verification state. + /// This takes the have CIDs and turns them into a bloom filter. + pub fn into_receiver_state(self, bloom_fpr: fn(u64) -> f64) -> ReceiverState { + let missing_subgraph_roots = self.want_cids.into_iter().collect(); + + let bloom_capacity = self.have_cids.len() as u64; + + if bloom_capacity == 0 { + return ReceiverState { + missing_subgraph_roots, + have_cids_bloom: None, + }; + } + + if missing_subgraph_roots.is_empty() { + // We're done. No need to compute a bloom. 
+ return ReceiverState { + missing_subgraph_roots, + have_cids_bloom: None, + }; + } + + let mut bloom = BloomFilter::new_from_fpr_po2(bloom_capacity, bloom_fpr(bloom_capacity)); + + self.have_cids + .into_iter() + .for_each(|cid| bloom.insert(&cid.to_bytes())); + + tracing::debug!( + inserted_elements = bloom_capacity, + size_bits = bloom.as_bytes().len() * 8, + hash_count = bloom.hash_count(), + ones_count = bloom.count_ones(), + estimated_fpr = bloom.current_false_positive_rate(), + "built 'have cids' bloom", + ); + + ReceiverState { + missing_subgraph_roots, + have_cids_bloom: Some(bloom), + } + } } From 7c330fdecb9e12db086768ed169809c03c9c2d84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 3 Jan 2024 12:48:24 +0100 Subject: [PATCH 2/6] fix: Good 'ol 1-character bug --- car-mirror/src/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index c6dd370..1a6d154 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -301,7 +301,7 @@ fn stream_blocks_from_roots<'a>( DagWalk::breadth_first(subgraph_roots.clone()) .stream(store, cache) .try_filter(move |cid| { - future::ready(should_block_be_skipped(cid, &bloom, &subgraph_roots)) + future::ready(!should_block_be_skipped(cid, &bloom, &subgraph_roots)) }) .and_then(move |cid| async move { let bytes = store From 956a5c0d6da42f53e2c42f9c344f4468d5a65511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 3 Jan 2024 12:51:22 +0100 Subject: [PATCH 3/6] chore: Switch tests to use `TestResult` and `test_log` --- car-mirror/Cargo.toml | 4 +++- car-mirror/src/dag_walk.rs | 6 +++--- car-mirror/src/pull.rs | 5 +++-- car-mirror/src/push.rs | 13 +++++++------ car-mirror/src/traits.rs | 12 ++++++------ 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 127321f..1d20cee 100644 --- a/car-mirror/Cargo.toml +++ 
b/car-mirror/Cargo.toml @@ -46,8 +46,10 @@ async-std = { version = "1.11", features = ["attributes"] } car-mirror = { path = ".", features = ["test_utils"] } proptest = "1.1" roaring-graphs = "0.12" +test-log = { version = "0.2", default-features = false, features = ["trace"] } test-strategy = "0.3" -testresult = "0.3.0" +testresult = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot", "registry"] } [features] default = [] diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 780c8f7..51ec790 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -130,13 +130,13 @@ impl DagWalk { mod tests { use super::*; use crate::traits::NoCache; - use anyhow::Result; use futures::TryStreamExt; use libipld::Ipld; + use testresult::TestResult; use wnfs_common::MemoryBlockStore; - #[async_std::test] - async fn test_walk_dag_breadth_first() -> Result<()> { + #[test_log::test(async_std::test)] + async fn test_walk_dag_breadth_first() -> TestResult { let store = &MemoryBlockStore::new(); // cid_root ---> cid_1_wrap ---> cid_1 diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 1991a90..d13cfee 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -54,6 +54,7 @@ mod tests { use futures::TryStreamExt; use libipld::Cid; use std::collections::HashSet; + use testresult::TestResult; use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( @@ -85,8 +86,8 @@ mod tests { Ok(metrics) } - #[async_std::test] - async fn test_transfer() -> Result<()> { + #[test_log::test(async_std::test)] + async fn test_transfer() -> TestResult { let client_store = &MemoryBlockStore::new(); let (root, ref server_store) = setup_random_dag(256, 10 * 1024 /* 10 KiB */).await?; diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index c2c81ac..f4e36bc 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -64,6 +64,7 @@ mod tests { use libipld::Cid; 
use proptest::collection::vec; use std::collections::HashSet; + use testresult::TestResult; use wnfs_common::{BlockStore, MemoryBlockStore}; pub(crate) async fn simulate_protocol( @@ -95,8 +96,8 @@ mod tests { Ok(metrics) } - #[async_std::test] - async fn test_transfer() -> Result<()> { + #[test_log::test(async_std::test)] + async fn test_transfer() -> TestResult { let (root, ref client_store) = setup_random_dag(256, 10 * 1024 /* 10 KiB */).await?; let server_store = &MemoryBlockStore::new(); simulate_protocol(root, &Config::default(), client_store, server_store).await?; @@ -116,8 +117,8 @@ mod tests { Ok(()) } - #[async_std::test] - async fn test_deduplicating_transfer() -> Result<()> { + #[test_log::test(async_std::test)] + async fn test_deduplicating_transfer() -> TestResult { let (root, ref client_store) = setup_random_dag(256, 10 * 1024 /* 10 KiB */).await?; let total_bytes = total_dag_bytes(root, client_store).await?; let path = Rvg::new().sample(&vec(0usize..128, 0..64)); @@ -140,8 +141,8 @@ mod tests { Ok(()) } - #[async_std::test] - async fn print_metrics() -> Result<()> { + #[test_log::test(async_std::test)] + async fn print_metrics() -> TestResult { const TESTS: usize = 200; const DAG_SIZE: u16 = 256; const BLOCK_PADDING: usize = 10 * 1024; diff --git a/car-mirror/src/traits.rs b/car-mirror/src/traits.rs index 49f8931..67eca42 100644 --- a/car-mirror/src/traits.rs +++ b/car-mirror/src/traits.rs @@ -191,7 +191,7 @@ mod quick_cache_tests { use testresult::TestResult; use wnfs_common::{BlockStore, MemoryBlockStore}; - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_has_block_cache() -> TestResult { let store = &MemoryBlockStore::new(); let cache = InMemoryCache::new(10_000, 150_000); @@ -212,7 +212,7 @@ mod quick_cache_tests { Ok(()) } - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_references_cache() -> TestResult { let store = &MemoryBlockStore::new(); let cache = InMemoryCache::new(10_000, 150_000); @@ -289,7 
+289,7 @@ mod tests { } } - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_has_block_cache() -> TestResult { let store = &MemoryBlockStore::new(); let cache = HashMapCache::default(); @@ -310,7 +310,7 @@ mod tests { Ok(()) } - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_references_cache() -> TestResult { let store = &MemoryBlockStore::new(); let cache = HashMapCache::default(); @@ -346,7 +346,7 @@ mod tests { Ok(()) } - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_no_cache_has_block() -> TestResult { let store = &MemoryBlockStore::new(); let cache = NoCache; @@ -372,7 +372,7 @@ mod tests { Ok(()) } - #[async_std::test] + #[test_log::test(async_std::test)] async fn test_no_cache_references() -> TestResult { let store = &MemoryBlockStore::new(); let cache = NoCache; From ec0a4d5665e22ee61372bd687bd54e5739451c76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 3 Jan 2024 16:24:16 +0100 Subject: [PATCH 4/6] feat: Add useful trace msgs --- car-mirror/src/common.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 1a6d154..2b8efa6 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -198,11 +198,12 @@ pub async fn block_receive_block_stream( ) -> Result { let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; - while let Some(block) = stream.try_next().await? { - match read_and_verify_block(&mut dag_verification, block, store, cache).await? { + while let Some((cid, block)) = stream.try_next().await? { + match read_and_verify_block(&mut dag_verification, (cid, block), store, cache).await? { BlockState::Have => { // This can happen because we've just discovered a subgraph we already have. // Let's update the endpoint with our new receiver state. 
+ tracing::debug!(%cid, "Received block we already have, stopping transfer"); break; } BlockState::Unexpected => { @@ -212,6 +213,7 @@ pub async fn block_receive_block_stream( // important for us to verify that further blocks are connected // to the root. // We should update the endpoint about the skipped block. + tracing::debug!(%cid, "Received block out of order, stopping transfer"); break; } BlockState::Want => { From e1968464bf856bd335e46adff1ad39d5c1fb3ccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 23 Jan 2024 18:10:51 +0100 Subject: [PATCH 5/6] refactor: Re-use streaming impl for non-streaming impl --- car-mirror/src/common.rs | 93 +++++++++++++--------------------------- 1 file changed, 30 insertions(+), 63 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 2b8efa6..7436797 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -55,7 +55,7 @@ pub struct CarFile { pub bytes: Bytes, } -/// TODO(matheus23): Docs +/// A stream of blocks. This requires the underlying futures to be `Send`, except for the `wasm32` target. 
pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>; //-------------------------------------------------------------------------------------------------- @@ -77,10 +77,15 @@ pub async fn block_send( store: &impl BlockStore, cache: &impl Cache, ) -> Result { - let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; - - let bytes = - write_blocks_into_car(Vec::new(), &mut block_stream, Some(config.receive_maximum)).await?; + let bytes = block_send_car_stream( + root, + last_state, + Vec::new(), + Some(config.receive_maximum), + store, + cache, + ) + .await?; Ok(CarFile { bytes: bytes.into(), @@ -93,11 +98,12 @@ pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( root: Cid, last_state: Option, stream: W, + send_limit: Option, store: &impl BlockStore, cache: &impl Cache, ) -> Result { let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?; - write_blocks_into_car(stream, &mut block_stream, None).await + write_blocks_into_car(stream, &mut block_stream, send_limit).await } /// TODO(matheus23): Docs @@ -142,22 +148,21 @@ pub async fn block_receive( store: &impl BlockStore, cache: &impl Cache, ) -> Result { - let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; - - if let Some(car) = last_car { - let mut reader = CarReader::new(Cursor::new(car.bytes)).await?; - - read_and_verify_blocks( - &mut dag_verification, - &mut reader, - config.receive_maximum, - store, - cache, - ) - .await?; - } + let mut receiver_state = match last_car { + Some(car) => { + if car.bytes.len() > config.receive_maximum { + return Err(Error::TooManyBytes { + receive_maximum: config.receive_maximum, + bytes_read: car.bytes.len(), + }); + } - let mut receiver_state = dag_verification.into_receiver_state(config.bloom_fpr); + block_receive_car_stream(root, Cursor::new(car.bytes), config, store, cache).await? 
+ } + None => IncrementalDagVerification::new([root], store, cache) + .await? + .into_receiver_state(config.bloom_fpr), + }; receiver_state .missing_subgraph_roots @@ -318,7 +323,7 @@ fn stream_blocks_from_roots<'a>( async fn write_blocks_into_car( write: W, blocks: &mut BlockStream<'_>, - receive_limit: Option, + size_limit: Option, ) -> Result { let mut block_bytes = 0; @@ -349,7 +354,7 @@ async fn write_blocks_into_car( // and a 4-byte frame size varint (3 byte would be enough for an 8MiB frame). let added_bytes = 64 + 4 + block.len(); - if let Some(receive_limit) = receive_limit { + if let Some(receive_limit) = size_limit { if block_bytes + added_bytes > receive_limit { debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit"); break; @@ -366,46 +371,8 @@ fn should_block_be_skipped(cid: &Cid, bloom: &BloomFilter, subgraph_roots: &[Cid bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(cid) } -async fn read_and_verify_blocks( - dag_verification: &mut IncrementalDagVerification, - reader: &mut CarReader, - receive_maximum: usize, - store: &impl BlockStore, - cache: &impl Cache, -) -> Result<(), Error> { - let mut bytes_read = 0; - while let Some((cid, vec)) = reader.next_block().await? { - let block = Bytes::from(vec); - - debug!( - cid = %cid, - num_bytes = block.len(), - "reading block from CAR", - ); - - bytes_read += block.len(); - if bytes_read > receive_maximum { - return Err(Error::TooManyBytes { - bytes_read, - receive_maximum, - }); - } - - let block_state = - read_and_verify_block(dag_verification, (cid, block), store, cache).await?; - - if matches!(block_state, BlockState::Unexpected) { - break; - } - } - - Ok(()) -} - -/// Returns whether to continue receiving blocks. -/// -/// Only returns false when a block was received out of order -/// (or perhaps because a block was skipped due to a bloom filter false positive). 
+/// Takes a block and stores it iff it's one of the blocks we're currently trying to retrieve. +/// Returns the block state of the received block. async fn read_and_verify_block( dag_verification: &mut IncrementalDagVerification, (cid, block): (Cid, Bytes), From 295b8b3b49bc95fc839c853ced1ed792f0b32024 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 24 Jan 2024 11:26:35 +0100 Subject: [PATCH 6/6] chore: Write docs --- car-mirror/src/common.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index 7436797..2bcc539 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -92,7 +92,9 @@ pub async fn block_send( }) } -/// TODO(matheus23): Docs +/// This is the streaming equivalent of `block_send`. +/// +/// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`. #[instrument(skip_all, fields(root, last_state))] pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( root: Cid, @@ -106,7 +108,8 @@ pub async fn block_send_car_stream<'a, W: tokio::io::AsyncWrite + Unpin + Send>( write_blocks_into_car(stream, &mut block_stream, send_limit).await } -/// TODO(matheus23): Docs +/// This is the car mirror block sending function, but unlike `block_send_car_stream` +/// it leaves framing blocks to the caller. pub async fn block_send_block_stream<'a>( root: Cid, last_state: Option, @@ -171,7 +174,7 @@ pub async fn block_receive( Ok(receiver_state) } -/// TODO(matheus23): Docs +/// Like `block_receive`, but allows consuming the CAR file as a stream. #[instrument(skip_all, fields(root))] pub async fn block_receive_car_stream( root: Cid,