Skip to content

Commit

Permalink
Small demo of running CAR mirror in-memory
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Aug 16, 2023
1 parent aa22ea0 commit 0c197aa
Showing 1 changed file with 196 additions and 17 deletions.
213 changes: 196 additions & 17 deletions car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

//! car-mirror

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use async_stream::try_stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use iroh_car::{CarReader, CarWriter};
use futures::{stream::LocalBoxStream, Stream, StreamExt, TryStreamExt};
use iroh_car::{CarHeader, CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{
cid::Cid,
codec::References,
multihash::{Code, MultihashDigest},
};
use messages::PushResponse;
use std::{
collections::{HashSet, VecDeque},
io::Cursor,
Expand All @@ -29,14 +30,137 @@ pub mod test_utils;
/// Contains the data types that are sent over-the-wire and relevant serialization code.
pub mod messages;

pub struct PushSenderSession<'a, B: BlockStore> {

Check failure on line 33 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

type does not implement `std::fmt::Debug`; consider adding `#[derive(Debug)]` or a manual implementation

Check failure on line 33 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for a struct

Check failure on line 33 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

missing documentation for a struct

Check failure on line 33 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

type does not implement `std::fmt::Debug`; consider adding `#[derive(Debug)]` or a manual implementation
last_response: PushResponse,
send_limit: usize,
store: &'a B,
}

impl<'a, B: BlockStore> PushSenderSession<'a, B> {
pub fn new(root: Cid, store: &'a B) -> Self {

Check failure on line 40 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for an associated function

Check failure on line 40 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

missing documentation for an associated function
Self {
last_response: PushResponse {
subgraph_roots: vec![root],
// Just putting an empty bloom here initially
bloom_k: 3,
bloom: Vec::new(),
},
send_limit: 256 * 1024, // 256KiB
store,
}
}

pub fn handle_response(&mut self, response: PushResponse) -> bool {

Check failure on line 53 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for a method

Check failure on line 53 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

missing documentation for a method
self.last_response = response;
self.last_response.subgraph_roots.is_empty()
}

pub async fn next_request(&mut self) -> Result<Bytes> {

Check failure on line 58 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for a method

Check failure on line 58 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

missing documentation for a method
let mut writer = CarWriter::new(
CarHeader::new_v1(
// TODO(matheus23): This is stupid
// CAR files *must* have at least one CID in them, and all of them
// need to appear as a block in the payload.
// It would probably make most sense to just write all subgraph roots into this,
// but we don't know how many of the subgraph roots fit into this round yet,
// so we're simply writing the first one in here, since we know
// at least one block will be written (and it'll be that one).
self.last_response
.subgraph_roots
.iter()
.take(1)
.cloned()
.collect(),
),
Vec::new(),
);
writer.write_header().await?;

let mut block_bytes = 0;
let mut stream =
walk_dag_in_order_breadth_first(self.last_response.subgraph_roots.clone(), self.store);
while let Some((cid, block)) = stream.try_next().await? {
// TODO Eventually we'll need to turn the `LocalBoxStream` into a more configurable
// "external iterator", and then this will be the point where we prune parts of the DAG
// that the recipient already has.

// TODO(matheus23): Count the actual bytes sent?
block_bytes += block.len();
if block_bytes > self.send_limit {
break;
}

writer.write(cid, &block).await?;
}

Ok(writer.finish().await?.into())
}
}

pub struct PushReceiverSession<'a, B: BlockStore> {

Check failure on line 100 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

type does not implement `std::fmt::Debug`; consider adding `#[derive(Debug)]` or a manual implementation

Check failure on line 100 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for a struct

Check failure on line 100 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (stable)

missing documentation for a struct
accepted_roots: Vec<Cid>,
receive_limit: usize,
store: &'a B,
}

impl<'a, B: BlockStore> PushReceiverSession<'a, B> {
pub fn new(root: Cid, store: &'a B) -> Self {

Check failure on line 107 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for an associated function
Self {
accepted_roots: vec![root],
receive_limit: 256 * 1024, // 256KiB
store,
}
}

pub async fn handle_request(&mut self, request: Bytes) -> Result<PushResponse> {

Check failure on line 115 in car-mirror/src/lib.rs

View workflow job for this annotation

GitHub Actions / run-checks (nightly)

missing documentation for a method
let mut reader = CarReader::new(Cursor::new(request)).await?;
let mut stream = read_in_order_dag_from_car(self.accepted_roots.clone(), &mut reader);

let mut missing_subgraphs: HashSet<_> = self.accepted_roots.iter().cloned().collect();

let mut block_bytes = 0;
while let Some((cid, block)) = stream.try_next().await? {
block_bytes += block.len();
if block_bytes > self.receive_limit {
bail!(
"Received more than {} bytes ({block_bytes}), aborting request.",
self.receive_limit
);
}

let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;

missing_subgraphs.remove(&cid);
missing_subgraphs.extend(references(codec, &block)?);

self.store.put_block(block, cid.codec()).await?;
}

let subgraph_roots: Vec<_> = missing_subgraphs.into_iter().collect();

self.accepted_roots = subgraph_roots.clone();

Ok(PushResponse {
subgraph_roots,
// We ignore blooms for now
bloom_k: 3,
bloom: Vec::new(),
})
}
}

