From c77a83256e75bfc67460e3b7eeba0507cf1b2a2c Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 27 Nov 2024 13:32:29 +0100 Subject: [PATCH 01/12] add draft of processor --- Cargo.lock | 12 +++ Cargo.toml | 7 +- anchor/client/Cargo.toml | 1 + anchor/client/src/config.rs | 3 + anchor/client/src/lib.rs | 3 + anchor/processor/Cargo.toml | 12 +++ anchor/processor/src/lib.rs | 145 ++++++++++++++++++++++++++++++++++++ 7 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 anchor/processor/Cargo.toml create mode 100644 anchor/processor/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 7b414109..9508fa0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1191,6 +1191,7 @@ dependencies = [ "hyper 1.4.1", "network", "parking_lot 0.12.3", + "processor", "sensitive_url 0.1.0 (git+https://github.com/agemanning/lighthouse?branch=modularize-vc)", "serde", "strum", @@ -5808,6 +5809,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "processor" +version = "0.1.0" +dependencies = [ + "num_cpus", + "serde", + "task_executor 0.1.0 (git+https://github.com/sigp/lighthouse?branch=unstable)", + "tokio", + "tracing", +] + [[package]] name = "procfs" version = "0.15.1" diff --git a/Cargo.toml b/Cargo.toml index bbdc34b7..ed19e524 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,11 +3,12 @@ members = [ "anchor", "anchor/client", + "anchor/common/version", "anchor/http_api", "anchor/http_metrics", - "anchor/qbft", "anchor/network", - "anchor/common/version" + "anchor/processor", + "anchor/qbft", ] resolver = "2" @@ -21,6 +22,7 @@ http_api = { path = "anchor/http_api" } http_metrics = { path = "anchor/http_metrics" } network = { path ="anchor/network"} version = { path ="anchor/common/version"} +processor = { path = "anchor/processor" } lighthouse_network = { git = "https://github.com/sigp/lighthouse", branch = "unstable"} task_executor = { git = "https://github.com/sigp/lighthouse", branch = "unstable", default-features = false, features = [ "tracing", ] } metrics = { git = "https://github.com/agemanning/lighthouse", branch = "modularize-vc" } @@ -38,6 +40,7 @@ either = "1.13.0" futures = "0.3.30" tower-http = {version = "0.6", features = ["cors"] } hyper = "1.4" +num_cpus = "1" parking_lot = "0.12" serde = { version = "1.0.208", features = ["derive"] } strum = { version = "0.24", features = ["derive"] } diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml index d96eca85..2b1e53eb 100644 --- a/anchor/client/Cargo.toml +++ b/anchor/client/Cargo.toml @@ -24,6 +24,7 @@ network = { workspace = true } unused_port = { workspace = true } tokio = { workspace = true } parking_lot = { workspace = true } +processor = { workspace = true } # Local dependencies fdlimit = "0.3" ethereum_hashing = "0.7.0" diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index a1e0f9cf..9cb8e1ea 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -46,6 +46,8 @@ pub struct Config { /// A list of custom certificates that the validator client will additionally use when /// connecting to an execution node over SSL/TLS. pub execution_nodes_tls_certs: Option>, + /// Configuration for the processor + pub processor: processor::Config, } impl Default for Config { @@ -74,6 +76,7 @@ impl Default for Config { network: <_>::default(), beacon_nodes_tls_certs: None, execution_nodes_tls_certs: None, + processor: <_>::default(), } } } diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index e6ad2758..f287ac71 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -42,6 +42,9 @@ impl Client { "Starting the Anchor client" ); + // Start the processor + let processor_senders = processor::spawn(config.processor, executor.clone()).await; + // Optionally start the metrics server. let _http_metrics_shared_state = if config.http_metrics.enabled { let shared_state = Arc::new(RwLock::new(http_metrics::Shared { genesis_time: None })); diff --git a/anchor/processor/Cargo.toml b/anchor/processor/Cargo.toml new file mode 100644 index 00000000..1b054756 --- /dev/null +++ b/anchor/processor/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "processor" +version = "0.1.0" +authors = ["Sigma Prime Self { + Self { + max_workers: num_cpus::get(), + } + } +} + +pub struct Sender { + name: &'static str, + tx: mpsc::Sender, +} + +impl Sender { + fn new(name: &'static str, tx: mpsc::Sender) -> Self { + Self { name, tx } + } + + pub fn send_async(&mut self, future: AsyncFn) { + self.send_work_item(WorkItem::new_async(self.name, future)); + } + + pub fn send_blocking(&mut self, func: BlockingFn) { + self.send_work_item(WorkItem::new_blocking(self.name, func)); + } + + pub fn send_work_item(&mut self, item: WorkItem) { + if let Err(err) = self.tx.try_send(item) { + match err { + TrySendError::Full(item) => { + warn!(task = item.name, "Processor queue full") + } + TrySendError::Closed(_) => { + error!("Processor queue closed unexpectedly") + } + } + } + } +} + +pub struct Senders { + example_tx: Sender, + // todo add all the needed queues here +} + +struct Receivers { + example_rx: mpsc::Receiver, + // todo add all the needed queues here +} + +pub type AsyncFn = Pin + Send + Sync>>; +pub type BlockingFn = Box; + +enum AsyncOrBlocking { + Async(AsyncFn), + Blocking(BlockingFn), +} +pub struct WorkItem { + name: &'static str, + func: AsyncOrBlocking, +} + +impl WorkItem { + pub fn new_async(name: &'static str, func: AsyncFn) -> Self { + Self { + name, + func: AsyncOrBlocking::Async(func), + } + } + + pub fn new_blocking(name: &'static str, func: BlockingFn) -> Self { + Self { + name, + func: AsyncOrBlocking::Blocking(func), + } + } +} + +pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { + // todo macro? just specifying name and capacity? + let (example_tx, example_rx) = mpsc::channel(1000); + + let senders = Senders { + example_tx: Sender::new("example", example_tx), + }; + let receivers = Receivers { example_rx }; + + executor.spawn(processor(config, receivers, executor.clone()), "processor"); + senders +} + +async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecutor) { + // TODO: consider having separate limits for blocking and async? + let semaphore = Arc::new(Semaphore::new(config.max_workers)); + + loop { + let Ok(permit) = semaphore.clone().acquire_owned().await else { + error!("Processor semaphore closed unexpectedly"); + break; + }; + + let work_item = select! { + biased; + Some(w) = receivers.example_rx.recv() => w, + else => { + error!("Processor queues closed unexpectedly"); + break; + } + }; + + match work_item.func { + AsyncOrBlocking::Async(async_fn) => executor.spawn( + async move { + async_fn.await; + drop(permit); + }, + work_item.name, + ), + AsyncOrBlocking::Blocking(blocking_fn) => { + executor.spawn_blocking( + move || { + blocking_fn(); + drop(permit); + }, + work_item.name, + ); + } + } + } +} From 070dd27949c9a534a591d2bd86e97dd4e8d4d462 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Mon, 2 Dec 2024 16:27:04 +0100 Subject: [PATCH 02/12] add two experiments for fully modular processor add metrics to simple example --- Cargo.lock | 1 + anchor/processor/Cargo.toml | 3 +- anchor/processor/examples/experiment.rs | 111 ++++++++++++++ anchor/processor/examples/experiment2.rs | 137 ++++++++++++++++++ .../src/experiment/earliest_deadline.rs | 19 +++ anchor/processor/src/experiment/mod.rs | 115 +++++++++++++++ .../src/experiment2/earliest_deadline.rs | 18 +++ anchor/processor/src/experiment2/mod.rs | 114 +++++++++++++++ anchor/processor/src/lib.rs | 44 +++++- anchor/processor/src/metrics.rs | 60 ++++++++ 10 files changed, 613 insertions(+), 9 deletions(-) create mode 100644 anchor/processor/examples/experiment.rs create mode 100644 anchor/processor/examples/experiment2.rs create mode 100644 anchor/processor/src/experiment/earliest_deadline.rs create mode 100644 anchor/processor/src/experiment/mod.rs create mode 100644 anchor/processor/src/experiment2/earliest_deadline.rs create mode 100644 anchor/processor/src/experiment2/mod.rs create mode 100644 anchor/processor/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 9508fa0b..e1213bdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5813,6 +5813,7 @@ dependencies = [ name = "processor" version = "0.1.0" dependencies = [ + "metrics 0.2.0 (git+https://github.com/agemanning/lighthouse?branch=modularize-vc)", "num_cpus", "serde", "task_executor 0.1.0 (git+https://github.com/sigp/lighthouse?branch=unstable)", diff --git a/anchor/processor/Cargo.toml b/anchor/processor/Cargo.toml index 1b054756..a01f33ec 100644 --- a/anchor/processor/Cargo.toml +++ b/anchor/processor/Cargo.toml @@ -5,8 +5,9 @@ authors = ["Sigma Prime , + something_queue: VecDeque, +} + +impl Scheduler for AnchorWorkQueues { + fn received(&mut self, work_item: AnchorWork) -> Option { + match &work_item { + AnchorWork::StartQBFTInstance { .. } => self.start_queue.push_back(work_item), + AnchorWork::SomethingQBFTInstance { .. } => self.something_queue.push_back(work_item), + } + None + } + + async fn next_task(&mut self) -> Option { + self.start_queue + .pop_front() + .or_else(|| self.something_queue.pop_front()) + } +} + +#[derive(Default)] +struct QBFTStore { + instances: HashMap>, +} + +enum AnchorWork { + StartQBFTInstance { + round: i64, + }, + SomethingQBFTInstance { + round: i64, + } +} + +impl Work for AnchorWork { + fn run(self, state: &mut QBFTStore, runner: TaskRunner) { + match self { + AnchorWork::StartQBFTInstance { round } => { + let (tx, mut rx) = mpsc::channel(10); + runner.run_future(async move { + println!("StartQBFTInstance"); + rx.recv().await.unwrap(); + }); + state.instances.insert(round, tx); + } + AnchorWork::SomethingQBFTInstance { round } => { + runner.run_immediate(|drop_on_finish| { + state + .instances + .get(&round) + .unwrap() + .try_send((42, drop_on_finish)) + .unwrap() + }) + } + } + } + + fn kind_name(&self) -> &'static str { + match self { + AnchorWork::StartQBFTInstance { .. } => "StartQBFTInstance", + AnchorWork::SomethingQBFTInstance { .. } => "SomethingQBFTInstance", + } + } +} + +impl DeadlinedWork for AnchorWork { + fn get_deadline(&self) -> i64 { + match self { + AnchorWork::StartQBFTInstance { round } => *round, + AnchorWork::SomethingQBFTInstance { round } => *round, + } + } +} + +#[tokio::main] +async fn main() { + let sched = blackbox(); + let executor = blackbox(); + + let tx = if sched { + spawn::( + Config::default(), + DeadlineScheduler, + executor, + ) + .await + } else { + spawn::( + Config::default(), + AnchorWorkQueues::default(), + executor, + ) + .await + }; + + let _ = tx.send(AnchorWork::StartQBFTInstance { round: 0 }).await; + let _ = tx.send(AnchorWork::SomethingQBFTInstance { round: 0 }).await; +} + +fn blackbox() -> T { + unimplemented!() +} diff --git a/anchor/processor/examples/experiment2.rs b/anchor/processor/examples/experiment2.rs new file mode 100644 index 00000000..92317b43 --- /dev/null +++ b/anchor/processor/examples/experiment2.rs @@ -0,0 +1,137 @@ +use processor::experiment2::earliest_deadline::{DeadlineScheduler, DeadlinedWork}; +use processor::experiment2::{spawn, DropOnFinish, Scheduler, TaskRunner, Work}; +use processor::Config; +use std::collections::{HashMap, VecDeque}; +use tokio::sync::mpsc; + +trait AnchorQueueable { + fn queue_kind(&self) -> AnchorQueueKinds; +} + +enum AnchorQueueKinds { + StartQbftQueue, + SomethingQbftQueue, +} + +#[derive(Default)] +struct AnchorWorkQueues { + start_queue: VecDeque>, + something_queue: VecDeque>, +} + +impl Scheduler for AnchorWorkQueues { + fn received(&mut self, work_item: Box) -> Option> { + match work_item.queue_kind() { + AnchorQueueKinds::StartQbftQueue => self.start_queue.push_back(work_item), + AnchorQueueKinds::SomethingQbftQueue => self.something_queue.push_back(work_item), + } + None + } + + async fn next_task(&mut self) -> Option> { + self.start_queue + .pop_front() + .or_else(|| self.something_queue.pop_front()) + } +} + +trait AnchorWork: DeadlinedWork + AnchorQueueable + Work {} +impl> AnchorWork for T {} + +#[derive(Default)] +struct QBFTStore { + instances: HashMap>, +} + +struct StartQBFTInstance { + round: i64, +} + +impl Work for StartQBFTInstance { + type State = QBFTStore; + + fn run(self: Box, state: &mut QBFTStore, runner: TaskRunner) { + let (tx, mut rx) = mpsc::channel(10); + runner.run_future(async move { + println!("StartQBFTInstance"); + rx.recv().await.unwrap(); + }); + state.instances.insert(self.round, tx); + } + + fn kind_name(&self) -> &'static str { + "start_qbft_instance" + } +} +impl DeadlinedWork for StartQBFTInstance { + fn get_deadline(&self) -> i64 { + self.round + } +} +impl AnchorQueueable for StartQBFTInstance { + fn queue_kind(&self) -> AnchorQueueKinds { + AnchorQueueKinds::StartQbftQueue + } +} + +struct SomethingQBFTInstance { + round: i64, +} + +impl Work for SomethingQBFTInstance { + type State = QBFTStore; + + fn run(self: Box, state: &mut QBFTStore, runner: TaskRunner) { + runner.run_immediate(|drop_on_finish| { + state + .instances + .get(&self.round) + .unwrap() + .try_send((*self, drop_on_finish)) + .unwrap() + }) + } + + fn kind_name(&self) -> &'static str { + "start_qbft_instance" + } +} +impl DeadlinedWork for SomethingQBFTInstance { + fn get_deadline(&self) -> i64 { + self.round + } +} +impl AnchorQueueable for SomethingQBFTInstance { + fn queue_kind(&self) -> AnchorQueueKinds { + AnchorQueueKinds::SomethingQbftQueue + } +} + +#[tokio::main] +async fn main() { + let sched = blackbox(); + let executor = blackbox(); + + let tx = if sched { + spawn::( + Config::default(), + DeadlineScheduler, + executor, + ) + .await + } else { + spawn::( + Config::default(), + AnchorWorkQueues::default(), + executor, + ) + .await + }; + + let _ = tx.send(Box::new(StartQBFTInstance { round: 0 })).await; + let _ = tx.send(Box::new(SomethingQBFTInstance { round: 0 })).await; +} + +fn blackbox() -> T { + unimplemented!() +} diff --git a/anchor/processor/src/experiment/earliest_deadline.rs b/anchor/processor/src/experiment/earliest_deadline.rs new file mode 100644 index 00000000..c35875ca --- /dev/null +++ b/anchor/processor/src/experiment/earliest_deadline.rs @@ -0,0 +1,19 @@ +use crate::experiment::Scheduler; + +pub trait DeadlinedWork { + fn get_deadline(&self) -> i64; +} + +#[derive(Default)] +pub struct DeadlineScheduler; + +impl Scheduler for DeadlineScheduler { + fn received(&mut self, _work_item: W) -> Option { + todo!() + } + + async fn next_task(&mut self) -> Option { + todo!() + } +} + diff --git a/anchor/processor/src/experiment/mod.rs b/anchor/processor/src/experiment/mod.rs new file mode 100644 index 00000000..f1df9d88 --- /dev/null +++ b/anchor/processor/src/experiment/mod.rs @@ -0,0 +1,115 @@ +pub mod earliest_deadline; + +use crate::Config; +use std::future::Future; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::select; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; +use tracing::error; + +pub struct DropOnFinish { + _permit: OwnedSemaphorePermit, + //_worker_timer: Option, +} + +pub trait Work: Send { + + fn run(self, state: &mut T, runner: TaskRunner); + fn kind_name(&self) -> &'static str; +} + +pub struct TaskRunner<'a> { + executor: &'a TaskExecutor, + drop_on_finish: DropOnFinish, + name: &'static str, +} + +impl TaskRunner<'_> { + pub fn run_future(self, future: impl Future + Send + 'static) { + self.executor.spawn( + async move { + future.await; + drop(self.drop_on_finish) + }, + self.name, + ); + } + + pub fn run_blocking(self, function: impl FnOnce() + Send + 'static) { + self.executor.spawn_blocking( + || { + function(); + drop(self.drop_on_finish) + }, + self.name, + ); + } + + pub fn run_immediate(self, function: impl FnOnce(DropOnFinish)) { + function(self.drop_on_finish); + } +} + +pub trait Scheduler: Send { + fn received(&mut self, work_item: W) -> Option; + fn next_task(&mut self) -> impl std::future::Future> + Send; +} + +pub async fn spawn( + config: Config, + scheduler: S, + executor: TaskExecutor, +) -> mpsc::Sender +where + W: Work + 'static, + S: Scheduler + Default + 'static, + T: Default + Send + 'static, +{ + let (tx, rx) = mpsc::channel(1000); + executor.spawn( + processor(config, rx, scheduler, executor.clone()), + "processor", + ); + tx +} + +async fn processor( + config: Config, + mut rx: mpsc::Receiver, + mut scheduler: S, + executor: TaskExecutor, +) where + W: Work, + S: Scheduler, + T: Default, +{ + // TODO: consider having separate limits for blocking and async? + let semaphore = Arc::new(Semaphore::new(config.max_workers)); + let mut state = T::default(); + + loop { + let Ok(permit) = semaphore.clone().acquire_owned().await else { + error!("Processor semaphore closed unexpectedly"); + break; + }; + + let work_item = loop { + if let Some(work_item) = select! { + biased; + Some(work_item) = rx.recv() => scheduler.received(work_item), + Some(work_item) = scheduler.next_task() => Some(work_item), + else => return, + } { + break work_item; + } + }; + + let runner = TaskRunner { + executor: &executor, + drop_on_finish: DropOnFinish { _permit: permit }, + name: work_item.kind_name(), + }; + work_item.run(&mut state, runner); + } +} diff --git a/anchor/processor/src/experiment2/earliest_deadline.rs b/anchor/processor/src/experiment2/earliest_deadline.rs new file mode 100644 index 00000000..0eea1133 --- /dev/null +++ b/anchor/processor/src/experiment2/earliest_deadline.rs @@ -0,0 +1,18 @@ +use crate::experiment2::{Scheduler, Work}; + +pub trait DeadlinedWork: Work { + fn get_deadline(&self) -> i64; +} + +#[derive(Default)] +pub struct DeadlineScheduler; + +impl Scheduler for DeadlineScheduler { + fn received(&mut self, _work_item: Box) -> Option> { + todo!() + } + + async fn next_task(&mut self) -> Option> { + todo!() + } +} diff --git a/anchor/processor/src/experiment2/mod.rs b/anchor/processor/src/experiment2/mod.rs new file mode 100644 index 00000000..4dec09d3 --- /dev/null +++ b/anchor/processor/src/experiment2/mod.rs @@ -0,0 +1,114 @@ +pub mod earliest_deadline; + +use crate::Config; +use std::future::Future; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::select; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; +use tracing::error; + +pub struct DropOnFinish { + _permit: OwnedSemaphorePermit, + //_worker_timer: Option, +} + +pub trait Work: Send { + type State: Default + Send; + + fn run(self: Box, state: &mut Self::State, runner: TaskRunner); + fn kind_name(&self) -> &'static str; +} + +pub struct TaskRunner<'a> { + executor: &'a TaskExecutor, + drop_on_finish: DropOnFinish, + name: &'static str, +} + +impl TaskRunner<'_> { + pub fn run_future(self, future: impl Future + Send + 'static) { + self.executor.spawn( + async move { + future.await; + drop(self.drop_on_finish) + }, + self.name, + ); + } + + pub fn run_blocking(self, function: impl FnOnce() + Send + 'static) { + self.executor.spawn_blocking( + || { + function(); + drop(self.drop_on_finish) + }, + self.name, + ); + } + + pub fn run_immediate(self, function: impl FnOnce(DropOnFinish)) { + function(self.drop_on_finish); + } +} + +pub trait Scheduler: Send { + fn received(&mut self, work_item: Box) -> Option>; + fn next_task(&mut self) -> impl std::future::Future>> + Send; +} + +pub async fn spawn( + config: Config, + scheduler: S, + executor: TaskExecutor, +) -> mpsc::Sender> +where + W: Work + ?Sized + 'static, + S: Scheduler + Default + 'static, +{ + let (tx, rx) = mpsc::channel(1000); + executor.spawn( + processor(config, rx, scheduler, executor.clone()), + "processor", + ); + tx +} + +async fn processor( + config: Config, + mut rx: mpsc::Receiver>, + mut scheduler: S, + executor: TaskExecutor, +) where + W: Work + ?Sized, + S: Scheduler, +{ + // TODO: consider having separate limits for blocking and async? + let semaphore = Arc::new(Semaphore::new(config.max_workers)); + let mut state = W::State::default(); + + loop { + let Ok(permit) = semaphore.clone().acquire_owned().await else { + error!("Processor semaphore closed unexpectedly"); + break; + }; + + let work_item = loop { + if let Some(work_item) = select! { + biased; + Some(work_item) = rx.recv() => scheduler.received(work_item), + Some(work_item) = scheduler.next_task() => Some(work_item), + else => return, + } { + break work_item; + } + }; + + let runner = TaskRunner { + executor: &executor, + drop_on_finish: DropOnFinish { _permit: permit }, + name: work_item.kind_name(), + }; + work_item.run(&mut state, runner); + } +} diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 0228b639..79804e44 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -1,3 +1,7 @@ +pub mod experiment; +pub mod experiment2; +mod metrics; + use serde::{Deserialize, Serialize}; use std::future::Future; use std::pin::Pin; @@ -31,25 +35,37 @@ impl Sender { Self { name, tx } } - pub fn send_async(&mut self, future: AsyncFn) { - self.send_work_item(WorkItem::new_async(self.name, future)); + pub fn send_async(&mut self, future: AsyncFn) -> Result<(), TrySendError> { + self.send_work_item(WorkItem::new_async(self.name, future)) } - pub fn send_blocking(&mut self, func: BlockingFn) { - self.send_work_item(WorkItem::new_blocking(self.name, func)); + pub fn send_blocking(&mut self, func: BlockingFn) -> Result<(), TrySendError> { + self.send_work_item(WorkItem::new_blocking(self.name, func)) } - pub fn send_work_item(&mut self, item: WorkItem) { - if let Err(err) = self.tx.try_send(item) { + fn send_work_item(&mut self, item: WorkItem) -> Result<(), TrySendError> { + let result = self.tx.try_send(item); + if let Err(err) = &result { + metrics::inc_counter_vec( + &metrics::ANCHOR_PROCESSOR_SEND_ERROR_PER_WORK_TYPE, + &[self.name], + ); match err { - TrySendError::Full(item) => { - warn!(task = item.name, "Processor queue full") + TrySendError::Full(_) => { + warn!(task = self.name, "Processor queue full") } TrySendError::Closed(_) => { error!("Processor queue closed unexpectedly") } } + } else { + metrics::inc_counter_vec( + &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_SUBMITTED_COUNT, + &[self.name], + ); + metrics::inc_gauge_vec(&metrics::ANCHOR_PROCESSOR_QUEUE_LENGTH, &[self.name]); } + result } } @@ -109,6 +125,7 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu let semaphore = Arc::new(Semaphore::new(config.max_workers)); loop { + let _timer = metrics::start_timer(&metrics::ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS); let Ok(permit) = semaphore.clone().acquire_owned().await else { error!("Processor semaphore closed unexpectedly"); break; @@ -123,10 +140,19 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu } }; + metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); + metrics::inc_counter_vec( + &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_STARTED_COUNT, + &[work_item.name], + ); + let work_timer = + metrics::start_timer_vec(&metrics::ANCHOR_PROCESSOR_WORKER_TIME, &[work_item.name]); match work_item.func { AsyncOrBlocking::Async(async_fn) => executor.spawn( async move { async_fn.await; + drop(work_timer); + metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); drop(permit); }, work_item.name, @@ -135,6 +161,8 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu executor.spawn_blocking( move || { blocking_fn(); + drop(work_timer); + metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); drop(permit); }, work_item.name, diff --git a/anchor/processor/src/metrics.rs b/anchor/processor/src/metrics.rs new file mode 100644 index 00000000..53631d21 --- /dev/null +++ b/anchor/processor/src/metrics.rs @@ -0,0 +1,60 @@ +pub use metrics::*; +use std::sync::LazyLock; + +/* + * Gossip processor + */ +pub static ANCHOR_PROCESSOR_WORK_EVENTS_SUBMITTED_COUNT: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "anchor_processor_work_events_submitted_count", + "Count of work events submitted", + &["type"], + ) + }); +pub static ANCHOR_PROCESSOR_WORK_EVENTS_STARTED_COUNT: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "anchor_processor_work_events_started_count", + "Count of work events which have been started by a worker", + &["type"], + ) + }); +pub static ANCHOR_PROCESSOR_WORKER_TIME: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec( + "anchor_processor_worker_time", + "Time taken for a worker to fully process some parcel of work.", + &["type"], + ) +}); +pub static ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_gauge( + "anchor_processor_workers_active_total", + "Count of active workers in the processing pool.", + ) + }); +pub static ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS: LazyLock> = + LazyLock::new(|| { + try_create_histogram( + "anchor_processor_event_handling_seconds", + "Time spent handling a new message and allocating it to a queue or worker.", + ) + }); +pub static ANCHOR_PROCESSOR_QUEUE_LENGTH: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "anchor_processor_work_event_queue_length", + "Count of work events in queue waiting to be processed.", + &["type"], + ) +}); + +/// Errors and Debugging Stats +pub static ANCHOR_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "anchor_processor_send_error_per_work_type", + "Total number of anchor processor send error per work type", + &["type"], + ) + }); From 671b9c57075de46f9246457f5fa3eb97b9318620 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Mon, 2 Dec 2024 16:40:42 +0100 Subject: [PATCH 03/12] cargo fmt --- anchor/processor/examples/experiment.rs | 30 ++++++++----------- anchor/processor/examples/experiment2.rs | 8 ++--- .../src/experiment/earliest_deadline.rs | 1 - anchor/processor/src/experiment/mod.rs | 7 +---- 4 files changed, 16 insertions(+), 30 deletions(-) diff --git a/anchor/processor/examples/experiment.rs b/anchor/processor/examples/experiment.rs index d279c5d3..43f90e54 100644 --- a/anchor/processor/examples/experiment.rs +++ b/anchor/processor/examples/experiment.rs @@ -32,12 +32,8 @@ struct QBFTStore { } enum AnchorWork { - StartQBFTInstance { - round: i64, - }, - SomethingQBFTInstance { - round: i64, - } + StartQBFTInstance { round: i64 }, + SomethingQBFTInstance { round: i64 }, } impl Work for AnchorWork { @@ -51,16 +47,14 @@ impl Work for AnchorWork { }); state.instances.insert(round, tx); } - AnchorWork::SomethingQBFTInstance { round } => { - runner.run_immediate(|drop_on_finish| { - state - .instances - .get(&round) - .unwrap() - .try_send((42, drop_on_finish)) - .unwrap() - }) - } + AnchorWork::SomethingQBFTInstance { round } => runner.run_immediate(|drop_on_finish| { + state + .instances + .get(&round) + .unwrap() + .try_send((42, drop_on_finish)) + .unwrap() + }), } } @@ -103,7 +97,9 @@ async fn main() { }; let _ = tx.send(AnchorWork::StartQBFTInstance { round: 0 }).await; - let _ = tx.send(AnchorWork::SomethingQBFTInstance { round: 0 }).await; + let _ = tx + .send(AnchorWork::SomethingQBFTInstance { round: 0 }) + .await; } fn blackbox() -> T { diff --git a/anchor/processor/examples/experiment2.rs b/anchor/processor/examples/experiment2.rs index 92317b43..a5303dd3 100644 --- a/anchor/processor/examples/experiment2.rs +++ b/anchor/processor/examples/experiment2.rs @@ -113,12 +113,8 @@ async fn main() { let executor = blackbox(); let tx = if sched { - spawn::( - Config::default(), - DeadlineScheduler, - executor, - ) - .await + spawn::(Config::default(), DeadlineScheduler, executor) + .await } else { spawn::( Config::default(), diff --git a/anchor/processor/src/experiment/earliest_deadline.rs b/anchor/processor/src/experiment/earliest_deadline.rs index c35875ca..fd64da99 100644 --- a/anchor/processor/src/experiment/earliest_deadline.rs +++ b/anchor/processor/src/experiment/earliest_deadline.rs @@ -16,4 +16,3 @@ impl Scheduler for DeadlineScheduler { todo!() } } - diff --git a/anchor/processor/src/experiment/mod.rs b/anchor/processor/src/experiment/mod.rs index f1df9d88..ead13eaf 100644 --- a/anchor/processor/src/experiment/mod.rs +++ b/anchor/processor/src/experiment/mod.rs @@ -14,7 +14,6 @@ pub struct DropOnFinish { } pub trait Work: Send { - fn run(self, state: &mut T, runner: TaskRunner); fn kind_name(&self) -> &'static str; } @@ -56,11 +55,7 @@ pub trait Scheduler: Send { fn next_task(&mut self) -> impl std::future::Future> + Send; } -pub async fn spawn( - config: Config, - scheduler: S, - executor: TaskExecutor, -) -> mpsc::Sender +pub async fn spawn(config: Config, scheduler: S, executor: TaskExecutor) -> mpsc::Sender where W: Work + 'static, S: Scheduler + Default + 'static, From 566d0f1428334b0b7c26bb6ef8d3cc50dcfc78bb Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 11:13:50 +0100 Subject: [PATCH 04/12] remove processor experiments --- anchor/processor/examples/experiment.rs | 107 -------------- anchor/processor/examples/experiment2.rs | 133 ------------------ .../src/experiment/earliest_deadline.rs | 18 --- anchor/processor/src/experiment/mod.rs | 110 --------------- .../src/experiment2/earliest_deadline.rs | 18 --- anchor/processor/src/experiment2/mod.rs | 114 --------------- anchor/processor/src/lib.rs | 2 - 7 files changed, 502 deletions(-) delete mode 100644 anchor/processor/examples/experiment.rs delete mode 100644 anchor/processor/examples/experiment2.rs delete mode 100644 anchor/processor/src/experiment/earliest_deadline.rs delete mode 100644 anchor/processor/src/experiment/mod.rs delete mode 100644 anchor/processor/src/experiment2/earliest_deadline.rs delete mode 100644 anchor/processor/src/experiment2/mod.rs diff --git a/anchor/processor/examples/experiment.rs b/anchor/processor/examples/experiment.rs deleted file mode 100644 index 43f90e54..00000000 --- a/anchor/processor/examples/experiment.rs +++ /dev/null @@ -1,107 +0,0 @@ -use processor::experiment::earliest_deadline::{DeadlineScheduler, DeadlinedWork}; -use processor::experiment::{spawn, DropOnFinish, Scheduler, TaskRunner, Work}; -use processor::Config; -use std::collections::{HashMap, VecDeque}; -use tokio::sync::mpsc; - -#[derive(Default)] -struct AnchorWorkQueues { - start_queue: VecDeque, - something_queue: VecDeque, -} - -impl Scheduler for AnchorWorkQueues { - fn received(&mut self, work_item: AnchorWork) -> Option { - match &work_item { - AnchorWork::StartQBFTInstance { .. } => self.start_queue.push_back(work_item), - AnchorWork::SomethingQBFTInstance { .. } => self.something_queue.push_back(work_item), - } - None - } - - async fn next_task(&mut self) -> Option { - self.start_queue - .pop_front() - .or_else(|| self.something_queue.pop_front()) - } -} - -#[derive(Default)] -struct QBFTStore { - instances: HashMap>, -} - -enum AnchorWork { - StartQBFTInstance { round: i64 }, - SomethingQBFTInstance { round: i64 }, -} - -impl Work for AnchorWork { - fn run(self, state: &mut QBFTStore, runner: TaskRunner) { - match self { - AnchorWork::StartQBFTInstance { round } => { - let (tx, mut rx) = mpsc::channel(10); - runner.run_future(async move { - println!("StartQBFTInstance"); - rx.recv().await.unwrap(); - }); - state.instances.insert(round, tx); - } - AnchorWork::SomethingQBFTInstance { round } => runner.run_immediate(|drop_on_finish| { - state - .instances - .get(&round) - .unwrap() - .try_send((42, drop_on_finish)) - .unwrap() - }), - } - } - - fn kind_name(&self) -> &'static str { - match self { - AnchorWork::StartQBFTInstance { .. } => "StartQBFTInstance", - AnchorWork::SomethingQBFTInstance { .. } => "SomethingQBFTInstance", - } - } -} - -impl DeadlinedWork for AnchorWork { - fn get_deadline(&self) -> i64 { - match self { - AnchorWork::StartQBFTInstance { round } => *round, - AnchorWork::SomethingQBFTInstance { round } => *round, - } - } -} - -#[tokio::main] -async fn main() { - let sched = blackbox(); - let executor = blackbox(); - - let tx = if sched { - spawn::( - Config::default(), - DeadlineScheduler, - executor, - ) - .await - } else { - spawn::( - Config::default(), - AnchorWorkQueues::default(), - executor, - ) - .await - }; - - let _ = tx.send(AnchorWork::StartQBFTInstance { round: 0 }).await; - let _ = tx - .send(AnchorWork::SomethingQBFTInstance { round: 0 }) - .await; -} - -fn blackbox() -> T { - unimplemented!() -} diff --git a/anchor/processor/examples/experiment2.rs b/anchor/processor/examples/experiment2.rs deleted file mode 100644 index a5303dd3..00000000 --- a/anchor/processor/examples/experiment2.rs +++ /dev/null @@ -1,133 +0,0 @@ -use processor::experiment2::earliest_deadline::{DeadlineScheduler, DeadlinedWork}; -use processor::experiment2::{spawn, DropOnFinish, Scheduler, TaskRunner, Work}; -use processor::Config; -use std::collections::{HashMap, VecDeque}; -use tokio::sync::mpsc; - -trait AnchorQueueable { - fn queue_kind(&self) -> AnchorQueueKinds; -} - -enum AnchorQueueKinds { - StartQbftQueue, - SomethingQbftQueue, -} - -#[derive(Default)] -struct AnchorWorkQueues { - start_queue: VecDeque>, - something_queue: VecDeque>, -} - -impl Scheduler for AnchorWorkQueues { - fn received(&mut self, work_item: Box) -> Option> { - match work_item.queue_kind() { - AnchorQueueKinds::StartQbftQueue => self.start_queue.push_back(work_item), - AnchorQueueKinds::SomethingQbftQueue => self.something_queue.push_back(work_item), - } - None - } - - async fn next_task(&mut self) -> Option> { - self.start_queue - .pop_front() - .or_else(|| self.something_queue.pop_front()) - } -} - -trait AnchorWork: DeadlinedWork + AnchorQueueable + Work {} -impl> AnchorWork for T {} - -#[derive(Default)] -struct QBFTStore { - instances: HashMap>, -} - -struct StartQBFTInstance { - round: i64, -} - -impl Work for StartQBFTInstance { - type State = QBFTStore; - - fn run(self: Box, state: &mut QBFTStore, runner: TaskRunner) { - let (tx, mut rx) = mpsc::channel(10); - runner.run_future(async move { - println!("StartQBFTInstance"); - rx.recv().await.unwrap(); - }); - state.instances.insert(self.round, tx); - } - - fn kind_name(&self) -> &'static str { - "start_qbft_instance" - } -} -impl DeadlinedWork for StartQBFTInstance { - fn get_deadline(&self) -> i64 { - self.round - } -} -impl AnchorQueueable for StartQBFTInstance { - fn queue_kind(&self) -> AnchorQueueKinds { - AnchorQueueKinds::StartQbftQueue - } -} - -struct SomethingQBFTInstance { - round: i64, -} - -impl Work for SomethingQBFTInstance { - type State = QBFTStore; - - fn run(self: Box, state: &mut QBFTStore, runner: TaskRunner) { - runner.run_immediate(|drop_on_finish| { - state - .instances - .get(&self.round) - .unwrap() - .try_send((*self, drop_on_finish)) - .unwrap() - }) - } - - fn kind_name(&self) -> &'static str { - "start_qbft_instance" - } -} -impl DeadlinedWork for SomethingQBFTInstance { - fn get_deadline(&self) -> i64 { - self.round - } -} -impl AnchorQueueable for SomethingQBFTInstance { - fn queue_kind(&self) -> AnchorQueueKinds { - AnchorQueueKinds::SomethingQbftQueue - } -} - -#[tokio::main] -async fn main() { - let sched = blackbox(); - let executor = blackbox(); - - let tx = if sched { - spawn::(Config::default(), DeadlineScheduler, executor) - .await - } else { - spawn::( - Config::default(), - AnchorWorkQueues::default(), - executor, - ) - .await - }; - - let _ = tx.send(Box::new(StartQBFTInstance { round: 0 })).await; - let _ = tx.send(Box::new(SomethingQBFTInstance { round: 0 })).await; -} - -fn blackbox() -> T { - unimplemented!() -} diff --git a/anchor/processor/src/experiment/earliest_deadline.rs b/anchor/processor/src/experiment/earliest_deadline.rs deleted file mode 100644 index fd64da99..00000000 --- a/anchor/processor/src/experiment/earliest_deadline.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::experiment::Scheduler; - -pub trait DeadlinedWork { - fn get_deadline(&self) -> i64; -} - -#[derive(Default)] -pub struct DeadlineScheduler; - -impl Scheduler for DeadlineScheduler { - fn received(&mut self, _work_item: W) -> Option { - todo!() - } - - async fn next_task(&mut self) -> Option { - todo!() - } -} diff --git a/anchor/processor/src/experiment/mod.rs b/anchor/processor/src/experiment/mod.rs deleted file mode 100644 index ead13eaf..00000000 --- a/anchor/processor/src/experiment/mod.rs +++ /dev/null @@ -1,110 +0,0 @@ -pub mod earliest_deadline; - -use crate::Config; -use std::future::Future; -use std::sync::Arc; -use task_executor::TaskExecutor; -use tokio::select; -use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; -use tracing::error; - -pub struct DropOnFinish { - _permit: OwnedSemaphorePermit, - //_worker_timer: Option, -} - -pub trait Work: Send { - fn run(self, state: &mut T, runner: TaskRunner); - fn kind_name(&self) -> &'static str; -} - -pub struct TaskRunner<'a> { - executor: &'a TaskExecutor, - drop_on_finish: DropOnFinish, - name: &'static str, -} - -impl TaskRunner<'_> { - pub fn run_future(self, future: impl Future + Send + 'static) { - self.executor.spawn( - async move { - future.await; - drop(self.drop_on_finish) - }, - self.name, - ); - } - - pub fn run_blocking(self, function: impl FnOnce() + Send + 'static) { - self.executor.spawn_blocking( - || { - function(); - drop(self.drop_on_finish) - }, - self.name, - ); - } - - pub fn run_immediate(self, function: impl FnOnce(DropOnFinish)) { - function(self.drop_on_finish); - } -} - -pub trait Scheduler: Send { - fn received(&mut self, work_item: W) -> Option; - fn next_task(&mut self) -> impl std::future::Future> + Send; -} - -pub async fn spawn(config: Config, scheduler: S, executor: TaskExecutor) -> mpsc::Sender -where - W: Work + 'static, - S: Scheduler + Default + 'static, - T: Default + Send + 'static, -{ - let (tx, rx) = mpsc::channel(1000); - executor.spawn( - processor(config, rx, scheduler, executor.clone()), - "processor", - ); - tx -} - -async fn processor( - config: Config, - mut rx: mpsc::Receiver, - mut scheduler: S, - executor: TaskExecutor, -) where - W: Work, - S: Scheduler, - T: Default, -{ - // TODO: consider having separate limits for blocking and async? - let semaphore = Arc::new(Semaphore::new(config.max_workers)); - let mut state = T::default(); - - loop { - let Ok(permit) = semaphore.clone().acquire_owned().await else { - error!("Processor semaphore closed unexpectedly"); - break; - }; - - let work_item = loop { - if let Some(work_item) = select! { - biased; - Some(work_item) = rx.recv() => scheduler.received(work_item), - Some(work_item) = scheduler.next_task() => Some(work_item), - else => return, - } { - break work_item; - } - }; - - let runner = TaskRunner { - executor: &executor, - drop_on_finish: DropOnFinish { _permit: permit }, - name: work_item.kind_name(), - }; - work_item.run(&mut state, runner); - } -} diff --git a/anchor/processor/src/experiment2/earliest_deadline.rs b/anchor/processor/src/experiment2/earliest_deadline.rs deleted file mode 100644 index 0eea1133..00000000 --- a/anchor/processor/src/experiment2/earliest_deadline.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::experiment2::{Scheduler, Work}; - -pub trait DeadlinedWork: Work { - fn get_deadline(&self) -> i64; -} - -#[derive(Default)] -pub struct DeadlineScheduler; - -impl Scheduler for DeadlineScheduler { - fn received(&mut self, _work_item: Box) -> Option> { - todo!() - } - - async fn next_task(&mut self) -> Option> { - todo!() - } -} diff --git a/anchor/processor/src/experiment2/mod.rs b/anchor/processor/src/experiment2/mod.rs deleted file mode 100644 index 4dec09d3..00000000 --- a/anchor/processor/src/experiment2/mod.rs +++ /dev/null @@ -1,114 +0,0 @@ -pub mod earliest_deadline; - -use crate::Config; -use std::future::Future; -use std::sync::Arc; -use task_executor::TaskExecutor; -use tokio::select; -use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; -use tracing::error; - -pub struct DropOnFinish { - _permit: OwnedSemaphorePermit, - //_worker_timer: Option, -} - -pub trait Work: Send { - type State: Default + Send; - - fn run(self: Box, state: &mut Self::State, runner: TaskRunner); - fn kind_name(&self) -> &'static str; -} - -pub struct TaskRunner<'a> { - executor: &'a TaskExecutor, - drop_on_finish: DropOnFinish, - name: &'static str, -} - -impl TaskRunner<'_> { - pub fn run_future(self, future: impl Future + Send + 'static) { - self.executor.spawn( - async move { - future.await; - drop(self.drop_on_finish) - }, - self.name, - ); - } - - pub fn run_blocking(self, function: impl FnOnce() + Send + 'static) { - self.executor.spawn_blocking( - || { - function(); - drop(self.drop_on_finish) - }, - self.name, - ); - } - - pub fn run_immediate(self, function: impl FnOnce(DropOnFinish)) { - function(self.drop_on_finish); - } -} - -pub trait Scheduler: Send { - fn received(&mut self, work_item: Box) -> Option>; - fn next_task(&mut self) -> impl std::future::Future>> + Send; -} - -pub async fn spawn( - config: Config, - scheduler: S, - executor: TaskExecutor, -) -> mpsc::Sender> -where - W: Work + ?Sized + 'static, - S: Scheduler + Default + 'static, -{ - let (tx, rx) = mpsc::channel(1000); - executor.spawn( - processor(config, rx, scheduler, executor.clone()), - "processor", - ); - tx -} - -async fn processor( - config: Config, - mut rx: mpsc::Receiver>, - mut scheduler: S, - executor: TaskExecutor, -) where - W: Work + ?Sized, - S: Scheduler, -{ - // TODO: consider having separate limits for blocking and async? - let semaphore = Arc::new(Semaphore::new(config.max_workers)); - let mut state = W::State::default(); - - loop { - let Ok(permit) = semaphore.clone().acquire_owned().await else { - error!("Processor semaphore closed unexpectedly"); - break; - }; - - let work_item = loop { - if let Some(work_item) = select! { - biased; - Some(work_item) = rx.recv() => scheduler.received(work_item), - Some(work_item) = scheduler.next_task() => Some(work_item), - else => return, - } { - break work_item; - } - }; - - let runner = TaskRunner { - executor: &executor, - drop_on_finish: DropOnFinish { _permit: permit }, - name: work_item.kind_name(), - }; - work_item.run(&mut state, runner); - } -} diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 79804e44..b760b78f 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -1,5 +1,3 @@ -pub mod experiment; -pub mod experiment2; mod metrics; use serde::{Deserialize, Serialize}; From 575f753880030592b7e4ba0a8f321acef7c8b57e Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 13:08:43 +0100 Subject: [PATCH 05/12] add a permitless queue for async and fast tasks --- anchor/processor/src/lib.rs | 148 ++++++++++++++++++++++---------- anchor/processor/src/metrics.rs | 7 ++ 2 files changed, 108 insertions(+), 47 deletions(-) diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index b760b78f..5cc691f4 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -5,9 +5,9 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::select; -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{mpsc, Semaphore}; use tracing::{error, warn}; #[derive(Clone, Serialize, Deserialize)] @@ -24,33 +24,51 @@ impl Default for Config { } pub struct Sender { - name: &'static str, tx: mpsc::Sender, } impl Sender { - fn new(name: &'static str, tx: mpsc::Sender) -> Self { - Self { name, tx } + pub fn send_async( + &mut self, + future: AsyncFn, + name: &'static str, + ) -> Result<(), TrySendError> { + self.send_work_item(WorkItem { + func: WorkKind::Async(future), + name, + }) } - pub fn send_async(&mut self, future: AsyncFn) -> Result<(), TrySendError> { - self.send_work_item(WorkItem::new_async(self.name, future)) + pub fn send_blocking( + &mut self, + func: BlockingFn, + name: &'static str, + ) -> Result<(), TrySendError> { + self.send_work_item(WorkItem { + func: WorkKind::Blocking(func), + name, + }) } - pub fn send_blocking(&mut self, func: BlockingFn) -> Result<(), TrySendError> { - self.send_work_item(WorkItem::new_blocking(self.name, func)) + pub fn send_immediate( + &mut self, + func: ImmediateFn, + name: &'static str, + ) -> Result<(), TrySendError> { + self.send_work_item(WorkItem { + func: WorkKind::Immediate(func), + name, + }) } fn send_work_item(&mut self, item: WorkItem) -> Result<(), TrySendError> { + let name = item.name; let result = self.tx.try_send(item); if let Err(err) = &result { - metrics::inc_counter_vec( - &metrics::ANCHOR_PROCESSOR_SEND_ERROR_PER_WORK_TYPE, - &[self.name], - ); + metrics::inc_counter_vec(&metrics::ANCHOR_PROCESSOR_SEND_ERROR_PER_WORK_TYPE, &[name]); match err { TrySendError::Full(_) => { - warn!(task = self.name, "Processor queue full") + warn!(task = name, "Processor queue full") } TrySendError::Closed(_) => { error!("Processor queue closed unexpectedly") @@ -59,113 +77,149 @@ impl Sender { } else { metrics::inc_counter_vec( &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_SUBMITTED_COUNT, - &[self.name], + &[name], ); - metrics::inc_gauge_vec(&metrics::ANCHOR_PROCESSOR_QUEUE_LENGTH, &[self.name]); + metrics::inc_gauge_vec(&metrics::ANCHOR_PROCESSOR_QUEUE_LENGTH, &[name]); } result } } pub struct Senders { - example_tx: Sender, + pub permitless_tx: Sender, + pub example2_tx: Sender, // todo add all the needed queues here } struct Receivers { - example_rx: mpsc::Receiver, + permitless_rx: mpsc::Receiver, + example2_rx: mpsc::Receiver, // todo add all the needed queues here } -pub type AsyncFn = Pin + Send + Sync>>; -pub type BlockingFn = Box; +pub type AsyncFn = Pin + Send>>; +pub type BlockingFn = Box; +pub type ImmediateFn = Box; -enum AsyncOrBlocking { +enum WorkKind { Async(AsyncFn), Blocking(BlockingFn), + Immediate(ImmediateFn), } + pub struct WorkItem { + func: WorkKind, name: &'static str, - func: AsyncOrBlocking, } impl WorkItem { pub fn new_async(name: &'static str, func: AsyncFn) -> Self { Self { name, - func: AsyncOrBlocking::Async(func), + func: WorkKind::Async(func), } } pub fn new_blocking(name: &'static str, func: BlockingFn) -> Self { Self { name, - func: AsyncOrBlocking::Blocking(func), + func: WorkKind::Blocking(func), + } + } +} + +pub struct DropOnFinish { + permit: Option, + _work_timer: Option, +} +impl Drop for DropOnFinish { + fn drop(&mut self) { + metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); + if self.permit.is_some() { + metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_PERMIT_WORKERS_ACTIVE_TOTAL); } } } pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { // todo macro? just specifying name and capacity? - let (example_tx, example_rx) = mpsc::channel(1000); + let (permitless_tx, permitless_rx) = mpsc::channel(1000); + let (example2_tx, example2_rx) = mpsc::channel(1000); let senders = Senders { - example_tx: Sender::new("example", example_tx), + permitless_tx: Sender { tx: permitless_tx }, + example2_tx: Sender { tx: example2_tx }, + }; + let receivers = Receivers { + permitless_rx, + example2_rx, }; - let receivers = Receivers { example_rx }; executor.spawn(processor(config, receivers, executor.clone()), "processor"); senders } async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecutor) { - // TODO: consider having separate limits for blocking and async? let semaphore = Arc::new(Semaphore::new(config.max_workers)); loop { let _timer = metrics::start_timer(&metrics::ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS); - let Ok(permit) = semaphore.clone().acquire_owned().await else { - error!("Processor semaphore closed unexpectedly"); - break; - }; - let work_item = select! { + let (permit, work_item) = select! { biased; - Some(w) = receivers.example_rx.recv() => w, - else => { - error!("Processor queues closed unexpectedly"); - break; + Some(w) = receivers.permitless_rx.recv() => (None, Some(w)), + Ok(permit) = semaphore.clone().acquire_owned() => { + select! { + biased; + Some(w) = receivers.example2_rx.recv() => (Some(permit), Some(w)), + + // we have a permit, so we prefer other queues at this point, + // but it should still be possible to receive a permitless event + Some(w) = receivers.permitless_rx.recv() => (None, Some(w)), + else => (None, None), + } } + else => (None, None), + }; + + let Some(work_item) = work_item else { + error!("Processor queues closed unexpectedly"); + break; }; metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); + if permit.is_some() { + metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_PERMIT_WORKERS_ACTIVE_TOTAL); + } metrics::inc_counter_vec( &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_STARTED_COUNT, &[work_item.name], ); - let work_timer = - metrics::start_timer_vec(&metrics::ANCHOR_PROCESSOR_WORKER_TIME, &[work_item.name]); + let drop_on_finish = DropOnFinish { + permit, + _work_timer: metrics::start_timer_vec( + &metrics::ANCHOR_PROCESSOR_WORKER_TIME, + &[work_item.name], + ), + }; match work_item.func { - AsyncOrBlocking::Async(async_fn) => executor.spawn( + WorkKind::Async(async_fn) => executor.spawn( async move { async_fn.await; - drop(work_timer); - metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); - drop(permit); + drop(drop_on_finish); }, work_item.name, ), - AsyncOrBlocking::Blocking(blocking_fn) => { + WorkKind::Blocking(blocking_fn) => { executor.spawn_blocking( move || { blocking_fn(); - drop(work_timer); - metrics::dec_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); - drop(permit); + drop(drop_on_finish); }, work_item.name, ); } + WorkKind::Immediate(immediate_fn) => immediate_fn(drop_on_finish), } } } diff --git a/anchor/processor/src/metrics.rs b/anchor/processor/src/metrics.rs index 53631d21..9071673a 100644 --- a/anchor/processor/src/metrics.rs +++ b/anchor/processor/src/metrics.rs @@ -34,6 +34,13 @@ pub static ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL: LazyLock> = "Count of active workers in the processing pool.", ) }); +pub static ANCHOR_PROCESSOR_PERMIT_WORKERS_ACTIVE_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_gauge( + "anchor_processor_permit_workers_active_total", + "Count of active workers in the processing pool, holding one permit.", + ) + }); pub static ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS: LazyLock> = LazyLock::new(|| { try_create_histogram( From 35aee422e603b945d16ee4daa5dddd916a684e3e Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 13:23:51 +0100 Subject: [PATCH 06/12] add work expiry --- anchor/processor/src/lib.rs | 30 ++++++++++++++++++++++++++++-- anchor/processor/src/metrics.rs | 8 ++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 5cc691f4..8b9c2a29 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -5,9 +5,10 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::select; +use tokio::time::Instant; use tracing::{error, warn}; #[derive(Clone, Serialize, Deserialize)] @@ -35,6 +36,7 @@ impl Sender { ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { func: WorkKind::Async(future), + expiry: None, name, }) } @@ -46,6 +48,7 @@ impl Sender { ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { func: WorkKind::Blocking(func), + expiry: None, name, }) } @@ -57,11 +60,12 @@ impl Sender { ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { func: WorkKind::Immediate(func), + expiry: None, name, }) } - fn send_work_item(&mut self, item: WorkItem) -> Result<(), TrySendError> { + pub fn send_work_item(&mut self, item: WorkItem) -> Result<(), TrySendError> { let name = item.name; let result = self.tx.try_send(item); if let Err(err) = &result { @@ -109,6 +113,7 @@ enum WorkKind { pub struct WorkItem { func: WorkKind, + expiry: Option, name: &'static str, } @@ -116,6 +121,7 @@ impl WorkItem { pub fn new_async(name: &'static str, func: AsyncFn) -> Self { Self { name, + expiry: None, func: WorkKind::Async(func), } } @@ -123,9 +129,19 @@ impl WorkItem { pub fn new_blocking(name: &'static str, func: BlockingFn) -> Self { Self { name, + expiry: None, func: WorkKind::Blocking(func), } } + + pub fn set_expiry(&mut self, expiry: Instant) { + self.expiry = Some(expiry); + } + + pub fn with_expiry(mut self, expiry: Instant) -> Self { + self.set_expiry(expiry); + self + } } pub struct DropOnFinish { @@ -187,6 +203,16 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu break; }; + if let Some(expiry) = work_item.expiry { + if expiry < Instant::now() { + metrics::inc_counter_vec( + &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_EXPIRED_COUNT, + &[work_item.name], + ); + continue; + } + } + metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); if permit.is_some() { metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_PERMIT_WORKERS_ACTIVE_TOTAL); diff --git a/anchor/processor/src/metrics.rs b/anchor/processor/src/metrics.rs index 9071673a..3ad73c43 100644 --- a/anchor/processor/src/metrics.rs +++ b/anchor/processor/src/metrics.rs @@ -20,6 +20,14 @@ pub static ANCHOR_PROCESSOR_WORK_EVENTS_STARTED_COUNT: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "anchor_processor_work_events_expired_count", + "Count of work events which expired before processing", + &["type"], + ) + }); pub static ANCHOR_PROCESSOR_WORKER_TIME: LazyLock> = LazyLock::new(|| { try_create_histogram_vec( "anchor_processor_worker_time", From d19c2dd3168ef5eba22b8a1bc5136bae7e2acb6b Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 14:05:04 +0100 Subject: [PATCH 07/12] add processor state (for sending to QBFT instances etc.) --- Cargo.lock | 1 + anchor/processor/Cargo.toml | 1 + anchor/processor/src/lib.rs | 50 ++++++++++++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1213bdf..fea7f3ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5815,6 +5815,7 @@ version = "0.1.0" dependencies = [ "metrics 0.2.0 (git+https://github.com/agemanning/lighthouse?branch=modularize-vc)", "num_cpus", + "qbft", "serde", "task_executor 0.1.0 (git+https://github.com/sigp/lighthouse?branch=unstable)", "tokio", diff --git a/anchor/processor/Cargo.toml b/anchor/processor/Cargo.toml index a01f33ec..769a494e 100644 --- a/anchor/processor/Cargo.toml +++ b/anchor/processor/Cargo.toml @@ -11,3 +11,4 @@ task_executor = { workspace = true } serde = { workspace = true } num_cpus = { workspace = true } metrics = { workspace = true } +qbft = { workspace = true } diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 8b9c2a29..9464b23a 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -1,13 +1,16 @@ mod metrics; +use qbft::{InMessage, InstanceHeight}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::select; use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; -use tokio::select; use tokio::time::Instant; use tracing::{error, warn}; @@ -37,6 +40,7 @@ impl Sender { self.send_work_item(WorkItem { func: WorkKind::Async(future), expiry: None, + state_modifier: None, name, }) } @@ -49,6 +53,7 @@ impl Sender { self.send_work_item(WorkItem { func: WorkKind::Blocking(func), expiry: None, + state_modifier: None, name, }) } @@ -61,6 +66,7 @@ impl Sender { self.send_work_item(WorkItem { func: WorkKind::Immediate(func), expiry: None, + state_modifier: None, name, }) } @@ -103,7 +109,8 @@ struct Receivers { pub type AsyncFn = Pin + Send>>; pub type BlockingFn = Box; -pub type ImmediateFn = Box; +pub type ImmediateFn = Box; +pub type StateModifierFn = Box; enum WorkKind { Async(AsyncFn), @@ -114,6 +121,7 @@ enum WorkKind { pub struct WorkItem { func: WorkKind, expiry: Option, + state_modifier: Option, name: &'static str, } @@ -122,6 +130,7 @@ impl WorkItem { Self { name, expiry: None, + state_modifier: None, func: WorkKind::Async(func), } } @@ -130,16 +139,35 @@ impl WorkItem { Self { name, expiry: None, + state_modifier: None, func: WorkKind::Blocking(func), } } - pub fn set_expiry(&mut self, expiry: Instant) { - self.expiry = Some(expiry); + pub fn new_immediate(name: &'static str, func: ImmediateFn) -> Self { + Self { + name, + expiry: None, + state_modifier: None, + func: WorkKind::Immediate(func), + } + } + + pub fn set_expiry(&mut self, expiry: Option) { + self.expiry = expiry; } pub fn with_expiry(mut self, expiry: Instant) -> Self { - self.set_expiry(expiry); + self.expiry = Some(expiry); + self + } + + pub fn set_state_modifier(&mut self, state_modifier: Option) { + self.state_modifier = state_modifier; + } + + pub fn with_state_modifier(mut self, state_modifier: StateModifierFn) -> Self { + self.state_modifier = Some(state_modifier); self } } @@ -157,6 +185,12 @@ impl Drop for DropOnFinish { } } +#[derive(Default)] +pub struct ProcessorState { + // placeholder, of course we also have to separate by validator and set data type + qbft_instances: HashMap>>, +} + pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { // todo macro? just specifying name and capacity? let (permitless_tx, permitless_rx) = mpsc::channel(1000); @@ -177,6 +211,7 @@ pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecutor) { let semaphore = Arc::new(Semaphore::new(config.max_workers)); + let mut state = ProcessorState::default(); loop { let _timer = metrics::start_timer(&metrics::ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS); @@ -228,6 +263,9 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu &[work_item.name], ), }; + if let Some(state_modifier) = work_item.state_modifier { + state_modifier(&mut state); + } match work_item.func { WorkKind::Async(async_fn) => executor.spawn( async move { @@ -245,7 +283,7 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu work_item.name, ); } - WorkKind::Immediate(immediate_fn) => immediate_fn(drop_on_finish), + WorkKind::Immediate(immediate_fn) => immediate_fn(&state, drop_on_finish), } } } From 7dfc67030d96d383fe17ff0dcc195625940803df Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 17:26:45 +0100 Subject: [PATCH 08/12] docs --- anchor/processor/src/lib.rs | 57 +++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 9464b23a..9301225c 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -1,3 +1,12 @@ +//! Central processor, serving roughly the same purpose as Lighthouse's `beacon_processor`. +//! +//! The processor does not centrally define the available work items, but provides [`WorkItem`] +//! which can be used to send work to the processor via [`Sender`]s. The processor then retrieves +//! work items from priority-ranked queues and launches the items in a way corresponding to their +//! type. For most queues, a permit is needed, which are handed out by the processor up to a +//! configured value, effectively limiting the number of concurrent tasks. This avoids overloading +//! the system and prioritizes items based on the queues they were submitted to. + mod metrics; use qbft::{InMessage, InstanceHeight}; @@ -14,8 +23,12 @@ use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::{error, warn}; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] +/// Configuration for a processor. Provided to [spawn]. pub struct Config { + /// The maximum amount of concurrent workers. Note that [WorkItem]s submitted via + /// [Senders::permitless_tx] do not count towards this limit. By default, this is the number of + /// logical CPUs. pub max_workers: usize, } @@ -27,11 +40,13 @@ impl Default for Config { } } +#[derive(Clone, Debug)] pub struct Sender { tx: mpsc::Sender, } impl Sender { + /// Convenience method creating an async [`WorkItem`] and sending it. pub fn send_async( &mut self, future: AsyncFn, @@ -45,6 +60,7 @@ impl Sender { }) } + /// Convenience method creating a blocking [`WorkItem`] and sending it. pub fn send_blocking( &mut self, func: BlockingFn, @@ -58,6 +74,7 @@ impl Sender { }) } + /// Convenience method creating an immediate [`WorkItem`] and sending it. pub fn send_immediate( &mut self, func: ImmediateFn, @@ -71,6 +88,8 @@ impl Sender { }) } + /// Sends a [`WorkItem`] into the queue, non-blocking, returning an error if the queue is full. + /// Handles metrics and logging for you. pub fn send_work_item(&mut self, item: WorkItem) -> Result<(), TrySendError> { let name = item.name; let result = self.tx.try_send(item); @@ -95,7 +114,12 @@ impl Sender { } } +/// Bag of available senders relevant for the Anchor client. +#[derive(Clone, Debug)] pub struct Senders { + /// Catch-all queue for tasks that are either very quick to run or behave well as async task in + /// the Tokio runtime. Is launched immediately and does not require capacity as defined by + /// [`Config::max_worker`]. pub permitless_tx: Sender, pub example2_tx: Sender, // todo add all the needed queues here @@ -112,12 +136,14 @@ pub type BlockingFn = Box; pub type ImmediateFn = Box; pub type StateModifierFn = Box; +#[derive(Debug)] enum WorkKind { Async(AsyncFn), Blocking(BlockingFn), Immediate(ImmediateFn), } +#[derive(Debug)] pub struct WorkItem { func: WorkKind, expiry: Option, @@ -126,6 +152,7 @@ pub struct WorkItem { } impl WorkItem { + /// Create an async work task. Will be spawned on the Tokio runtime. pub fn new_async(name: &'static str, func: AsyncFn) -> Self { Self { name, @@ -135,6 +162,7 @@ impl WorkItem { } } + /// Create a blocking work task. Will be spawned on the Tokio runtime using `spawn_blocking`. pub fn new_blocking(name: &'static str, func: BlockingFn) -> Self { Self { name, @@ -144,6 +172,12 @@ impl WorkItem { } } + /// Create an immediate work task. Has access to the [`ProcessorState`], and is thus ideal for + /// triggering some process, e.g. via a queue retrieved from the state. Must *NEVER* block! + /// + /// The [`DropOnFinish`] should be dropped when the work is done, for proper permit accounting + /// and metrics. This includes any work triggered by the closure, so [`DropOnFinish`] should + /// be sent along if any other process such as a QBFT instance is messaged. pub fn new_immediate(name: &'static str, func: ImmediateFn) -> Self { Self { name, @@ -153,6 +187,8 @@ impl WorkItem { } } + /// Set expiry of this work item. If the processor retrieves the work item after the expiry, + /// it drops the work item instead. pub fn set_expiry(&mut self, expiry: Option) { self.expiry = expiry; } @@ -162,6 +198,8 @@ impl WorkItem { self } + /// Before starting the work, modify the [`ProcessorState`]. Useful for storing stuff to be used + /// by [immediate](new_immediate) [`WorkItem`]s. pub fn set_state_modifier(&mut self, state_modifier: Option) { self.state_modifier = state_modifier; } @@ -172,6 +210,8 @@ impl WorkItem { } } +/// Refunds the permit and updates metrics on drop. +#[derive(Debug)] pub struct DropOnFinish { permit: Option, _work_timer: Option, @@ -185,12 +225,15 @@ impl Drop for DropOnFinish { } } -#[derive(Default)] +/// Contains several items necessary for processing immediate work items, such as queues for +/// triggering work in other parts of the client. +#[derive(Default, Debug)] pub struct ProcessorState { // placeholder, of course we also have to separate by validator and set data type - qbft_instances: HashMap>>, + pub qbft_instances: HashMap>>, } +/// Create a new processor and spawn it with the given executor. Returns the queue senders. pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { // todo macro? just specifying name and capacity? let (permitless_tx, permitless_rx) = mpsc::channel(1000); @@ -216,6 +259,8 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu loop { let _timer = metrics::start_timer(&metrics::ANCHOR_PROCESSOR_EVENT_HANDLING_SECONDS); + // Try to get the next work event. work_item will only be None when the queues are closed. + // Permit will be None when the event was received from permitless_rx. let (permit, work_item) = select! { biased; Some(w) = receivers.permitless_rx.recv() => (None, Some(w)), @@ -232,14 +277,13 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu } else => (None, None), }; - let Some(work_item) = work_item else { error!("Processor queues closed unexpectedly"); break; }; - if let Some(expiry) = work_item.expiry { if expiry < Instant::now() { + warn!(task = work_item.name, "Processor skipped expired work"); metrics::inc_counter_vec( &metrics::ANCHOR_PROCESSOR_WORK_EVENTS_EXPIRED_COUNT, &[work_item.name], @@ -248,6 +292,7 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu } } + // update metrics metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_WORKERS_ACTIVE_TOTAL); if permit.is_some() { metrics::inc_gauge(&metrics::ANCHOR_PROCESSOR_PERMIT_WORKERS_ACTIVE_TOTAL); @@ -263,9 +308,11 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu &[work_item.name], ), }; + if let Some(state_modifier) = work_item.state_modifier { state_modifier(&mut state); } + match work_item.func { WorkKind::Async(async_fn) => executor.spawn( async move { From 2e450d5437dbadddf477f48349e4c744996c25d1 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 3 Dec 2024 17:30:09 +0100 Subject: [PATCH 09/12] fix docs --- anchor/processor/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 9301225c..057638e8 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -119,7 +119,7 @@ impl Sender { pub struct Senders { /// Catch-all queue for tasks that are either very quick to run or behave well as async task in /// the Tokio runtime. Is launched immediately and does not require capacity as defined by - /// [`Config::max_worker`]. + /// [`Config::max_workers`]. pub permitless_tx: Sender, pub example2_tx: Sender, // todo add all the needed queues here @@ -199,7 +199,7 @@ impl WorkItem { } /// Before starting the work, modify the [`ProcessorState`]. Useful for storing stuff to be used - /// by [immediate](new_immediate) [`WorkItem`]s. + /// by [immediate](WorkItem::new_immediate) `WorkItem`s. pub fn set_state_modifier(&mut self, state_modifier: Option) { self.state_modifier = state_modifier; } From 13e812cd0d2dcbbce8fad2f0f1420eac6ecc17be Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 4 Dec 2024 12:36:53 +0100 Subject: [PATCH 10/12] add test and slightly improve api --- Cargo.lock | 2 + anchor/client/src/lib.rs | 2 +- anchor/processor/Cargo.toml | 4 ++ anchor/processor/src/lib.rs | 44 ++++++++---- anchor/processor/tests/processor_tests.rs | 82 +++++++++++++++++++++++ 5 files changed, 121 insertions(+), 13 deletions(-) create mode 100644 anchor/processor/tests/processor_tests.rs diff --git a/Cargo.lock b/Cargo.lock index fea7f3ba..378aaa4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5813,6 +5813,8 @@ dependencies = [ name = "processor" version = "0.1.0" dependencies = [ + "async-channel", + "futures", "metrics 0.2.0 (git+https://github.com/agemanning/lighthouse?branch=modularize-vc)", "num_cpus", "qbft", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index f287ac71..539028ac 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -43,7 +43,7 @@ impl Client { ); // Start the processor - let processor_senders = processor::spawn(config.processor, executor.clone()).await; + let processor_senders = processor::spawn(config.processor, executor.clone()); // Optionally start the metrics server. let _http_metrics_shared_state = if config.http_metrics.enabled { diff --git a/anchor/processor/Cargo.toml b/anchor/processor/Cargo.toml index 769a494e..c0e5c012 100644 --- a/anchor/processor/Cargo.toml +++ b/anchor/processor/Cargo.toml @@ -12,3 +12,7 @@ serde = { workspace = true } num_cpus = { workspace = true } metrics = { workspace = true } qbft = { workspace = true } + +[dev-dependencies] +async-channel = { workspace = true } +futures = { workspace = true } diff --git a/anchor/processor/src/lib.rs b/anchor/processor/src/lib.rs index 057638e8..4cb8ae96 100644 --- a/anchor/processor/src/lib.rs +++ b/anchor/processor/src/lib.rs @@ -12,6 +12,7 @@ mod metrics; use qbft::{InMessage, InstanceHeight}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -47,13 +48,13 @@ pub struct Sender { impl Sender { /// Convenience method creating an async [`WorkItem`] and sending it. - pub fn send_async( + pub fn send_async + Send + 'static>( &mut self, - future: AsyncFn, + future: F, name: &'static str, ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { - func: WorkKind::Async(future), + func: WorkKind::Async(Box::pin(future)), expiry: None, state_modifier: None, name, @@ -61,13 +62,13 @@ impl Sender { } /// Convenience method creating a blocking [`WorkItem`] and sending it. - pub fn send_blocking( + pub fn send_blocking( &mut self, - func: BlockingFn, + func: F, name: &'static str, ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { - func: WorkKind::Blocking(func), + func: WorkKind::Blocking(Box::new(func)), expiry: None, state_modifier: None, name, @@ -75,13 +76,13 @@ impl Sender { } /// Convenience method creating an immediate [`WorkItem`] and sending it. - pub fn send_immediate( + pub fn send_immediate( &mut self, - func: ImmediateFn, + func: F, name: &'static str, ) -> Result<(), TrySendError> { self.send_work_item(WorkItem { - func: WorkKind::Immediate(func), + func: WorkKind::Immediate(Box::new(func)), expiry: None, state_modifier: None, name, @@ -136,14 +137,22 @@ pub type BlockingFn = Box; pub type ImmediateFn = Box; pub type StateModifierFn = Box; -#[derive(Debug)] enum WorkKind { Async(AsyncFn), Blocking(BlockingFn), Immediate(ImmediateFn), } -#[derive(Debug)] +impl Debug for WorkKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + WorkKind::Async(_) => f.write_str("Async"), + WorkKind::Blocking(_) => f.write_str("Blocking"), + WorkKind::Immediate(_) => f.write_str("Immediate"), + } + } +} + pub struct WorkItem { func: WorkKind, expiry: Option, @@ -151,6 +160,17 @@ pub struct WorkItem { name: &'static str, } +impl Debug for WorkItem { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WorkItem") + .field("func", &self.func) + .field("expiry", &self.expiry) + .field("state_modifier", &self.state_modifier.is_some()) + .field("name", &self.name) + .finish() + } +} + impl WorkItem { /// Create an async work task. Will be spawned on the Tokio runtime. pub fn new_async(name: &'static str, func: AsyncFn) -> Self { @@ -234,7 +254,7 @@ pub struct ProcessorState { } /// Create a new processor and spawn it with the given executor. Returns the queue senders. -pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders { +pub fn spawn(config: Config, executor: TaskExecutor) -> Senders { // todo macro? just specifying name and capacity? let (permitless_tx, permitless_rx) = mpsc::channel(1000); let (example2_tx, example2_rx) = mpsc::channel(1000); diff --git a/anchor/processor/tests/processor_tests.rs b/anchor/processor/tests/processor_tests.rs new file mode 100644 index 00000000..09d2b937 --- /dev/null +++ b/anchor/processor/tests/processor_tests.rs @@ -0,0 +1,82 @@ +use std::error::Error; +use std::sync::Arc; +use std::time::Duration; +use task_executor::TaskExecutor; +use tokio::select; +use tokio::sync::{oneshot, Barrier, Notify}; +use tokio::time::sleep; + +#[tokio::test] +async fn test_max_workers() -> Result<(), Box> { + let handle = tokio::runtime::Handle::current(); + let (_signal, exit) = async_channel::bounded(1); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = TaskExecutor::new(handle, exit, shutdown_tx); + + let config = processor::Config { + max_workers: 3, + }; + + let mut sender_queues = processor::spawn(config, executor); + + let start_sync = Arc::new(Barrier::new(4)); + let continue_notify = Arc::new(Notify::new()); + + // fill up the available workers + for _ in 0..3 { + let start_sync = start_sync.clone(); + let continue_notify = continue_notify.clone(); + sender_queues.example2_tx.send_async(async move { + start_sync.wait().await; + continue_notify.notified().await; + }, "test_task1")?; + + // throw in some permitless tasks + sender_queues.permitless_tx.send_blocking(|| {}, "test_task2")?; + sender_queues.permitless_tx.send_immediate(|_, _| {}, "test_task3")?; + } + + // wait until every task has been spawned + select! { + _ = sleep(Duration::from_millis(100)) => panic!("we should be able to run the blockers"), + _ = start_sync.wait() => {}, + } + + let permitless_sync = Arc::new(Barrier::new(2)); + let passed_permitless_sync = permitless_sync.clone(); + // now, we should be able to spawn only via the "permitless" queue + sender_queues.permitless_tx.send_async(async move { + passed_permitless_sync.wait().await; + }, "test_task4")?; + + let (did_run_tx, mut did_run_rx) = oneshot::channel(); + // but other queues should only run after we freed up space: + sender_queues.example2_tx.send_async(async move { + let _ = did_run_tx.send(()); + }, "test_task5")?; + + // see if the permitless one ran + select! { + _ = sleep(Duration::from_millis(100)) => panic!("the permitless task should be executed"), + _ = permitless_sync.wait() => {}, + } + + // see if the other one ran + select! { + _ = &mut did_run_rx => panic!("the task should not be executed yet"), + // it's probably fine after one ms - increase if this fails spuriously. Sorry! + // feel free to improve the approach here + _ = sleep(Duration::from_millis(1)) => {}, + } + + // allow the three blocking tasks to finish + continue_notify.notify_waiters(); + + // now, the waiting task should be scheduled + select! { + _ = sleep(Duration::from_millis(100)) => panic!("the task should be executed now"), + _ = did_run_rx => {}, + } + + Ok(()) +} From c7a21b8373737d9d0e8ca9a2d29a28373d476268 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 6 Dec 2024 08:13:11 +0100 Subject: [PATCH 11/12] cargo fmt --- anchor/processor/tests/processor_tests.rs | 41 ++++++++++++++--------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/anchor/processor/tests/processor_tests.rs b/anchor/processor/tests/processor_tests.rs index 09d2b937..43767818 100644 --- a/anchor/processor/tests/processor_tests.rs +++ b/anchor/processor/tests/processor_tests.rs @@ -13,9 +13,7 @@ async fn test_max_workers() -> Result<(), Box> { let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = TaskExecutor::new(handle, exit, shutdown_tx); - let config = processor::Config { - max_workers: 3, - }; + let config = processor::Config { max_workers: 3 }; let mut sender_queues = processor::spawn(config, executor); @@ -26,14 +24,21 @@ async fn test_max_workers() -> Result<(), Box> { for _ in 0..3 { let start_sync = start_sync.clone(); let continue_notify = continue_notify.clone(); - sender_queues.example2_tx.send_async(async move { - start_sync.wait().await; - continue_notify.notified().await; - }, "test_task1")?; + sender_queues.example2_tx.send_async( + async move { + start_sync.wait().await; + continue_notify.notified().await; + }, + "test_task1", + )?; // throw in some permitless tasks - sender_queues.permitless_tx.send_blocking(|| {}, "test_task2")?; - sender_queues.permitless_tx.send_immediate(|_, _| {}, "test_task3")?; + sender_queues + .permitless_tx + .send_blocking(|| {}, "test_task2")?; + sender_queues + .permitless_tx + .send_immediate(|_, _| {}, "test_task3")?; } // wait until every task has been spawned @@ -45,15 +50,21 @@ async fn test_max_workers() -> Result<(), Box> { let permitless_sync = Arc::new(Barrier::new(2)); let passed_permitless_sync = permitless_sync.clone(); // now, we should be able to spawn only via the "permitless" queue - sender_queues.permitless_tx.send_async(async move { - passed_permitless_sync.wait().await; - }, "test_task4")?; + sender_queues.permitless_tx.send_async( + async move { + passed_permitless_sync.wait().await; + }, + "test_task4", + )?; let (did_run_tx, mut did_run_rx) = oneshot::channel(); // but other queues should only run after we freed up space: - sender_queues.example2_tx.send_async(async move { - let _ = did_run_tx.send(()); - }, "test_task5")?; + sender_queues.example2_tx.send_async( + async move { + let _ = did_run_tx.send(()); + }, + "test_task5", + )?; // see if the permitless one ran select! { From 70a0055dd9f53684297bf57c328f07f3ee8807be Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 6 Dec 2024 08:13:38 +0100 Subject: [PATCH 12/12] mark senders as unused for now --- anchor/client/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 539028ac..c4b67402 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -43,7 +43,7 @@ impl Client { ); // Start the processor - let processor_senders = processor::spawn(config.processor, executor.clone()); + let _processor_senders = processor::spawn(config.processor, executor.clone()); // Optionally start the metrics server. let _http_metrics_shared_state = if config.http_metrics.enabled {