diff --git a/Cargo.lock b/Cargo.lock index 5731461..1cf4a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,9 +149,9 @@ dependencies = [ [[package]] name = "async-once-cell" -version = "0.4.4" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b49bd4c5b769125ea6323601c39815848972880efd33ffb2d01f9f909adc699" +checksum = "9338790e78aa95a416786ec8389546c4b6a1dfc3dc36071ed9518a9413a542eb" [[package]] name = "async-std" @@ -1002,9 +1002,9 @@ dependencies = [ [[package]] name = "iroh-car" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a291220adb48738bdea587156c5f44ca5ec4ad31fdeb8fb88fda1dcd7886a24" +checksum = "475a6f0ebd64c87ea011021c67f10b57930f6c286e0163807066bfb83553b1b6" dependencies = [ "anyhow", "cid", @@ -2275,9 +2275,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "wnfs-common" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfcb4584f3866ead49adae8c05cec6f633139d19283448aa7807280612e24b7" +checksum = "0e7dd203b73bbbbbf175a8a733ef6aa843f020095f5f4d1e6cd3b7fdce8ba4d8" dependencies = [ "anyhow", "async-once-cell", diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 7b8ed45..817ca1d 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -26,22 +26,22 @@ doc = true anyhow = "1.0" async-stream = "0.3.5" async-trait = "0.1.73" -bytes = "1.4.0" +bytes = "1.4" deterministic-bloom = "0.1" -futures = "0.3.28" -iroh-car = "0.3.0" -libipld = "0.16.0" -libipld-core = "0.16.0" +futures = "0.3" +iroh-car = "0.4" +libipld = "0.16" +libipld-core = "0.16" proptest = { version = "1.1", optional = true } -quick_cache = { version = "0.4.0", optional = true } +quick_cache = { version = "0.4", optional = true } roaring-graphs = { version = "0.12", optional = true } -serde = "1.0.183" -serde_ipld_dagcbor = "0.4.0" -thiserror = "1.0.47" +serde = "^1" +serde_ipld_dagcbor = "0.4" +thiserror = "1.0" tokio = { version = "^1", default-features = false } tracing = "0.1" tracing-subscriber = "0.3" -wnfs-common = "0.1.23" +wnfs-common = "0.1.24" [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index a5b1cb2..20c2054 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,6 +1,5 @@ #![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet. -use anyhow::anyhow; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; use futures::TryStreamExt; @@ -104,11 +103,6 @@ pub async fn block_send( Vec::new(), ); - writer - .write_header() - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))?; - write_blocks_into_car( &mut writer, subgraph_roots, @@ -120,11 +114,7 @@ pub async fn block_send( .await?; Ok(CarFile { - bytes: writer - .finish() - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))? - .into(), + bytes: writer.finish().await?.into(), }) } @@ -147,9 +137,7 @@ pub async fn block_receive( let mut dag_verification = IncrementalDagVerification::new([root], store, cache).await?; if let Some(car) = last_car { - let mut reader = CarReader::new(Cursor::new(car.bytes)) - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))?; + let mut reader = CarReader::new(Cursor::new(car.bytes)).await?; read_and_verify_blocks( &mut dag_verification, @@ -310,14 +298,8 @@ async fn write_blocks_into_car( "writing block to CAR", ); - writer - .write(cid, &block) - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))?; + block_bytes += writer.write(cid, &block).await?; - // TODO(matheus23): Count the actual bytes sent? - // At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes. - block_bytes += block.len(); if block_bytes > send_minimum { break; } @@ -333,12 +315,8 @@ async fn read_and_verify_blocks( store: &impl BlockStore, cache: &impl Cache, ) -> Result<(), Error> { - let mut block_bytes = 0; - while let Some((cid, vec)) = reader - .next_block() - .await - .map_err(|e| Error::CarFileError(anyhow!(e)))? - { + let mut bytes_read = 0; + while let Some((cid, vec)) = reader.next_block().await? { let block = Bytes::from(vec); debug!( @@ -347,10 +325,10 @@ async fn read_and_verify_blocks( "reading block from CAR", ); - block_bytes += block.len(); - if block_bytes > receive_maximum { + bytes_read += block.len(); + if bytes_read > receive_maximum { return Err(Error::TooManyBytes { - block_bytes, + bytes_read, receive_maximum, }); } diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 3bc607f..780c8f7 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -184,14 +184,14 @@ mod proptests { use proptest::strategy::Strategy; use std::collections::BTreeSet; use test_strategy::proptest; - use wnfs_common::{dagcbor::encode, BlockStore, MemoryBlockStore}; + use wnfs_common::{encode, BlockStore, MemoryBlockStore}; fn ipld_dags() -> impl Strategy, Cid)> { arb_ipld_dag(1..256, 0.5, |cids, _| { let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect()); let cid = Cid::new_v1( IpldCodec::DagCbor.into(), - Code::Blake3_256.digest(&encode(&ipld).unwrap()), + Code::Blake3_256.digest(&encode(&ipld, IpldCodec::DagCbor).unwrap()), ); (cid, ipld) }) @@ -204,7 +204,7 @@ mod proptests { let store = &MemoryBlockStore::new(); for (cid, ipld) in dag.iter() { - let block: Bytes = encode(ipld).unwrap().into(); + let block: Bytes = encode(ipld, IpldCodec::DagCbor).unwrap().into(); let cid_store = store .put_block(block, IpldCodec::DagCbor.into()) .await diff --git a/car-mirror/src/error.rs b/car-mirror/src/error.rs index 1612f55..1dbc1ac 100644 --- a/car-mirror/src/error.rs +++ b/car-mirror/src/error.rs @@ -7,12 +7,12 @@ use crate::incremental_verification::BlockState; pub enum Error { /// An error raised during receival of blocks, when more than the configured maximum /// bytes are received in a single batch. See the `Config` type. - #[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request.")] + #[error("Expected to receive no more than {receive_maximum} bytes, but got at least {bytes_read}, aborting request.")] TooManyBytes { /// The configured amount of maximum bytes to receive receive_maximum: usize, /// The actual amount of bytes received so far - block_bytes: usize, + bytes_read: usize, }, /// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more.g @@ -50,10 +50,6 @@ pub enum Error { #[error("Error during block parsing: {0}")] ParsingError(anyhow::Error), - /// An error rasied when trying to read or write a CAR file. - #[error("CAR (de)serialization error: {0}")] - CarFileError(anyhow::Error), - /// An error rasied from the blockstore. #[error("BlockStore error: {0}")] BlockStoreError(anyhow::Error), @@ -64,6 +60,10 @@ pub enum Error { /// Errors related to incremental verification #[error(transparent)] IncrementalVerificationError(#[from] IncrementalVerificationError), + + /// An error rasied when trying to read or write a CAR file. + #[error("CAR (de)serialization error: {0}")] + CarFileError(#[from] iroh_car::Error), } /// Errors related to incremental verification diff --git a/car-mirror/src/test_utils/blockstore_utils.rs b/car-mirror/src/test_utils/blockstore_utils.rs index 3394271..ee34a99 100644 --- a/car-mirror/src/test_utils/blockstore_utils.rs +++ b/car-mirror/src/test_utils/blockstore_utils.rs @@ -3,7 +3,7 @@ use anyhow::Result; use bytes::Bytes; use libipld::{Cid, Ipld, IpldCodec}; use std::io::Write; -use wnfs_common::{dagcbor::encode, BlockStore, MemoryBlockStore}; +use wnfs_common::{encode, BlockStore, MemoryBlockStore}; /// Take a list of dag-cbor IPLD blocks and store all of them as dag-cbor in a /// MemoryBlockStore & return it. @@ -20,7 +20,7 @@ pub async fn setup_existing_blockstore( store: &impl BlockStore, ) -> Result<()> { for (cid, ipld) in blocks.into_iter() { - let block: Bytes = encode(&ipld)?.into(); + let block: Bytes = encode(&ipld, IpldCodec::DagCbor)?.into(); let cid_store = store.put_block(block, IpldCodec::DagCbor.into()).await?; debug_assert_eq!(cid, cid_store); } @@ -36,7 +36,7 @@ pub fn dag_to_dot( writeln!(writer, "digraph {{")?; for (cid, ipld) in blocks { - let bytes = encode(&ipld)?; + let bytes = encode(&ipld, IpldCodec::DagCbor)?; let refs = references(cid, bytes, Vec::new())?; for to_cid in refs { print_truncated_string(writer, cid.to_string())?; diff --git a/car-mirror/src/test_utils/dag_strategy.rs b/car-mirror/src/test_utils/dag_strategy.rs index 571d376..b98984e 100644 --- a/car-mirror/src/test_utils/dag_strategy.rs +++ b/car-mirror/src/test_utils/dag_strategy.rs @@ -8,7 +8,7 @@ use std::{ fmt::Debug, ops::Range, }; -use wnfs_common::dagcbor::encode; +use wnfs_common::encode; /// A strategy for use with proptest to generate random DAGs (directed acyclic graphs). /// The strategy generates a list of blocks of type T and their CIDs, as well as @@ -25,7 +25,7 @@ pub fn arb_ipld_dag( /// A block-generating function for use with `arb_ipld_dag`. pub fn links_to_ipld(cids: Vec, _: &mut TestRng) -> (Cid, Ipld) { let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect()); - let bytes = encode(&ipld).unwrap(); + let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap(); let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); (cid, ipld) } @@ -33,7 +33,7 @@ pub fn links_to_ipld(cids: Vec, _: &mut TestRng) -> (Cid, Ipld) { /// A block-generating function for use with `arb_ipld_dag`. pub fn links_to_dag_cbor(cids: Vec, _: &mut TestRng) -> (Cid, Bytes) { let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect()); - let bytes: Bytes = encode(&ipld).unwrap().into(); + let bytes: Bytes = encode(&ipld, IpldCodec::DagCbor).unwrap().into(); let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); (cid, bytes) } @@ -58,7 +58,7 @@ pub fn links_to_padded_ipld( Ipld::List(cids.into_iter().map(Ipld::Link).collect()), ), ])); - let bytes = encode(&ipld).unwrap(); + let bytes = encode(&ipld, IpldCodec::DagCbor).unwrap(); let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); (cid, ipld) }