/// walks a DAG from given root breadth-first along IPLD links
pub fn walk_dag_in_order_breadth_first<'a>(
root: Cid,
store: &'a impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + 'a {
pub fn walk_dag_in_order_breadth_first(
roots: impl IntoIterator<Item = Cid>,
store: &impl BlockStore,
) -> LocalBoxStream<'_, Result<(Cid, Bytes)>> {
let mut frontier: VecDeque<_> = roots.into_iter().collect();

Box::pin(try_stream! {
let mut visited = HashSet::new();
let mut frontier = VecDeque::from([root]);
while let Some(cid) = frontier.pop_front() {
if visited.contains(&cid) {
continue;
Expand Down Expand Up @@ -64,12 +188,12 @@ pub async fn stream_into_car<W: tokio::io::AsyncWrite + Send + Unpin>(

/// Read a directed acyclic graph from a CAR file, making sure it's read in-order and
/// only blocks reachable from the root are included.
pub fn read_in_order_dag_from_car<'a, R: tokio::io::AsyncRead + Unpin>(
root: Cid,
reader: &'a mut CarReader<R>,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + 'a {
pub fn read_in_order_dag_from_car<R: tokio::io::AsyncRead + Unpin>(
roots: impl IntoIterator<Item = Cid>,
reader: &mut CarReader<R>,
) -> LocalBoxStream<'_, Result<(Cid, Bytes)>> {
let mut reachable_from_root: HashSet<_> = roots.into_iter().collect();
Box::pin(try_stream! {
let mut reachable_from_root = HashSet::from([root]);
while let Some((cid, vec)) = reader.next_block().await.map_err(|e| anyhow!(e))? {
let block = Bytes::from(vec);

Expand Down Expand Up @@ -113,6 +237,8 @@ fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result<Vec<Cid>> {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use crate::test_utils::{encode, generate_dag, Rvg};

use super::*;
Expand Down Expand Up @@ -145,13 +271,13 @@ mod tests {
let file = File::create(filename).await?;
let mut writer = CarWriter::new(CarHeader::new_v1(vec![root]), file.compat_write());
writer.write_header().await?;
let block_stream = walk_dag_in_order_breadth_first(root, store);
let block_stream = walk_dag_in_order_breadth_first([root], store);
stream_into_car(block_stream, &mut writer).await?;
writer.finish().await?;

let mut reader = CarReader::new(File::open(filename).await?.compat()).await?;

read_in_order_dag_from_car(root, &mut reader)
read_in_order_dag_from_car([root], &mut reader)
.try_for_each(|(cid, _)| {
println!("Got {cid}");
future::ready(Ok(()))
Expand All @@ -161,6 +287,59 @@ mod tests {
Ok(())
}

/// Pushes a randomly generated DAG from one in-memory store to another by passing
/// requests and responses between a sender and a receiver session, then checks that
/// walking the DAG yields the same CIDs on both sides.
#[async_std::test]
async fn test_transfer() -> Result<()> {
    // Every generated node carries 10KiB of zero bytes plus links to its children.
    let dag_generator = generate_dag(256, |cids| {
        let links = Ipld::List(cids.into_iter().map(Ipld::Link).collect());
        let payload = Ipld::Bytes(vec![0u8; 10 * 1024]);
        let node = Ipld::Map(BTreeMap::from([
            ("data".into(), payload),
            ("links".into(), links),
        ]));
        let bytes = encode(&node);
        let digest = Code::Blake3_256.digest(&bytes);
        (Cid::new_v1(IpldCodec::DagCbor.into(), digest), bytes)
    });
    let (blocks, root) = Rvg::new().sample(&dag_generator);

    // Fill the sender's store and make sure it derives the CIDs we expect.
    let sender_store = &MemoryBlockStore::new();
    for (expected_cid, bytes) in blocks.iter() {
        let stored_cid = sender_store
            .put_block(bytes.clone(), IpldCodec::DagCbor.into())
            .await?;
        assert_eq!(*expected_cid, stored_cid);
    }

    let receiver_store = &MemoryBlockStore::new();

    let mut sender = PushSenderSession::new(root, sender_store);
    let mut receiver = PushReceiverSession::new(root, receiver_store);

    // Keep exchanging messages until the sender sees a response with nothing left to send.
    let mut finished = false;
    while !finished {
        let request = sender.next_request().await?;
        println!("Sending request {} bytes", request.len());
        let response = receiver.handle_request(request).await?;
        finished = sender.handle_response(response);
    }

    // The receiver should now hold all data: both walks must list the same CIDs in order.
    let sender_cids: Vec<_> = walk_dag_in_order_breadth_first([root], sender_store)
        .map_ok(|(cid, _)| cid)
        .try_collect()
        .await?;
    let receiver_cids: Vec<_> = walk_dag_in_order_breadth_first([root], receiver_store)
        .map_ok(|(cid, _)| cid)
        .try_collect()
        .await?;

    assert_eq!(sender_cids, receiver_cids);

    Ok(())
}

#[async_std::test]
async fn test_walk_dag_breadth_first() -> Result<()> {
let store = &MemoryBlockStore::new();
Expand All @@ -181,7 +360,7 @@ mod tests {
]))
.await?;

let cids = walk_dag_in_order_breadth_first(cid_root, store)
let cids = walk_dag_in_order_breadth_first([cid_root], store)
.try_collect::<Vec<_>>()
.await?
.into_iter()
Expand Down Expand Up @@ -234,7 +413,7 @@ mod proptests {
assert_eq!(*cid, cid_store);
}

let mut cids = walk_dag_in_order_breadth_first(root, store)
let mut cids = walk_dag_in_order_breadth_first([root], store)
.map_ok(|(cid, _)| cid)
.try_collect::<Vec<_>>()
.await
Expand Down

0 comments on commit 0c197aa

Please sign in to comment.