diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 377fcce7c..145fb2467 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -49,6 +49,7 @@ tycho-util = { workspace = true } [dev-dependencies] tycho-collator = { workspace = true, features = ["test"] } +tycho-storage = { workspace = true, features = ["test"] } [build-dependencies] anyhow = { workspace = true } diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index ba58bff4d..f5975048e 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -40,7 +40,7 @@ tracing-appender = { workspace = true } # local deps tycho-network = { workspace = true } tycho-storage = { workspace = true } -tycho-util = { workspace = true, features = ["test"] } +tycho-util = { workspace = true } [dev-dependencies] parking_lot = { workspace = true, features = ["deadlock_detection"] } @@ -51,5 +51,7 @@ tikv-jemallocator = { workspace = true, features = [ "background_threads", ]} +tycho-util = { workspace = true, features = ["test"] } + [lints] workspace = true diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs index 8e2a2cb08..f1a17d39b 100644 --- a/consensus/src/engine/engine.rs +++ b/consensus/src/engine/engine.rs @@ -30,7 +30,7 @@ pub struct Engine { top_dag_round: Arc>, tasks: JoinSet<()>, // should be JoinSet committed: UnboundedSender<(Arc, Vec>)>, - input_buffer: Box, + input_buffer: Box, } impl Engine { @@ -39,7 +39,7 @@ impl Engine { dht_client: &DhtClient, overlay_service: &OverlayService, committed: UnboundedSender<(Arc, Vec>)>, - input_buffer: impl InputBuffer + Send + 'static, + input_buffer: impl InputBuffer, ) -> Self { let log_id = Arc::new(format!("{:?}", PeerId::from(key_pair.public_key).ugly())); let peer_schedule = Arc::new(PeerSchedule::new(key_pair)); diff --git a/consensus/src/engine/input_buffer.rs b/consensus/src/engine/input_buffer.rs index 1675028fd..ef7717e30 100644 --- a/consensus/src/engine/input_buffer.rs +++ b/consensus/src/engine/input_buffer.rs @@ -1,5 +1,4 @@ use std::collections::VecDeque; -use std::mem; use std::sync::Arc; use async_trait::async_trait; @@ -11,7 +10,7 @@ use tokio::task::JoinHandle; use crate::engine::MempoolConfig; #[async_trait] -pub trait InputBuffer { +pub trait InputBuffer: Send + 'static { /// `only_fresh = false` to repeat the same elements if they are still buffered, /// use in case last round failed async fn fetch(&mut self, only_fresh: bool) -> Vec; @@ -22,58 +21,73 @@ pub struct InputBufferImpl { consumer: Option>, } -struct InputBufferImplInner { - abort: Arc, - externals: mpsc::UnboundedReceiver, - data: VecDeque, - data_bytes: usize, - offset_elements: usize, +impl InputBufferImpl { + pub fn new(externals: mpsc::UnboundedReceiver) -> Self { + let abort = Arc::new(Notify::new()); + let inner = InputBufferImplInner { + externals, + data: Default::default(), + }; + + Self { + consumer: Some(tokio::spawn(inner.consume(abort.clone()))), + abort, + } + } +} + +impl Drop for InputBufferImpl { + fn drop(&mut self) { + if let Some(handle) = self.consumer.take() { + handle.abort(); + } + } } #[async_trait] impl InputBuffer for InputBufferImpl { async fn fetch(&mut self, only_fresh: bool) -> Vec { - self.abort.notify_one(); - let handle = mem::take(&mut self.consumer).expect("consumer must be set"); + self.abort.notify_waiters(); + let handle = self.consumer.take().expect("consumer must be set"); let mut inner = handle.await.expect("consumer failed"); + if only_fresh { - inner.commit_offset(); + inner.data.commit_offset(); } - let result = inner.fetch(); - self.consumer = Some(tokio::spawn(inner.consume())); + let result = inner.data.fetch(); + + self.consumer = Some(tokio::spawn(inner.consume(self.abort.clone()))); result } } -impl InputBufferImpl { - pub fn new(externals: mpsc::UnboundedReceiver) -> Self { - let abort = Arc::new(Notify::new()); - let inner = InputBufferImplInner { - externals, - abort: abort.clone(), - data: VecDeque::new(), - data_bytes: 0, - offset_elements: 0, - }; - Self { - abort, - consumer: Some(tokio::spawn(inner.consume())), - } - } +struct InputBufferImplInner { + externals: mpsc::UnboundedReceiver, + data: InputBufferData, } impl InputBufferImplInner { - async fn consume(mut self) -> Self { + async fn consume(mut self, abort: Arc) -> Self { + let mut notified = std::pin::pin!(abort.notified()); loop { tokio::select! { - () = self.abort.notified() => break self, - recieved = self.externals.recv() => match recieved { - Some(payload) => self.add(payload), - None => panic!("externals input channel to mempool is closed") + _ = &mut notified => break self, + payload = self.externals.recv() => { + self.data.add(payload.expect("externals input channel to mempool is closed")); }, } } } +} + +#[derive(Default)] +struct InputBufferData { + data: VecDeque, + data_bytes: usize, + offset_elements: usize, +} + +impl InputBufferData { fn fetch(&mut self) -> Vec { let mut taken_bytes = 0; let result = self @@ -88,20 +102,7 @@ impl InputBufferImplInner { self.offset_elements = result.len(); // overwrite result } - fn commit_offset(&mut self) { - let initial_capacity = self.data.capacity(); - let committed_bytes: usize = self - .data - .drain(..self.offset_elements) - .map(|comitted_bytes| comitted_bytes.len()) - .sum(); - self.update_capacity(initial_capacity); - self.data_bytes = self - .data_bytes - .checked_sub(committed_bytes) - .expect("decrease buffered data size on commit offset"); - self.offset_elements = 0; - } + fn add(&mut self, payload: Bytes) { let payload_bytes = payload.len(); assert!( @@ -111,9 +112,9 @@ impl InputBufferImplInner { or filter out insanely large messages prior sending them to mempool", MempoolConfig::PAYLOAD_BUFFER_BYTES ); - let max_used_bytes = MempoolConfig::PAYLOAD_BUFFER_BYTES - payload_bytes; - if self.data_bytes > max_used_bytes { - let initial_capacity = self.data.capacity(); + + let max_data_bytes = MempoolConfig::PAYLOAD_BUFFER_BYTES - payload_bytes; + if self.data_bytes > max_data_bytes { let to_drop = self .data .iter() @@ -122,20 +123,42 @@ impl InputBufferImplInner { .data_bytes .checked_sub(evicted.len()) .expect("decrease buffered data size on eviction"); - self.data_bytes > max_used_bytes + self.data_bytes > max_data_bytes }) .count(); + self.offset_elements = self.offset_elements.saturating_sub(to_drop); _ = self.data.drain(..to_drop); - self.update_capacity(initial_capacity); } + self.data_bytes += payload_bytes; self.data.push_back(payload); } - fn update_capacity(&mut self, initial_capacity: usize) { + + fn commit_offset(&mut self) { + let committed_bytes: usize = self + .data + .drain(..self.offset_elements) + .map(|comitted_bytes| comitted_bytes.len()) + .sum(); + + self.update_capacity(); + + self.data_bytes = self + .data_bytes + .checked_sub(committed_bytes) + .expect("decrease buffered data size on commit offset"); + + self.offset_elements = 0; + } + + /// Ensures that the capacity is not too large. + fn update_capacity(&mut self) { + let len = self.data.len(); + // because reallocation on adding elements doubles the capacity - if self.data.capacity() < initial_capacity / 4 { - self.data.shrink_to(initial_capacity / 2); + if self.data.capacity() >= len * 4 { + self.data.shrink_to(len / 2); } } } @@ -147,8 +170,9 @@ pub struct InputBufferStub { } impl InputBufferStub { - /// external message is limited by 64 KiB + /// External message is limited by 64 KiB const EXTERNAL_MSG_MAX_BYTES: usize = 64 * 1024; + pub fn new(fetches_in_step: usize, steps_until_full: usize) -> Self { Self { fetch_count: 0,