Skip to content

Commit

Permalink
refactor(consensus): fix input buffer for externals
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed May 17, 2024
1 parent 5424d60 commit c069a45
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 59 deletions.
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tycho-util = { workspace = true }

[dev-dependencies]
tycho-collator = { workspace = true, features = ["test"] }
tycho-storage = { workspace = true, features = ["test"] }

[build-dependencies]
anyhow = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tracing-appender = { workspace = true }
# local deps
tycho-network = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true, features = ["test"] }
tycho-util = { workspace = true }

[dev-dependencies]
parking_lot = { workspace = true, features = ["deadlock_detection"] }
Expand All @@ -51,5 +51,7 @@ tikv-jemallocator = { workspace = true, features = [
"background_threads",
]}

tycho-util = { workspace = true, features = ["test"] }

[lints]
workspace = true
4 changes: 2 additions & 2 deletions consensus/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct Engine {
top_dag_round: Arc<RwLock<DagRound>>,
tasks: JoinSet<()>, // should be JoinSet<!>
committed: UnboundedSender<(Arc<Point>, Vec<Arc<Point>>)>,
input_buffer: Box<dyn InputBuffer + Send>,
input_buffer: Box<dyn InputBuffer>,
}

impl Engine {
Expand All @@ -39,7 +39,7 @@ impl Engine {
dht_client: &DhtClient,
overlay_service: &OverlayService,
committed: UnboundedSender<(Arc<Point>, Vec<Arc<Point>>)>,
input_buffer: impl InputBuffer + Send + 'static,
input_buffer: impl InputBuffer,
) -> Self {
let log_id = Arc::new(format!("{:?}", PeerId::from(key_pair.public_key).ugly()));
let peer_schedule = Arc::new(PeerSchedule::new(key_pair));
Expand Down
136 changes: 80 additions & 56 deletions consensus/src/engine/input_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -11,7 +10,7 @@ use tokio::task::JoinHandle;
use crate::engine::MempoolConfig;

#[async_trait]
pub trait InputBuffer {
pub trait InputBuffer: Send + 'static {
/// `only_fresh = false` to repeat the same elements if they are still buffered,
/// use in case last round failed
async fn fetch(&mut self, only_fresh: bool) -> Vec<Bytes>;
Expand All @@ -22,58 +21,73 @@ pub struct InputBufferImpl {
consumer: Option<JoinHandle<InputBufferImplInner>>,
}

struct InputBufferImplInner {
abort: Arc<Notify>,
externals: mpsc::UnboundedReceiver<Bytes>,
data: VecDeque<Bytes>,
data_bytes: usize,
offset_elements: usize,
impl InputBufferImpl {
/// Creates the buffer fed by the `externals` channel and immediately
/// spawns the background consumer task that drains the channel into
/// the internal data queue.
pub fn new(externals: mpsc::UnboundedReceiver<Bytes>) -> Self {
// Notification handle shared with the spawned task; `fetch` uses it
// to interrupt the consumer loop and take back the inner state.
let abort = Arc::new(Notify::new());
let inner = InputBufferImplInner {
externals,
data: Default::default(),
};

Self {
// `consumer` is always `Some` between `fetch` calls; `fetch`
// takes the handle, awaits it, and re-spawns the task.
consumer: Some(tokio::spawn(inner.consume(abort.clone()))),
abort,
}
}
}

impl Drop for InputBufferImpl {
fn drop(&mut self) {
// Abort the background consumer task so it does not outlive the
// buffer; the handle may be `None` if a `fetch` is mid-flight.
if let Some(handle) = self.consumer.take() {
handle.abort();
}
}
}

#[async_trait]
impl InputBuffer for InputBufferImpl {
async fn fetch(&mut self, only_fresh: bool) -> Vec<Bytes> {
self.abort.notify_one();
let handle = mem::take(&mut self.consumer).expect("consumer must be set");
self.abort.notify_waiters();
let handle = self.consumer.take().expect("consumer must be set");
let mut inner = handle.await.expect("consumer failed");

if only_fresh {
inner.commit_offset();
inner.data.commit_offset();
}
let result = inner.fetch();
self.consumer = Some(tokio::spawn(inner.consume()));
let result = inner.data.fetch();

self.consumer = Some(tokio::spawn(inner.consume(self.abort.clone())));
result
}
}

impl InputBufferImpl {
pub fn new(externals: mpsc::UnboundedReceiver<Bytes>) -> Self {
let abort = Arc::new(Notify::new());
let inner = InputBufferImplInner {
externals,
abort: abort.clone(),
data: VecDeque::new(),
data_bytes: 0,
offset_elements: 0,
};
Self {
abort,
consumer: Some(tokio::spawn(inner.consume())),
}
}
/// State owned by the background consumer task: it is moved into the
/// spawned task and handed back to `fetch` when the task is interrupted.
struct InputBufferImplInner {
// Incoming external payloads; the consumer panics if this channel closes.
externals: mpsc::UnboundedReceiver<Bytes>,
// Buffered payloads plus size/offset bookkeeping.
data: InputBufferData,
}

impl InputBufferImplInner {
async fn consume(mut self) -> Self {
async fn consume(mut self, abort: Arc<Notify>) -> Self {
let mut notified = std::pin::pin!(abort.notified());
loop {
tokio::select! {
() = self.abort.notified() => break self,
recieved = self.externals.recv() => match recieved {
Some(payload) => self.add(payload),
None => panic!("externals input channel to mempool is closed")
_ = &mut notified => break self,
payload = self.externals.recv() => {
self.data.add(payload.expect("externals input channel to mempool is closed"));
},
}
}
}
}

/// Bounded FIFO of external payloads with byte-size accounting.
#[derive(Default)]
struct InputBufferData {
// Buffered payloads, oldest at the front.
data: VecDeque<Bytes>,
// Total size in bytes of everything currently in `data`.
data_bytes: usize,
// Number of elements at the front of `data` that the last `fetch`
// returned; they are removed by `commit_offset` once the round succeeds.
offset_elements: usize,
}

impl InputBufferData {
fn fetch(&mut self) -> Vec<Bytes> {
let mut taken_bytes = 0;
let result = self
Expand All @@ -88,20 +102,7 @@ impl InputBufferImplInner {
self.offset_elements = result.len(); // overwrite
result
}
fn commit_offset(&mut self) {
let initial_capacity = self.data.capacity();
let committed_bytes: usize = self
.data
.drain(..self.offset_elements)
.map(|comitted_bytes| comitted_bytes.len())
.sum();
self.update_capacity(initial_capacity);
self.data_bytes = self
.data_bytes
.checked_sub(committed_bytes)
.expect("decrease buffered data size on commit offset");
self.offset_elements = 0;
}

fn add(&mut self, payload: Bytes) {
let payload_bytes = payload.len();
assert!(
Expand All @@ -111,9 +112,9 @@ impl InputBufferImplInner {
or filter out insanely large messages prior sending them to mempool",
MempoolConfig::PAYLOAD_BUFFER_BYTES
);
let max_used_bytes = MempoolConfig::PAYLOAD_BUFFER_BYTES - payload_bytes;
if self.data_bytes > max_used_bytes {
let initial_capacity = self.data.capacity();

let max_data_bytes = MempoolConfig::PAYLOAD_BUFFER_BYTES - payload_bytes;
if self.data_bytes > max_data_bytes {
let to_drop = self
.data
.iter()
Expand All @@ -122,20 +123,42 @@ impl InputBufferImplInner {
.data_bytes
.checked_sub(evicted.len())
.expect("decrease buffered data size on eviction");
self.data_bytes > max_used_bytes
self.data_bytes > max_data_bytes
})
.count();

self.offset_elements = self.offset_elements.saturating_sub(to_drop);
_ = self.data.drain(..to_drop);
self.update_capacity(initial_capacity);
}

self.data_bytes += payload_bytes;
self.data.push_back(payload);
}
fn update_capacity(&mut self, initial_capacity: usize) {

/// Permanently removes the elements handed out by the previous `fetch`
/// (the first `offset_elements` entries) and updates the byte counter.
fn commit_offset(&mut self) {
// Drain the committed prefix, summing the bytes being released.
let committed_bytes: usize = self
.data
.drain(..self.offset_elements)
.map(|comitted_bytes| comitted_bytes.len())
.sum();

// The queue just shrank; give back excess capacity if it is now
// disproportionate to the remaining length.
self.update_capacity();

// Underflow here would mean the byte accounting diverged from the
// queue contents — treat it as a fatal invariant violation.
self.data_bytes = self
.data_bytes
.checked_sub(committed_bytes)
.expect("decrease buffered data size on commit offset");

self.offset_elements = 0;
}

/// Ensures that the capacity is not too large.
fn update_capacity(&mut self) {
let len = self.data.len();

// because reallocation on adding elements doubles the capacity
if self.data.capacity() < initial_capacity / 4 {
self.data.shrink_to(initial_capacity / 2);
if self.data.capacity() >= len * 4 {
self.data.shrink_to(len / 2);
}
}
}
Expand All @@ -147,8 +170,9 @@ pub struct InputBufferStub {
}

impl InputBufferStub {
/// external message is limited by 64 KiB
/// External message is limited by 64 KiB
const EXTERNAL_MSG_MAX_BYTES: usize = 64 * 1024;

pub fn new(fetches_in_step: usize, steps_until_full: usize) -> Self {
Self {
fetch_count: 0,
Expand Down

0 comments on commit c069a45

Please sign in to comment.