From ca386907a258256baf9c25fac03d7fb9372381cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 16 Aug 2023 11:27:30 +0200 Subject: [PATCH] Write a stream of blocks into a CAR file --- Cargo.lock | 113 ++++++++++++++++++++++++++++++++++++++++-- car-mirror/Cargo.toml | 4 ++ car-mirror/src/lib.rs | 53 ++++++++++++++++++-- 3 files changed, 163 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45b7775..d4cff6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -183,9 +198,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" [[package]] name = "async-trait" -version = "0.1.69" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2d0f03b3640e3a630367e40c468cb7f309529c708ed1d88597047b0e7c6ef7" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", @@ -215,6 +230,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base-x" version = "0.2.11" @@ -335,15 +365,19 @@ dependencies = [ "anyhow", "async-std", "async-stream", + "async-trait", "bytes", "car-mirror", "fixedbitset", "futures", + "iroh-car", "libipld", "libipld-core", "proptest", "roaring-graphs", "test-strategy", + "tokio", + "tokio-util", "tracing", "tracing-subscriber", "wnfs-common", @@ -798,6 +832,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "gloo-timers" version = "0.2.6" @@ -890,6 +930,21 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "iroh-car" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a291220adb48738bdea587156c5f44ca5ec4ad31fdeb8fb88fda1dcd7886a24" +dependencies = [ + "anyhow", + "cid", + "futures", + "libipld", + "thiserror", + "tokio", + "unsigned-varint", +] + [[package]] name = "itertools" version = "0.10.5" @@ -1061,6 +1116,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "multibase" version = "0.9.1" @@ -1125,6 +1189,15 @@ dependencies = [ "libm", ] +[[package]] +name = "object" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -1269,9 +1342,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -1374,6 +1447,12 @@ dependencies = [ "roaring", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustix" version = "0.37.20" @@ -1654,6 +1733,32 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tokio" +version = "1.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +dependencies = [ + "autocfg", + "backtrace", + "bytes", + "pin-project-lite", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.5.11" diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 30f2028..5d7094f 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -28,13 +28,17 @@ async-stream = "0.3.5" bytes = "1.4.0" fixedbitset = "0.4.2" futures = "0.3.28" +iroh-car = "0.3.0" libipld = "0.16.0" libipld-core = "0.16.0" proptest = { version = "1.1", optional = true } roaring-graphs = "0.12" +tokio-util = { version = "0.7.8", features = ["compat"] } +tokio = { version = "^1", features = ["io-util"] } tracing = "0.1" tracing-subscriber = "0.3" wnfs-common = "0.1.23" +async-trait = "0.1.73" [dev-dependencies] async-std = { version = "1.11", features = ["attributes"] } diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index 533ffda..3ee79e2 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -7,13 +7,15 @@ use anyhow::Result; use async_stream::try_stream; use bytes::Bytes; -use futures::Stream; +use futures::{Stream, StreamExt}; +use iroh_car::CarWriter; use libipld::{Ipld, IpldCodec}; use libipld_core::{cid::Cid, codec::References}; use std::{ collections::{HashSet, VecDeque}, io::Cursor, }; +use tokio::io::AsyncWrite; use wnfs_common::BlockStore; /// Test utilities. @@ -25,8 +27,8 @@ pub mod test_utils; pub fn walk_dag_in_order_breadth_first<'a>( root: Cid, store: &'a impl BlockStore, -) -> impl Stream> + 'a { - try_stream! { +) -> impl Stream> + Unpin + 'a { + Box::pin(try_stream! { let mut visited = HashSet::new(); let mut frontier = VecDeque::from([root]); while let Some(cid) = frontier.pop_front() { @@ -39,7 +41,19 @@ pub fn walk_dag_in_order_breadth_first<'a>( frontier.extend(references(codec, &block)?); yield (cid, block); } + }) +} + +/// Writes a stream of blocks into a car file +pub async fn stream_into_car( + mut blocks: impl Stream> + Unpin, + writer: &mut CarWriter, +) -> Result<()> { + while let Some(result) = blocks.next().await { + let (cid, bytes) = result?; + writer.write(cid, bytes).await?; } + Ok(()) } fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result> { @@ -50,10 +64,43 @@ fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result> { #[cfg(test)] mod tests { + use crate::test_utils::{encode, generate_dag, Rvg}; + use super::*; + use async_std::fs::File; use futures::TryStreamExt; + use iroh_car::CarHeader; + use libipld_core::multihash::{Code, MultihashDigest}; + use tokio_util::compat::FuturesAsyncWriteCompatExt; use wnfs_common::MemoryBlockStore; + #[async_std::test] + async fn test_write_into_car() -> Result<()> { + let (blocks, root) = Rvg::new().sample(&generate_dag(256, |cids| { + let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect()); + let bytes = encode(&ipld); + let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); + (cid, bytes) + })); + + let store = &MemoryBlockStore::new(); + for (cid, bytes) in blocks.iter() { + let cid_store = store + .put_block(bytes.clone(), IpldCodec::DagCbor.into()) + .await?; + assert_eq!(*cid, cid_store); + } + + let file = File::create("./my-car3.car").await?; + let mut writer = CarWriter::new(CarHeader::new_v1(vec![root]), file.compat_write()); + writer.write_header().await?; + let block_stream = walk_dag_in_order_breadth_first(root, store); + stream_into_car(block_stream, &mut writer).await?; + let file = writer.finish().await?; + + Ok(()) + } + #[async_std::test] async fn test_walk_dag_breadth_first() -> Result<()> { let store = &MemoryBlockStore::new();