Skip to content

Commit

Permalink
Add some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Aug 18, 2023
1 parent 7535cf8 commit 5c4baa7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 10 deletions.
91 changes: 83 additions & 8 deletions car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ pub mod test_utils;
/// Contains the data types that are sent over-the-wire and relevant serialization code.
pub mod messages;

/// Configuration values (such as byte limits) for the CAR mirror push protocol
#[derive(Clone, Debug)]
pub struct PushConfig {
send_minimum: usize,
receive_maximum: usize,
max_roots_per_round: usize,
bloom_fpr: f64,
/// A client will try to send at least `send_minimum` bytes of block data
/// in each request, except if close to the end of the protocol (when there's)
/// not that much data left.
pub send_minimum: usize,
/// The maximum number of bytes per request that the server accepts.
pub receive_maximum: usize,
/// The maximum number of roots per request that the server will send to the client,
/// and that the client will consume.
pub max_roots_per_round: usize,
/// The target false positive rate for the bloom filter that the server sends.
pub bloom_fpr: f64,
}

impl Default for PushConfig {
Expand All @@ -40,11 +49,17 @@ impl Default for PushConfig {
send_minimum: 128 * 1024, // 128KiB
receive_maximum: 512 * 1024, // 512KiB
max_roots_per_round: 1000, // max. ~41KB of CIDs
bloom_fpr: 1.0 / 1_000.0, // 0.1%
bloom_fpr: 1.0 / 10_000.0, // 0.1%
}
}
}

