From 0c197aad717dba70ae642a4050946c8a6a831858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 16 Aug 2023 18:42:51 +0200 Subject: [PATCH] Small demo of running CAR mirror in-memory --- car-mirror/src/lib.rs | 213 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 196 insertions(+), 17 deletions(-) diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index d3fc7e2..2701d8e 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -4,17 +4,18 @@ //! car-mirror -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use async_stream::try_stream; use bytes::Bytes; -use futures::{Stream, StreamExt}; -use iroh_car::{CarReader, CarWriter}; +use futures::{stream::LocalBoxStream, Stream, StreamExt, TryStreamExt}; +use iroh_car::{CarHeader, CarReader, CarWriter}; use libipld::{Ipld, IpldCodec}; use libipld_core::{ cid::Cid, codec::References, multihash::{Code, MultihashDigest}, }; +use messages::PushResponse; use std::{ collections::{HashSet, VecDeque}, io::Cursor, @@ -29,14 +30,137 @@ pub mod test_utils; /// Contains the data types that are sent over-the-wire and relevant serialization code. pub mod messages; +pub struct PushSenderSession<'a, B: BlockStore> { + last_response: PushResponse, + send_limit: usize, + store: &'a B, +} + +impl<'a, B: BlockStore> PushSenderSession<'a, B> { + pub fn new(root: Cid, store: &'a B) -> Self { + Self { + last_response: PushResponse { + subgraph_roots: vec![root], + // Just putting an empty bloom here initially + bloom_k: 3, + bloom: Vec::new(), + }, + send_limit: 256 * 1024, // 256KiB + store, + } + } + + pub fn handle_response(&mut self, response: PushResponse) -> bool { + self.last_response = response; + self.last_response.subgraph_roots.is_empty() + } + + pub async fn next_request(&mut self) -> Result { + let mut writer = CarWriter::new( + CarHeader::new_v1( + // TODO(matheus23): This is stupid + // CAR files *must* have at least one CID in them, and all of them + // need to appear as a block in the payload. + // It would probably make most sense to just write all subgraph roots into this, + // but we don't know how many of the subgraph roots fit into this round yet, + // so we're simply writing the first one in here, since we know + // at least one block will be written (and it'll be that one). + self.last_response + .subgraph_roots + .iter() + .take(1) + .cloned() + .collect(), + ), + Vec::new(), + ); + writer.write_header().await?; + + let mut block_bytes = 0; + let mut stream = + walk_dag_in_order_breadth_first(self.last_response.subgraph_roots.clone(), self.store); + while let Some((cid, block)) = stream.try_next().await? { + // TODO Eventually we'll need to turn the `LocalBoxStream` into a more configurable + // "external iterator", and then this will be the point where we prune parts of the DAG + // that the recipient already has. + + // TODO(matheus23): Count the actual bytes sent? + block_bytes += block.len(); + if block_bytes > self.send_limit { + break; + } + + writer.write(cid, &block).await?; + } + + Ok(writer.finish().await?.into()) + } +} + +pub struct PushReceiverSession<'a, B: BlockStore> { + accepted_roots: Vec, + receive_limit: usize, + store: &'a B, +} + +impl<'a, B: BlockStore> PushReceiverSession<'a, B> { + pub fn new(root: Cid, store: &'a B) -> Self { + Self { + accepted_roots: vec![root], + receive_limit: 256 * 1024, // 256KiB + store, + } + } + + pub async fn handle_request(&mut self, request: Bytes) -> Result { + let mut reader = CarReader::new(Cursor::new(request)).await?; + let mut stream = read_in_order_dag_from_car(self.accepted_roots.clone(), &mut reader); + + let mut missing_subgraphs: HashSet<_> = self.accepted_roots.iter().cloned().collect(); + + let mut block_bytes = 0; + while let Some((cid, block)) = stream.try_next().await? { + block_bytes += block.len(); + if block_bytes > self.receive_limit { + bail!( + "Received more than {} bytes ({block_bytes}), aborting request.", + self.receive_limit + ); + } + + let codec: IpldCodec = cid + .codec() + .try_into() + .map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?; + + missing_subgraphs.remove(&cid); + missing_subgraphs.extend(references(codec, &block)?); + + self.store.put_block(block, cid.codec()).await?; + } + + let subgraph_roots: Vec<_> = missing_subgraphs.into_iter().collect(); + + self.accepted_roots = subgraph_roots.clone(); + + Ok(PushResponse { + subgraph_roots, + // We ignore blooms for now + bloom_k: 3, + bloom: Vec::new(), + }) + } +} + /// walks a DAG from given root breadth-first along IPLD links -pub fn walk_dag_in_order_breadth_first<'a>( - root: Cid, - store: &'a impl BlockStore, -) -> impl Stream> + Unpin + 'a { +pub fn walk_dag_in_order_breadth_first( + roots: impl IntoIterator, + store: &impl BlockStore, +) -> LocalBoxStream<'_, Result<(Cid, Bytes)>> { + let mut frontier: VecDeque<_> = roots.into_iter().collect(); + Box::pin(try_stream! { let mut visited = HashSet::new(); - let mut frontier = VecDeque::from([root]); while let Some(cid) = frontier.pop_front() { if visited.contains(&cid) { continue; @@ -64,12 +188,12 @@ pub async fn stream_into_car( /// Read a directed acyclic graph from a CAR file, making sure it's read in-order and /// only blocks reachable from the root are included. -pub fn read_in_order_dag_from_car<'a, R: tokio::io::AsyncRead + Unpin>( - root: Cid, - reader: &'a mut CarReader, -) -> impl Stream> + Unpin + 'a { +pub fn read_in_order_dag_from_car( + roots: impl IntoIterator, + reader: &mut CarReader, +) -> LocalBoxStream<'_, Result<(Cid, Bytes)>> { + let mut reachable_from_root: HashSet<_> = roots.into_iter().collect(); Box::pin(try_stream! { - let mut reachable_from_root = HashSet::from([root]); while let Some((cid, vec)) = reader.next_block().await.map_err(|e| anyhow!(e))? { let block = Bytes::from(vec); @@ -113,6 +237,8 @@ fn references(codec: IpldCodec, block: impl AsRef<[u8]>) -> Result> { #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use crate::test_utils::{encode, generate_dag, Rvg}; use super::*; @@ -145,13 +271,13 @@ mod tests { let file = File::create(filename).await?; let mut writer = CarWriter::new(CarHeader::new_v1(vec![root]), file.compat_write()); writer.write_header().await?; - let block_stream = walk_dag_in_order_breadth_first(root, store); + let block_stream = walk_dag_in_order_breadth_first([root], store); stream_into_car(block_stream, &mut writer).await?; writer.finish().await?; let mut reader = CarReader::new(File::open(filename).await?.compat()).await?; - read_in_order_dag_from_car(root, &mut reader) + read_in_order_dag_from_car([root], &mut reader) .try_for_each(|(cid, _)| { println!("Got {cid}"); future::ready(Ok(())) @@ -161,6 +287,59 @@ mod tests { Ok(()) } + #[async_std::test] + async fn test_transfer() -> Result<()> { + let (blocks, root) = Rvg::new().sample(&generate_dag(256, |cids| { + let ipld = Ipld::Map(BTreeMap::from([ + ("data".into(), Ipld::Bytes(vec![0u8; 10 * 1024])), + ( + "links".into(), + Ipld::List(cids.into_iter().map(Ipld::Link).collect()), + ), + ])); + let bytes = encode(&ipld); + let cid = Cid::new_v1(IpldCodec::DagCbor.into(), Code::Blake3_256.digest(&bytes)); + (cid, bytes) + })); + + let sender_store = &MemoryBlockStore::new(); + for (cid, bytes) in blocks.iter() { + let cid_store = sender_store + .put_block(bytes.clone(), IpldCodec::DagCbor.into()) + .await?; + assert_eq!(*cid, cid_store); + } + + let receiver_store = &MemoryBlockStore::new(); + + let mut sender = PushSenderSession::new(root, sender_store); + let mut receiver = PushReceiverSession::new(root, receiver_store); + + loop { + let request = sender.next_request().await?; + println!("Sending request {} bytes", request.len()); + let response = receiver.handle_request(request).await?; + if sender.handle_response(response) { + // Should be done + break; + } + } + + // receiver should have all data + let sender_cids = walk_dag_in_order_breadth_first([root], sender_store) + .map_ok(|(cid, _)| cid) + .try_collect::>() + .await?; + let receiver_cids = walk_dag_in_order_breadth_first([root], receiver_store) + .map_ok(|(cid, _)| cid) + .try_collect::>() + .await?; + + assert_eq!(sender_cids, receiver_cids); + + Ok(()) + } + #[async_std::test] async fn test_walk_dag_breadth_first() -> Result<()> { let store = &MemoryBlockStore::new(); @@ -181,7 +360,7 @@ mod tests { ])) .await?; - let cids = walk_dag_in_order_breadth_first(cid_root, store) + let cids = walk_dag_in_order_breadth_first([cid_root], store) .try_collect::>() .await? .into_iter() @@ -234,7 +413,7 @@ mod proptests { assert_eq!(*cid, cid_store); } - let mut cids = walk_dag_in_order_breadth_first(root, store) + let mut cids = walk_dag_in_order_breadth_first([root], store) .map_ok(|(cid, _)| cid) .try_collect::>() .await