Skip to content

Commit

Permalink
Write a stream of blocks into a CAR file
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Aug 16, 2023
1 parent 77b1f11 commit ca38690
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 7 deletions.
113 changes: 109 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ async-stream = "0.3.5"
bytes = "1.4.0"
fixedbitset = "0.4.2"
futures = "0.3.28"
iroh-car = "0.3.0"
libipld = "0.16.0"
libipld-core = "0.16.0"
proptest = { version = "1.1", optional = true }
roaring-graphs = "0.12"
tokio-util = { version = "0.7.8", features = ["compat"] }
tokio = { version = "^1", features = ["io-util"] }
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.23"
async-trait = "0.1.73"

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
Expand Down
53 changes: 50 additions & 3 deletions car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
use anyhow::Result;
use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use futures::{Stream, StreamExt};
use iroh_car::CarWriter;
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use std::{
collections::{HashSet, VecDeque},
io::Cursor,
};
use tokio::io::AsyncWrite;
use wnfs_common::BlockStore;

/// Test utilities.
Expand All @@ -25,8 +27,8 @@ pub mod test_utils;
pub fn walk_dag_in_order_breadth_first<'a>(

Check failure on line 27 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

the following explicit lifetimes could be elided: 'a

Check failure on line 27 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

the following explicit lifetimes could be elided: 'a
root: Cid,
store: &'a impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + 'a {
try_stream! {
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + 'a {
Box::pin(try_stream! {
let mut visited = HashSet::new();
let mut frontier = VecDeque::from([root]);
while let Some(cid) = frontier.pop_front() {
Expand All @@ -39,7 +41,19 @@ pub fn walk_dag_in_order_breadth_first<'a>(
frontier.extend(references(codec, &block)?);
yield (cid, block);
}
})
}

/// Writes a stream of blocks into a car file
pub async fn stream_into_car<W: AsyncWrite + Send + Unpin>(
mut blocks: impl Stream<Item = Result<(Cid, Bytes)>> + Unpin,
writer: &mut CarWriter<W>,
) -> Result<()> {
while let Some(result) = blocks.next().await {
let (cid, bytes) = result?;
writer.write(cid, bytes).await?;
}
Ok(())
}

fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result<Vec<Cid>> {
Expand All @@ -50,10 +64,43 @@ fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result<Vec<Cid>> {

#[cfg(test)]
mod tests {
use crate::test_utils::{encode, generate_dag, Rvg};

use super::*;
use async_std::fs::File;
use futures::TryStreamExt;
use iroh_car::CarHeader;
use libipld_core::multihash::{Code, MultihashDigest};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use wnfs_common::MemoryBlockStore;

#[async_std::test]
async fn test_write_into_car() -> Result<()> {
let (blocks, root) = Rvg::new().sample(&generate_dag(256, |cids| {
let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
let bytes = encode(&ipld);
let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes));
(cid, bytes)
}));

let store = &MemoryBlockStore::new();
for (cid, bytes) in blocks.iter() {
let cid_store = store
.put_block(bytes.clone(), IpldCodec::DagCbor.into())
.await?;
assert_eq!(*cid, cid_store);
}

let file = File::create("./my-car3.car").await?;
let mut writer = CarWriter::new(CarHeader::new_v1(vec![root]), file.compat_write());
writer.write_header().await?;
let block_stream = walk_dag_in_order_breadth_first(root, store);
stream_into_car(block_stream, &mut writer).await?;
let file = writer.finish().await?;

Ok(())
}

#[async_std::test]
async fn test_walk_dag_breadth_first() -> Result<()> {
let store = &MemoryBlockStore::new();
Expand Down

0 comments on commit ca38690

Please sign in to comment.