Commit

Split into modules
matheus23 committed Aug 18, 2023
1 parent 5c4baa7 commit b0da652
Showing 5 changed files with 607 additions and 572 deletions.
20 changes: 20 additions & 0 deletions car-mirror/src/common.rs
@@ -0,0 +1,20 @@
use anyhow::{anyhow, Result};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use std::io::Cursor;

/// Find all CIDs that a block references.
///
/// This will error out if
/// - the codec is not supported
/// - the block can't be parsed.
pub fn references(cid: Cid, block: impl AsRef<[u8]>) -> Result<Vec<Cid>> {
let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;

let mut refs = Vec::new();
<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
Ok(refs)
}
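
A minimal usage sketch for references, assuming the block bytes have already been fetched and that the references function above is in scope; the print_links helper and its arguments are hypothetical:

use anyhow::Result;
use libipld_core::cid::Cid;

// Hypothetical helper: print every link a block points to.
// Assumes `references` from car-mirror/src/common.rs is in scope.
fn print_links(cid: Cid, block_bytes: &[u8]) -> Result<()> {
    // `references` parses the block with the codec named in the CID
    // and collects all CIDs it links to.
    for link in references(cid, block_bytes)? {
        println!("{cid} -> {link}");
    }
    Ok(())
}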
219 changes: 219 additions & 0 deletions car-mirror/src/dag_walk.rs
@@ -0,0 +1,219 @@
use crate::common::references;
use anyhow::Result;
use bytes::Bytes;
use futures::{stream::try_unfold, Stream};
use libipld_core::cid::Cid;
use std::collections::{HashSet, VecDeque};
use wnfs_common::BlockStore;

/// A struct that represents an ongoing walk through the DAG.
#[derive(Clone, Debug)]
pub struct DagWalk {
/// A queue of CIDs to visit next
pub frontier: VecDeque<Cid>,
/// The set of already visited CIDs. This prevents re-visiting.
pub visited: HashSet<Cid>,
/// Whether to do a breadth-first or depth-first traversal.
/// Newly discovered links are always pushed onto the front of the frontier;
/// this flag controls which end the next CID is popped from.
pub breadth_first: bool,
}

impl DagWalk {
/// Start a breadth-first traversal of given roots.
///
/// Breadth-first is easiest to explain in the simple case of a tree (which is a DAG):
/// it visits the nodes of the tree layer by layer.
///
/// So the first nodes it visits are all of the given roots, in order.
pub fn breadth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
Self::new(roots, true)
}

/// Start a depth-first traversal of given roots.
///
/// Depth-first will follow links immediately after discovering them, taking the fastest
/// path towards leaves.
///
/// The very first node is guaranteed to be the first root, but subsequent nodes may not be
/// from the initial roots.
pub fn depth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
Self::new(roots, false)
}

/// Start a DAG traversal of given roots. See also `breadth_first` and `depth_first`.
pub fn new(roots: impl IntoIterator<Item = Cid>, breadth_first: bool) -> Self {
let frontier = roots.into_iter().collect();
let visited = HashSet::new();
Self {
frontier,
visited,
breadth_first,
}
}

/// Return the next node in the traversal.
///
/// Returns `None` if no nodes are left to be visited.
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>> {
let cid = loop {
let popped = if self.breadth_first {
self.frontier.pop_back()
} else {
self.frontier.pop_front()
};

let Some(cid) = popped else {
return Ok(None);
};

// We loop until we find an unvisited block
if self.visited.insert(cid) {
break cid;
}
};

let block = store.get_block(&cid).await?;
for ref_cid in references(cid, &block)? {
if !self.visited.contains(&ref_cid) {
self.frontier.push_front(ref_cid);
}
}

Ok(Some((cid, block)))
}

/// Turn this traversal into a stream of blocks.
pub fn stream(
self,
store: &impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + '_ {
Box::pin(try_unfold(self, move |mut this| async move {
let maybe_block = this.next(store).await?;
Ok(maybe_block.map(|b| (b, this)))
}))
}

/// Find out whether the traversal is finished.
///
/// The next call to `next` would result in `None` if this returns true.
pub fn is_finished(&self) -> bool {
// We're finished if the frontier does not contain any CIDs that we have not visited yet.
// Put differently:
// We're not finished if there exist unvisited CIDs in the frontier.
!self
.frontier
.iter()
.any(|frontier_cid| !self.visited.contains(frontier_cid))
}

/// Skip a block in the traversal for now: mark it visited and drop its links from the frontier.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> {
let (cid, bytes) = block;
let refs = references(cid, bytes)?;
self.visited.insert(cid);
self.frontier
.retain(|frontier_cid| !refs.contains(frontier_cid));

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::TryStreamExt;
use libipld::Ipld;
use wnfs_common::MemoryBlockStore;

#[async_std::test]
async fn test_walk_dag_breadth_first() -> Result<()> {
let store = &MemoryBlockStore::new();

let cid_1 = store.put_serializable(&Ipld::String("1".into())).await?;
let cid_2 = store.put_serializable(&Ipld::String("2".into())).await?;
let cid_3 = store.put_serializable(&Ipld::String("3".into())).await?;

let cid_1_wrap = store
.put_serializable(&Ipld::List(vec![Ipld::Link(cid_1)]))
.await?;

let cid_root = store
.put_serializable(&Ipld::List(vec![
Ipld::Link(cid_1_wrap),
Ipld::Link(cid_2),
Ipld::Link(cid_3),
]))
.await?;

let cids = DagWalk::breadth_first([cid_root])
.stream(store)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|(cid, _block)| cid)
.collect::<Vec<_>>();

assert_eq!(cids, vec![cid_root, cid_1_wrap, cid_2, cid_3, cid_1]);

Ok(())
}
}

#[cfg(test)]
mod proptests {
use super::*;
use crate::test_utils::{encode, generate_dag};
use futures::TryStreamExt;
use libipld::{
multihash::{Code, MultihashDigest},
Cid, Ipld, IpldCodec,
};
use proptest::strategy::Strategy;
use std::collections::BTreeSet;
use test_strategy::proptest;
use wnfs_common::{BlockStore, MemoryBlockStore};

fn ipld_dags() -> impl Strategy<Value = (Vec<(Cid, Ipld)>, Cid)> {
generate_dag(256, |cids| {
let ipld = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
let cid = Cid::new_v1(
IpldCodec::DagCbor.into(),
Code::Blake3_256.digest(&encode(&ipld)),
);
(cid, ipld)
})
}

#[proptest(max_shrink_iters = 100_000)]
fn walk_dag_never_iterates_block_twice(#[strategy(ipld_dags())] dag: (Vec<(Cid, Ipld)>, Cid)) {
async_std::task::block_on(async {
let (dag, root) = dag;
let store = &MemoryBlockStore::new();
for (cid, ipld) in dag.iter() {
let cid_store = store
.put_block(encode(ipld), IpldCodec::DagCbor.into())
.await
.unwrap();
assert_eq!(*cid, cid_store);
}

let mut cids = DagWalk::breadth_first([root])
.stream(store)
.map_ok(|(cid, _)| cid)
.try_collect::<Vec<_>>()
.await
.unwrap();

cids.sort();

let unique_cids = cids
.iter()
.cloned()
.collect::<BTreeSet<_>>()
.into_iter()
.collect::<Vec<_>>();

assert_eq!(cids, unique_cids);
});
}
}
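
A small sketch of driving DagWalk by hand instead of through stream, for example to avoid descending into blocks the other side already has; the walk_skipping function and the already_sent set are hypothetical:

use std::collections::HashSet;

use anyhow::Result;
use libipld_core::cid::Cid;
use wnfs_common::BlockStore;

// Hypothetical driver, assuming `DagWalk` from this module is in scope:
// walk the DAG depth-first, but don't descend into blocks the receiver
// already has.
async fn walk_skipping(
    root: Cid,
    already_sent: &HashSet<Cid>,
    store: &impl BlockStore,
) -> Result<Vec<Cid>> {
    let mut walk = DagWalk::depth_first([root]);
    let mut to_send = Vec::new();
    while let Some((cid, bytes)) = walk.next(store).await? {
        if already_sent.contains(&cid) {
            // Drop this block's links from the frontier so we don't
            // traverse its subgraph.
            walk.skip_walking((cid, bytes))?;
            continue;
        }
        to_send.push(cid);
    }
    Ok(to_send)
}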
100 changes: 100 additions & 0 deletions car-mirror/src/incremental_verification.rs
@@ -0,0 +1,100 @@
use crate::{common::references, dag_walk::DagWalk};
use anyhow::{bail, Result};
use bytes::Bytes;
use libipld_core::cid::Cid;
use std::{collections::HashSet, eprintln};
use wnfs_common::{BlockStore, BlockStoreError};

/// A data structure that keeps state about incremental DAG verification.
#[derive(Clone, Debug)]
pub struct IncrementalDagVerification {
/// All the CIDs that have been discovered to be missing from the local store.
pub want_cids: HashSet<Cid>,
/// All the CIDs that are available locally.
pub have_cids: HashSet<Cid>,
}

impl IncrementalDagVerification {
/// Initiate incremental DAG verification of the given roots.
///
/// This immediately runs a traversal to find out which CIDs are
/// missing locally and which are already present.
pub async fn new(
roots: impl IntoIterator<Item = Cid>,
store: &impl BlockStore,
) -> Result<Self> {
let mut want_cids = HashSet::new();
let mut have_cids = HashSet::new();
let mut dag_walk = DagWalk::breadth_first(roots);

loop {
match dag_walk.next(store).await {
Err(e) => {
if let Some(BlockStoreError::CIDNotFound(not_found)) =
e.downcast_ref::<BlockStoreError>()
{
want_cids.insert(*not_found);
} else {
bail!(e);
}
}
Ok(Some((cid, _))) => {
have_cids.insert(cid);
}
Ok(None) => {
break;
}
}
}

Ok(Self {
want_cids,
have_cids,
})
}

/// Verify that
/// - the block actually hashes to the hash from the given CID and
/// - the block is part of the graph below the roots,
///
/// and finally store the block in the blockstore.
///
/// This *may* fail, even if the block is part of the graph below the roots,
/// if intermediate blocks between the roots and this block are missing.
///
/// This *may* add the block to the blockstore, but still fail to verify, specifically
/// if the block's bytes don't match the hash in the CID.
pub async fn verify_and_store_block(
&mut self,
block: (Cid, Bytes),
store: &impl BlockStore,
) -> Result<()> {
let (cid, bytes) = block;

if !self.want_cids.contains(&cid) {
if self.have_cids.contains(&cid) {
eprintln!("Warn: Received {cid}, even though we already have it");
} else {
bail!("Unexpected block or block out of order: {cid}");
}
}

let refs = references(cid, &bytes)?;
let result_cid = store.put_block(bytes, cid.codec()).await?;

if result_cid != cid {
bail!("Digest mismatch in CAR file: expected {cid}, got {result_cid}");
}

for ref_cid in refs {
if !self.have_cids.contains(&ref_cid) {
self.want_cids.insert(ref_cid);
}
}

self.want_cids.remove(&cid);
self.have_cids.insert(cid);

Ok(())
}
}
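
A sketch of how a receiving side might use IncrementalDagVerification while consuming blocks decoded from a CAR stream; the receive_blocks function and the incoming_blocks argument are hypothetical:

use anyhow::Result;
use bytes::Bytes;
use libipld_core::cid::Cid;
use wnfs_common::BlockStore;

// Hypothetical receive loop, assuming `IncrementalDagVerification` from
// this module is in scope: verify and store blocks as they arrive, then
// report whether the DAG below `root` is now complete locally.
async fn receive_blocks(
    root: Cid,
    incoming_blocks: Vec<(Cid, Bytes)>,
    store: &impl BlockStore,
) -> Result<bool> {
    let mut dag_verification = IncrementalDagVerification::new([root], store).await?;
    for block in incoming_blocks {
        // Each block is checked against its CID and against the current
        // want-list before it is persisted.
        dag_verification
            .verify_and_store_block(block, store)
            .await?;
    }
    Ok(dag_verification.want_cids.is_empty())
}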
(Diffs for the remaining 2 changed files are not shown.)