/// Initiate a car mirror push request.
///
/// The goal is to transfer the DAG below the root CID to
/// the server.
///
/// The return value is a CAR file.
pub async fn client_initiate_push(
root: Cid,
config: &PushConfig,
Expand All @@ -59,6 +74,13 @@ pub async fn client_initiate_push(
client_push(root, fake_response, config, store).await
}

/// Send a subsequent car mirror push request, following up on
/// a response retrieved from an initial `client_initiate_push` request.
///
/// Make sure to call `response.indicates_finished()` before initiating
/// a follow-up `client_push` request.
///
/// The return value is another CAR file with more blocks from the DAG below the root.
pub async fn client_push(
root: Cid,
last_response: PushResponse,
Expand Down Expand Up @@ -129,6 +151,12 @@ pub async fn client_push(
Ok(writer.finish().await?.into())
}

/// This handles a car mirror push request on the server side.
///
/// The root is the root CID of the DAG that is pushed, the request is a CAR file
/// with some blocks from the cold call.
///
/// Returns a response to answer the client's request with.
pub async fn server_push_response(
root: Cid,
request: Bytes,
Expand Down Expand Up @@ -180,21 +208,41 @@ pub async fn server_push_response(
})
}

/// A struct that represents an ongoing walk through the Dag.
#[derive(Clone, Debug)]
pub struct DagWalk {
/// A queue of CIDs to visit next
pub frontier: VecDeque<Cid>,
/// The set of already visited CIDs. This prevents re-visiting.
pub visited: HashSet<Cid>,
/// Whether to do a breadth-first or depth-first traversal.
/// This controls whether newly discovered links are appended or prepended to the frontier.
pub breadth_first: bool,
}

impl DagWalk {
/// Start a breadth-first traversal of given roots.
///
/// Breadth-first is explained the easiest in the simple case of a tree (which is a DAG):
/// It will visit each node in the tree layer-by-layer.
///
/// So the first nodes it will visit are going to be all roots in order.
pub fn breadth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
Self::new(roots, true)
}

/// Start a depth-first traversal of given roots.
///
/// Depth-first will follow links immediately after discovering them, taking the fastest
/// path towards leaves.
///
/// The very first node is guaranteed to be the first root, but subsequent nodes may not be
/// from the initial roots.
pub fn depth_first(roots: impl IntoIterator<Item = Cid>) -> Self {
Self::new(roots, false)
}

/// Start a DAG traversal of given roots. See also `breadth_first` and `depth_first`.
pub fn new(roots: impl IntoIterator<Item = Cid>, breadth_first: bool) -> Self {
let frontier = roots.into_iter().collect();
let visited = HashSet::new();
Expand All @@ -205,12 +253,15 @@ impl DagWalk {
}
}

/// Return the next node in the traversal.
///
/// Returns `None` if no nodes are left to be visited.
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>> {
let cid = loop {
let popped = if self.breadth_first {
self.frontier.pop_front()
} else {
self.frontier.pop_back()
} else {
self.frontier.pop_front()
};

let Some(cid) = popped else {
Expand All @@ -226,13 +277,14 @@ impl DagWalk {
let block = store.get_block(&cid).await?;
for ref_cid in references(cid, &block)? {
if !self.visited.contains(&ref_cid) {
self.frontier.push_back(ref_cid);
self.frontier.push_front(ref_cid);
}
}

Ok(Some((cid, block)))
}

/// Turn this traversal into a stream
pub fn stream(
self,
store: &impl BlockStore,
Expand All @@ -243,6 +295,9 @@ impl DagWalk {
}))
}

/// Find out whether the traversal is finished.
///
/// The next call to `next` would result in `None` if this returns true.
pub fn is_finished(&self) -> bool {
// We're finished if the frontier does not contain any CIDs that we have not visited yet.
// Put differently:
Expand All @@ -253,6 +308,7 @@ impl DagWalk {
.any(|frontier_cid| !self.visited.contains(frontier_cid))
}

/// Skip a node from the traversal for now.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> {
let (cid, bytes) = block;
let refs = references(cid, bytes)?;
Expand All @@ -276,12 +332,20 @@ pub async fn stream_into_car<W: tokio::io::AsyncWrite + Send + Unpin>(
Ok(())
}

/// A data structure that keeps state about incremental DAG verification.
#[derive(Clone, Debug)]
pub struct IncrementalDagVerification {
/// All the CIDs that have been discovered to be missing from the DAG.
pub want_cids: HashSet<Cid>,
/// All the CIDs that are available locally.
pub have_cids: HashSet<Cid>,
}

impl IncrementalDagVerification {
/// Initiate incremental DAG verification of given roots.
///
/// This will already run a traversal to find missing subgraphs and
/// CIDs that are already present.
pub async fn new(
roots: impl IntoIterator<Item = Cid>,
store: &impl BlockStore,
Expand Down Expand Up @@ -316,6 +380,17 @@ impl IncrementalDagVerification {
})
}

/// Verify that
/// - the block actually hashes to the hash from given CID and
/// - the block is part of the graph below the roots.
///
/// And finally stores the block in the blockstore.
///
/// This *may* fail, even if the block is part of the graph below the roots,
/// if intermediate blocks between the roots and this block are missing.
///
/// This *may* add the block to the blockstore, but still fail to verify, specifically
/// if the block's bytes don't match the hash in the CID.
pub async fn verify_and_store_block(
&mut self,
block: (Cid, Bytes),
Expand Down
1 change: 1 addition & 0 deletions car-mirror/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct PushResponse {
}

impl PushResponse {
/// Whether this response indicates that the protocol is finished.
pub fn indicates_finished(&self) -> bool {
self.subgraph_roots.is_empty()
}
Expand Down
8 changes: 6 additions & 2 deletions car-mirror/src/test_utils/dag_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@ use libipld_core::codec::Encode;
use proptest::strategy::Strategy;
use roaring_graphs::{arb_dag, DirectedAcyclicGraph, Vertex};

/// Encode some IPLD as dag-cbor
pub fn encode(ipld: &Ipld) -> Bytes {
let mut vec = Vec::new();
ipld.encode(IpldCodec::DagCbor, &mut vec).unwrap(); // TODO(matheus23) unwrap
Bytes::from(vec)
}

/// A strategy for use with proptest to generate random DAGs (directed acyclic graphs).
/// The strategy generates a list of blocks of type T and their CIDs, as well as
/// the root block's CID.
pub fn generate_dag<T: Debug + Clone>(
max_nodes: u16,
generate_block: fn(Vec<Cid>) -> (Cid, T),
) -> impl Strategy<Value = (Vec<(Cid, T)>, Cid)> {
arb_dag(1..max_nodes, 0.5).prop_map(move |dag| dag_to_nodes(&dag, generate_block))
}

pub fn dag_to_nodes<T>(
fn dag_to_nodes<T>(
dag: &DirectedAcyclicGraph,
generate_node: fn(Vec<Cid>) -> (Cid, T),
) -> (Vec<(Cid, T)>, Cid) {
Expand All @@ -30,7 +34,7 @@ pub fn dag_to_nodes<T>(
(blocks, cid)
}

pub fn dag_to_nodes_helper<T>(
fn dag_to_nodes_helper<T>(
dag: &DirectedAcyclicGraph,
root: Vertex,
generate_node: fn(Vec<Cid>) -> (Cid, T),
Expand Down

0 comments on commit 5c4baa7

Please sign in to comment.