Skip to content

Commit

Permalink
Read back car files and make sure they're incrementally verified
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Aug 16, 2023
1 parent ca38690 commit 9e4fcf8
Showing 1 changed file with 66 additions and 9 deletions.
75 changes: 66 additions & 9 deletions car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@

//! car-mirror

use anyhow::Result;
use anyhow::{anyhow, bail, Result};

Check failure on line 7 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

unused import: `bail`

Check failure on line 7 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

unused import: `bail`
use async_stream::try_stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use iroh_car::CarWriter;
use iroh_car::{CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use libipld_core::{
cid::Cid,
codec::References,
multihash::{Code, MultihashDigest},
};
use std::{
collections::{HashSet, VecDeque},
io::Cursor,
};
use tokio::io::AsyncWrite;
use wnfs_common::BlockStore;

/// Test utilities.
Expand Down Expand Up @@ -45,7 +48,7 @@ pub fn walk_dag_in_order_breadth_first<'a>(
}

/// Writes a stream of blocks into a car file
pub async fn stream_into_car<W: AsyncWrite + Send + Unpin>(
pub async fn stream_into_car<W: tokio::io::AsyncWrite + Send + Unpin>(
mut blocks: impl Stream<Item = Result<(Cid, Bytes)>> + Unpin,
writer: &mut CarWriter<W>,
) -> Result<()> {
Expand All @@ -56,6 +59,49 @@ pub async fn stream_into_car<W: AsyncWrite + Send + Unpin>(
Ok(())
}

/// Read a directed acyclic graph from a CAR file, making sure it's read in-order and
/// only blocks reachable from the root are included.
pub fn read_in_order_dag_from_car<'a, R: tokio::io::AsyncRead + Unpin>(

Check failure on line 64 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

the following explicit lifetimes could be elided: 'a

Check failure on line 64 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

the following explicit lifetimes could be elided: 'a
root: Cid,
reader: &'a mut CarReader<R>,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + 'a {
Box::pin(try_stream! {
let mut reachable_from_root = HashSet::from([root]);
while let Some((cid, vec)) = reader.next_block().await.map_err(|e| anyhow!(e))? {
let block = Bytes::from(vec);

let code: Code = cid
.hash()
.code()
.try_into()
.map_err(|_| anyhow!("Unsupported hash code in Cid: {cid}"))?;

let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;

let digest = code.digest(&block);

if cid.hash() != &digest {
Err(anyhow!(
"Digest mismatch in CAR file: expected {:?}, got {:?}",
digest,
cid.hash()
))?;
}

if !reachable_from_root.contains(&cid) {
Err(anyhow!("Unexpected block or block out of order: {cid}"))?;
}

reachable_from_root.extend(references(codec, &block)?);

yield (cid, block);
}
})
}

fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result<Vec<Cid>> {
let mut refs = Vec::new();
<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
Expand All @@ -68,10 +114,10 @@ mod tests {

use super::*;
use async_std::fs::File;
use futures::TryStreamExt;
use futures::{future, TryStreamExt};
use iroh_car::CarHeader;
use libipld_core::multihash::{Code, MultihashDigest};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use wnfs_common::MemoryBlockStore;

#[async_std::test]
Expand All @@ -91,12 +137,23 @@ mod tests {
assert_eq!(*cid, cid_store);
}

let file = File::create("./my-car3.car").await?;
let filename = "./my-car.car";

let file = File::create(filename).await?;
let mut writer = CarWriter::new(CarHeader::new_v1(vec![root]), file.compat_write());
writer.write_header().await?;
let block_stream = walk_dag_in_order_breadth_first(root, store);
stream_into_car(block_stream, &mut writer).await?;
let file = writer.finish().await?;
writer.finish().await?;

let mut reader = CarReader::new(File::open(filename).await?.compat()).await?;

read_in_order_dag_from_car(root, &mut reader)
.try_for_each(|(cid, _)| {
println!("Got {cid}");
future::ready(Ok(()))
})
.await?;

Ok(())
}
Expand Down

0 comments on commit 9e4fcf8

Please sign in to comment.