diff --git a/Cargo.lock b/Cargo.lock index c3b2ec4fa8..20fd76df40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,9 +313,9 @@ dependencies = [ [[package]] name = "bao-tree" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ae2f5c25ce9df1d21b6d2cfe8e1517ff78bd65476bfc47a1ac5b657fa0e1df" +checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef" dependencies = [ "bytes", "futures-lite", @@ -326,7 +326,6 @@ dependencies = [ "range-collections", "self_cell", "smallvec", - "tokio", ] [[package]] @@ -2516,12 +2515,12 @@ dependencies = [ [[package]] name = "iroh-io" -version = "0.4.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd67e386f948a6f09e71057b48fff51b6414f0080997495b5bdf2d1bdcdbe46" +checksum = "74d1047ad5ca29ab4ff316b6830d86e7ea52cea54325e4d4a849692e1274b498" dependencies = [ "bytes", - "futures", + "futures-lite", "pin-project", "smallvec", "tokio", diff --git a/iroh-base/Cargo.toml b/iroh-base/Cargo.toml index 24351b4948..b618acf2b7 100644 --- a/iroh-base/Cargo.toml +++ b/iroh-base/Cargo.toml @@ -16,7 +16,7 @@ workspace = true [dependencies] anyhow = { version = "1" } -bao-tree = { version = "0.12", features = ["tokio_fsm", "validate"], default-features = false, optional = true } +bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false, optional = true } data-encoding = { version = "2.3.3", optional = true } hex = "0.4.3" multibase = { version = "0.9.1", optional = true } diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml index b76aa570a9..dd87ff71da 100644 --- a/iroh-bytes/Cargo.toml +++ b/iroh-bytes/Cargo.toml @@ -17,7 +17,7 @@ workspace = true [dependencies] anyhow = { version = "1" } -bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false } +bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false } bytes = { version = "1.4", features = ["serde"] } chrono = "0.4.31" data-encoding = "2.3.3" @@ -28,7 +28,7 @@ futures-buffered = "0.2.4" genawaiter = { version = "0.99.1", features = ["futures03"] } hex = "0.4.3" iroh-base = { version = "0.13.0", features = ["redb"], path = "../iroh-base" } -iroh-io = { version = "0.4.0", features = ["stats"] } +iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true } iroh-net = { version = "0.13.0", path = "../iroh-net", optional = true } num_cpus = "1.15.0" diff --git a/iroh-bytes/src/get.rs b/iroh-bytes/src/get.rs index b2cbcf05d1..47cd017932 100644 --- a/iroh-bytes/src/get.rs +++ b/iroh-bytes/src/get.rs @@ -70,8 +70,10 @@ pub mod fsm { BaoTree, ChunkRanges, TreeNode, }; use derive_more::From; - use iroh_io::AsyncSliceWriter; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader}; + use tokio::io::AsyncWriteExt; + + type WrappedRecvStream = TrackingReader>; self_cell::self_cell! { struct RangesIterInner { @@ -142,7 +144,7 @@ pub mod fsm { pub async fn next(self) -> Result { let start = Instant::now(); let (writer, reader) = self.connection.open_bi().await?; - let reader = TrackingReader::new(reader); + let reader = TrackingReader::new(TokioStreamReader::new(reader)); let writer = TrackingWriter::new(writer); Ok(AtConnected { start, @@ -157,7 +159,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtConnected { start: Instant, - reader: TrackingReader, + reader: WrappedRecvStream, writer: TrackingWriter, request: GetRequest, } @@ -292,7 +294,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtStartRoot { ranges: ChunkRanges, - reader: TrackingReader, + reader: TrackingReader>, misc: Box, hash: Hash, } @@ -301,7 +303,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtStartChild { ranges: ChunkRanges, - reader: TrackingReader, + reader: TrackingReader>, misc: Box, child_offset: u64, } @@ -376,7 +378,7 @@ pub mod fsm { #[derive(Debug)] pub struct AtBlobHeader { ranges: ChunkRanges, - reader: TrackingReader, + reader: TrackingReader>, misc: Box, hash: Hash, } @@ -412,7 +414,7 @@ pub mod fsm { impl AtBlobHeader { /// Read the size header, returning it and going into the `Content` state. pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> { - let size = self.reader.read_u64_le().await.map_err(|cause| { + let size = self.reader.read::<8>().await.map_err(|cause| { if cause.kind() == io::ErrorKind::UnexpectedEof { AtBlobHeaderNextError::NotFound } else if let Some(e) = cause @@ -424,6 +426,7 @@ pub mod fsm { AtBlobHeaderNextError::Io(cause) } })?; + let size = u64::from_le_bytes(size); let stream = ResponseDecoder::new( self.hash.into(), self.ranges, @@ -513,7 +516,7 @@ pub mod fsm { /// State while we are reading content #[derive(Debug)] pub struct AtBlobContent { - stream: ResponseDecoder>, + stream: ResponseDecoder, misc: Box, } @@ -792,7 +795,7 @@ pub mod fsm { /// State after we have read all the content for a blob #[derive(Debug)] pub struct AtEndBlob { - stream: TrackingReader, + stream: WrappedRecvStream, misc: Box, } @@ -826,16 +829,12 @@ pub mod fsm { #[derive(Debug)] pub struct AtClosing { misc: Box, - reader: TrackingReader, + reader: WrappedRecvStream, check_extra_data: bool, } impl AtClosing { - fn new( - misc: Box, - reader: TrackingReader, - check_extra_data: bool, - ) -> Self { + fn new(misc: Box, reader: WrappedRecvStream, check_extra_data: bool) -> Self { Self { misc, reader, @@ -846,7 +845,8 @@ pub mod fsm { /// Finish the get response, returning statistics pub async fn next(self) -> result::Result { // Shut down the stream - let (mut reader, bytes_read) = self.reader.into_parts(); + let (reader, bytes_read) = self.reader.into_parts(); + let mut reader = reader.into_inner(); if self.check_extra_data { if let Some(chunk) = reader.read_chunk(8, false).await? { reader.stop(0u8.into()).ok(); diff --git a/iroh-bytes/src/get/db.rs b/iroh-bytes/src/get/db.rs index 0596775912..f8dbd39ca1 100644 --- a/iroh-bytes/src/get/db.rs +++ b/iroh-bytes/src/get/db.rs @@ -145,7 +145,7 @@ pub async fn valid_ranges(entry: &D::EntryMut) -> anyhow::Result( // wrap the data reader in a tracking reader so we can get some stats for reading let mut tracking_reader = TrackingSliceReader::new(&mut data); // send the root + tw.write(outboard.tree().size().to_le_bytes().as_slice()) + .await?; encode_ranges_validated( &mut tracking_reader, &mut outboard, @@ -490,13 +492,14 @@ pub async fn send_blob( db: &D, name: Hash, ranges: &RangeSpec, - writer: W, + mut writer: W, ) -> Result<(SentStatus, u64, SliceReaderStats)> { match db.get(&name).await? { Some(entry) => { let outboard = entry.outboard().await?; let size = outboard.tree().size(); let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?); + writer.write(size.to_le_bytes().as_slice()).await?; let res = encode_ranges_validated( &mut file_reader, outboard, diff --git a/iroh-bytes/src/store/bao_file.rs b/iroh-bytes/src/store/bao_file.rs index e1fa62e8a6..64adc6adc0 100644 --- a/iroh-bytes/src/store/bao_file.rs +++ b/iroh-bytes/src/store/bao_file.rs @@ -426,7 +426,7 @@ impl AsyncSliceReader for DataReader { .await } - async fn len(&mut self) -> io::Result { + async fn size(&mut self) -> io::Result { with_storage( &mut self.0, BaoFileStorage::is_mem, @@ -458,7 +458,7 @@ impl AsyncSliceReader for OutboardReader { .await } - async fn len(&mut self) -> io::Result { + async fn size(&mut self) -> io::Result { with_storage( &mut self.0, BaoFileStorage::is_mem, @@ -732,9 +732,9 @@ pub mod test_support { BlockSize, ChunkRanges, }; use futures::{Future, Stream, StreamExt}; + use iroh_io::AsyncStreamReader; use rand::RngCore; use range_collections::RangeSet2; - use tokio::io::{AsyncRead, AsyncReadExt}; use crate::util::limited_range; @@ -751,10 +751,11 @@ pub mod test_support { mut target: W, ) -> io::Result<()> where - R: AsyncRead + Unpin, + R: AsyncStreamReader, W: BaoBatchWriter, { - let size = encoded.read_u64_le().await?; + let size = encoded.read::<8>().await?; + let size = u64::from_le_bytes(size); let mut reading = ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded); let mut stack = Vec::new(); @@ -792,7 +793,8 @@ pub mod test_support { /// Take some data and encode it pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor) { let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE); - let mut encoded = Vec::new(); + let size = data.len() as u64; + let mut encoded = size.to_le_bytes().to_vec(); bao_tree::io::sync::encode_ranges_validated( data, &outboard, @@ -823,7 +825,8 @@ pub mod test_support { let chunk_ranges = round_up_to_chunks(&range_set); // compute the outboard let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip(); - let mut encoded = Vec::new(); + let size = data.len() as u64; + let mut encoded = size.to_le_bytes().to_vec(); encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap(); (outboard.root.into(), chunk_ranges, encoded) } @@ -866,8 +869,11 @@ pub mod test_support { #[cfg(test)] mod tests { + use std::io::Write; + use bao_tree::{blake3, ChunkNum, ChunkRanges}; use futures::StreamExt; + use iroh_io::TokioStreamReader; use tests::test_support::{ decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate, }; @@ -900,7 +906,7 @@ mod tests { let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10)) .map(io::Result::Ok) .boxed(); - let trickle = tokio_util::io::StreamReader::new(trickle); + let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle)); let _task = tasks.spawn_local(async move { decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file) .await @@ -912,7 +918,7 @@ mod tests { println!( "len {:?} {:?}", handle, - handle.data_reader().len().await.unwrap() + handle.data_reader().size().await.unwrap() ); #[allow(clippy::single_range_in_vec_init)] let ranges = [1024 * 16..1024 * 48]; @@ -920,10 +926,14 @@ mod tests { // let ranges = // let full_chunks = bao_tree::io::full_chunk_groups(); - let encoded = Vec::new(); + let mut encoded = Vec::new(); + let ob = handle.outboard().unwrap(); + encoded + .write_all(ob.tree.size().to_le_bytes().as_slice()) + .unwrap(); bao_tree::io::fsm::encode_ranges_validated( handle.data_reader(), - handle.outboard().unwrap(), + ob, &ChunkRanges::from(ChunkNum(16)..ChunkNum(48)), encoded, ) @@ -957,7 +967,7 @@ mod tests { let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10)) .map(io::Result::Ok) .boxed(); - let trickle = tokio_util::io::StreamReader::new(trickle); + let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle)); let task = local.spawn_pinned(move || async move { decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await }); @@ -969,16 +979,20 @@ mod tests { println!( "len {:?} {:?}", handle, - handle.data_reader().len().await.unwrap() + handle.data_reader().size().await.unwrap() ); #[allow(clippy::single_range_in_vec_init)] let ranges = [0..n]; validate(&handle, &test_data, &ranges).await; - let encoded = Vec::new(); + let mut encoded = Vec::new(); + let ob = handle.outboard().unwrap(); + encoded + .write_all(ob.tree.size().to_le_bytes().as_slice()) + .unwrap(); bao_tree::io::fsm::encode_ranges_validated( handle.data_reader(), - handle.outboard().unwrap(), + ob, &ChunkRanges::all(), encoded, ) @@ -1013,10 +1027,14 @@ mod tests { .unwrap(); validate(&handle, &test_data, &ranges).await; - let encoded = Vec::new(); + let mut encoded = Vec::new(); + let ob = handle.outboard().unwrap(); + encoded + .write_all(ob.tree.size().to_le_bytes().as_slice()) + .unwrap(); bao_tree::io::fsm::encode_ranges_validated( handle.data_reader(), - handle.outboard().unwrap(), + ob, &ChunkRanges::all(), encoded, ) diff --git a/iroh-bytes/src/store/fs/tests.rs b/iroh-bytes/src/store/fs/tests.rs index 7aaff9be7d..5844b78738 100644 --- a/iroh-bytes/src/store/fs/tests.rs +++ b/iroh-bytes/src/store/fs/tests.rs @@ -793,7 +793,7 @@ async fn actor_store_smoke() { hash, IROH_BLOCK_SIZE, chunk_ranges.clone(), - Cursor::new(wire_data), + Cursor::new(wire_data.as_slice()), handle.batch_writer().await.unwrap(), ) .await diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs index 0b65510b75..395485d606 100644 --- a/iroh-bytes/src/store/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -298,7 +298,7 @@ impl AsyncSliceReader for DataReader { Ok(self.0.data.read().unwrap().read_data_at(offset, len)) } - async fn len(&mut self) -> std::io::Result { + async fn size(&mut self) -> std::io::Result { Ok(self.0.data.read().unwrap().data_len()) } } @@ -310,7 +310,7 @@ impl AsyncSliceReader for OutboardReader { Ok(self.0.data.read().unwrap().read_outboard_at(offset, len)) } - async fn len(&mut self) -> std::io::Result { + async fn size(&mut self) -> std::io::Result { Ok(self.0.data.read().unwrap().outboard_len()) } } diff --git a/iroh-bytes/src/util/io.rs b/iroh-bytes/src/util/io.rs index 85d9aa408d..2a4b24018d 100644 --- a/iroh-bytes/src/util/io.rs +++ b/iroh-bytes/src/util/io.rs @@ -1,7 +1,8 @@ //! Utilities for working with tokio io +use iroh_io::AsyncStreamReader; use std::{io, pin::Pin, task::Poll}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncWrite; /// A reader that tracks the number of bytes read #[derive(Debug)] @@ -28,23 +29,20 @@ impl TrackingReader { } } -impl AsyncRead for TrackingReader +impl AsyncStreamReader for TrackingReader where - R: AsyncRead + Unpin, + R: AsyncStreamReader, { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let this = &mut *self; - let filled0 = buf.filled().len(); - let res = Pin::new(&mut this.inner).poll_read(cx, buf); - if let Poll::Ready(Ok(())) = res { - let size = buf.filled().len().saturating_sub(filled0); - this.read = this.read.saturating_add(size as u64); - } - res + async fn read_bytes(&mut self, len: usize) -> io::Result { + let bytes = self.inner.read_bytes(len).await?; + self.read = self.read.saturating_add(bytes.len() as u64); + Ok(bytes) + } + + async fn read(&mut self) -> io::Result<[u8; L]> { + let res = self.inner.read::().await?; + self.read = self.read.saturating_add(L as u64); + Ok(res) } } diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 4c4d706969..9a4bf2f31b 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -23,7 +23,7 @@ doc = false [dependencies] anyhow = "1.0.81" -bao-tree = { version = "0.12" } +bao-tree = { version = "0.13" } bytes = "1.5.0" clap = { version = "4", features = ["derive"] } colored = { version = "2.0.4" } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index af65ecd65c..83a35d920c 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -17,7 +17,7 @@ workspace = true [dependencies] anyhow = { version = "1" } -bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false } +bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false } bytes = "1" data-encoding = "2.4.0" derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] } @@ -28,7 +28,7 @@ hashlink = "0.8.4" hex = { version = "0.4.3" } iroh-bytes = { version = "0.13.0", path = "../iroh-bytes", features = ["downloader"] } iroh-base = { version = "0.13.0", path = "../iroh-base", features = ["key"] } -iroh-io = { version = "0.4.0", features = ["stats"] } +iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true } iroh-net = { version = "0.13.0", path = "../iroh-net" } num_cpus = { version = "1.15.0" } diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index e461695b5f..a63f03d0c5 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -1,4 +1,7 @@ -use std::{io::Cursor, time::Duration}; +use std::{ + io::{Cursor, Write}, + time::Duration, +}; use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; @@ -25,6 +28,9 @@ pub fn create_test_data(size: usize) -> Bytes { pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE); let mut encoded = Vec::new(); + encoded + .write_all(outboard.tree.size().to_le_bytes().as_ref()) + .unwrap(); bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded) .unwrap(); let hash = outboard.root();