From bb55f6f8a9aa0af0de301bd23ad1e4009f3a2d7e Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Tue, 5 Mar 2024 11:26:32 -0800 Subject: [PATCH] async-queues: refactoring Refactors the async-queues workload to split the broadcast and mpmc tests into two binary targets. This allows better naming for terms that are specific to each type of queue, differing configuration parameters for each, and better clarity when looking at the source code. --- async-queues/Cargo.toml | 12 +- async-queues/src/async-queues-broadcast.rs | 25 ++ async-queues/src/async-queues-mpmc.rs | 25 ++ .../src/{ => broadcast}/async_broadcast.rs | 1 + .../broadcaster.rs} | 1 + async-queues/src/broadcast/mod.rs | 202 +++++++++++++++++ .../splaycast.rs} | 1 + .../tokio.rs} | 1 + .../widecast.rs} | 1 + async-queues/src/lib.rs | 214 ++---------------- async-queues/src/main.rs | 39 ---- async-queues/src/{ => mpmc}/async_channel.rs | 5 +- .../{asyncstd_channel.rs => mpmc/asyncstd.rs} | 5 +- .../src/{flume_channel.rs => mpmc/flume.rs} | 5 +- .../src/{kanal_channel.rs => mpmc/kanal.rs} | 5 +- async-queues/src/mpmc/mod.rs | 184 +++++++++++++++ .../{postage_channel.rs => mpmc/postage.rs} | 5 +- 17 files changed, 483 insertions(+), 248 deletions(-) create mode 100644 async-queues/src/async-queues-broadcast.rs create mode 100644 async-queues/src/async-queues-mpmc.rs rename async-queues/src/{ => broadcast}/async_broadcast.rs (99%) rename async-queues/src/{broadcaster_broadcast.rs => broadcast/broadcaster.rs} (99%) create mode 100644 async-queues/src/broadcast/mod.rs rename async-queues/src/{splaycast_broadcast.rs => broadcast/splaycast.rs} (99%) rename async-queues/src/{tokio_broadcast.rs => broadcast/tokio.rs} (99%) rename async-queues/src/{widecast_broadcast.rs => broadcast/widecast.rs} (99%) delete mode 100644 async-queues/src/main.rs rename async-queues/src/{ => mpmc}/async_channel.rs (94%) rename async-queues/src/{asyncstd_channel.rs => mpmc/asyncstd.rs} (95%) rename async-queues/src/{flume_channel.rs => mpmc/flume.rs} (94%) rename async-queues/src/{kanal_channel.rs => mpmc/kanal.rs} (95%) create mode 100644 async-queues/src/mpmc/mod.rs rename async-queues/src/{postage_channel.rs => mpmc/postage.rs} (95%) diff --git a/async-queues/Cargo.toml b/async-queues/Cargo.toml index 571d976..590cd5e 100644 --- a/async-queues/Cargo.toml +++ b/async-queues/Cargo.toml @@ -6,6 +6,14 @@ authors = ["Brian Martin "] repository = "https://github.com/iopsystems/workloads" license = "MIT OR Apache-2.0" +[[bin]] +name = "async-queues-broadcast" +path = "src/async-queues-broadcast.rs" + +[[bin]] +name = "async-queues-mpmc" +path = "src/async-queues-mpmc.rs" + [dependencies] async-broadcast = "0.6.0" async-channel = "2.1.1" @@ -25,7 +33,3 @@ splaycast = "0.2.0" tokio = { version = "1.35.1", features = ["full"] } tokio-stream = "0.1.14" widecast = { git = "https://github.com/Phantomical/widecast" } - -[[bin]] -name = "async-queues" -path = "src/main.rs" diff --git a/async-queues/src/async-queues-broadcast.rs b/async-queues/src/async-queues-broadcast.rs new file mode 100644 index 0000000..1382670 --- /dev/null +++ b/async-queues/src/async-queues-broadcast.rs @@ -0,0 +1,25 @@ +use async_queues::broadcast; +use async_queues::broadcast::{Config, Test}; +use clap::Parser; + +fn main() { + let config = Config::parse(); + + match config.test() { + Test::AsyncBroadcast => { + let _ = broadcast::async_broadcast::run(config); + } + Test::Broadcaster => { + let _ = broadcast::broadcaster::run(config); + } + Test::Splaycast => { + let _ = broadcast::splaycast::run(config); + } + Test::Tokio => { + let _ = broadcast::tokio::run(config); + } + Test::Widecast => { + let _ = broadcast::widecast::run(config); + } + }; +} diff --git a/async-queues/src/async-queues-mpmc.rs b/async-queues/src/async-queues-mpmc.rs new file mode 100644 index 0000000..8a81f4f --- /dev/null +++ b/async-queues/src/async-queues-mpmc.rs @@ -0,0 +1,25 @@ +use async_queues::mpmc; +use async_queues::mpmc::{Config, Test}; +use clap::Parser; + +fn main() { + let config = Config::parse(); + + match config.test() { + Test::AsyncChannel => { + let _ = mpmc::async_channel::run(config); + } + Test::Asyncstd => { + let _ = mpmc::asyncstd::run(config); + } + Test::Flume => { + let _ = mpmc::flume::run(config); + } + Test::Kanal => { + let _ = mpmc::kanal::run(config); + } + Test::Postage => { + let _ = mpmc::postage::run(config); + } + }; +} diff --git a/async-queues/src/async_broadcast.rs b/async-queues/src/broadcast/async_broadcast.rs similarity index 99% rename from async-queues/src/async_broadcast.rs rename to async-queues/src/broadcast/async_broadcast.rs index 887fef1..9bf091e 100644 --- a/async-queues/src/async_broadcast.rs +++ b/async-queues/src/broadcast/async_broadcast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ::async_broadcast::{Receiver, RecvError, Sender}; diff --git a/async-queues/src/broadcaster_broadcast.rs b/async-queues/src/broadcast/broadcaster.rs similarity index 99% rename from async-queues/src/broadcaster_broadcast.rs rename to async-queues/src/broadcast/broadcaster.rs index 4d81920..917bc5b 100644 --- a/async-queues/src/broadcaster_broadcast.rs +++ b/async-queues/src/broadcast/broadcaster.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use broadcaster::*; diff --git a/async-queues/src/broadcast/mod.rs b/async-queues/src/broadcast/mod.rs new file mode 100644 index 0000000..fbc308b --- /dev/null +++ b/async-queues/src/broadcast/mod.rs @@ -0,0 +1,202 @@ +use crate::*; +use clap::Parser; +use ratelimit::Ratelimiter; +use std::sync::Arc; + +pub mod async_broadcast; +pub mod broadcaster; +pub mod splaycast; +pub mod tokio; +pub mod widecast; + +#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] +#[clap(rename_all = "snake_case")] +pub enum Test { + AsyncBroadcast, + Broadcaster, + Splaycast, + Tokio, + Widecast, +} + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Config { + #[arg(long)] + test: Test, + + #[arg(long, default_value_t = 60)] + duration: u64, + + #[arg(long, default_value_t = 128)] + queue_depth: usize, + + #[arg(long, default_value_t = false)] + split_runtime: bool, + + #[arg(long, default_value_t = 64)] + message_length: usize, + + #[arg(long, default_value_t = 1)] + threads: usize, + + #[arg(long, default_value_t = 61)] + global_queue_interval: u32, + + #[arg(long, default_value_t = 61)] + event_interval: u32, + + #[arg(long, default_value_t = 1)] + publishers: usize, + #[arg(long, default_value_t = 1)] + publisher_threads: usize, + + #[arg(long, default_value_t = 1000)] + publish_rate: u64, + + #[arg(long, default_value_t = 1)] + subscribers: usize, + #[arg(long, default_value_t = 1)] + subscriber_threads: usize, + + #[arg(long, default_value_t = 0)] + fanout: u8, + #[arg(long, default_value_t = 1)] + fanout_threads: usize, + + #[arg(long, default_value = None)] + histogram: Option, +} + +impl Config { + pub fn test(&self) -> Test { + self.test + } + + /// Create a ratelimiter for message sending based on the config + pub fn ratelimiter(&self) -> Arc> { + if self.publish_rate == 0 { + return Arc::new(None); + } + + let quanta = (self.publish_rate / 1_000_000) + 1; + let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.publish_rate; + + Arc::new(Some( + Ratelimiter::builder(quanta, Duration::from_nanos(delay)) + .max_tokens(quanta) + .build() + .unwrap(), + )) + } + + /// Return the queue depth to be used for the channel/queue + pub fn queue_depth(&self) -> usize { + self.queue_depth + } + + pub fn subscribers(&self) -> usize { + self.subscribers + } + + pub fn publishers(&self) -> usize { + self.publishers + } + + pub fn fanout(&self) -> u8 { + self.fanout + } + + pub fn message_length(&self) -> usize { + self.message_length + } + + /// Creates a collection of tokio runtimes, either one combined runtime or + /// a dual runtime depending on the configuration + pub fn runtime(&self) -> Runtime { + let combined = self._runtime(self.threads); + + let publisher = if !self.split_runtime { + None + } else { + Some(self._runtime(self.publisher_threads)) + }; + + let subscriber = if !self.split_runtime { + None + } else { + Some(self._runtime(self.subscriber_threads)) + }; + + let fanout = if !self.split_runtime { + None + } else { + Some(self._runtime(self.fanout_threads)) + }; + + Runtime { + config: self.clone(), + combined, + publisher, + subscriber, + fanout, + } + } + + /// Internal function to create a tokio runtime with a given number of + /// threads. This makes sure we use consistent configuration for each + /// runtime + fn _runtime(&self, threads: usize) -> ::tokio::runtime::Runtime { + ::tokio::runtime::Builder::new_multi_thread() + .worker_threads(threads) + .enable_all() + .global_queue_interval(self.global_queue_interval) + .event_interval(self.event_interval) + .build() + .expect("failed to initialize runtime") + } +} + +impl AsyncQueuesConfig for Config { + fn duration(&self) -> std::time::Duration { + core::time::Duration::from_secs(self.duration) + } + + fn histogram(&self) -> std::option::Option<&str> { + self.histogram.as_deref() + } + + fn global_queue_interval(&self) -> u32 { + self.global_queue_interval + } + + fn event_interval(&self) -> u32 { + self.event_interval + } +} + +impl std::fmt::Display for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.split_runtime { + write!(f, "publishers: {} publisher_threads: {} subscribers: {} subscriber_threads: {} fanout: {} fanout_threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.publishers, + self.publisher_threads, + self.subscribers, + self.subscriber_threads, + self.fanout, + self.fanout_threads, + self.publish_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } else { + write!(f, "publishers: {} subscribers: {} fanout: {} threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.publishers, + self.subscribers, + self.fanout, + self.threads, + self.publish_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } + } +} diff --git a/async-queues/src/splaycast_broadcast.rs b/async-queues/src/broadcast/splaycast.rs similarity index 99% rename from async-queues/src/splaycast_broadcast.rs rename to async-queues/src/broadcast/splaycast.rs index a6778a2..3ed9232 100644 --- a/async-queues/src/splaycast_broadcast.rs +++ b/async-queues/src/broadcast/splaycast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use futures_lite::StreamExt; diff --git a/async-queues/src/tokio_broadcast.rs b/async-queues/src/broadcast/tokio.rs similarity index 99% rename from async-queues/src/tokio_broadcast.rs rename to async-queues/src/broadcast/tokio.rs index 97b3d62..e3473e7 100644 --- a/async-queues/src/tokio_broadcast.rs +++ b/async-queues/src/broadcast/tokio.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ratelimit::Ratelimiter; diff --git a/async-queues/src/widecast_broadcast.rs b/async-queues/src/broadcast/widecast.rs similarity index 99% rename from async-queues/src/widecast_broadcast.rs rename to async-queues/src/broadcast/widecast.rs index 1ad28b4..38c4567 100644 --- a/async-queues/src/widecast_broadcast.rs +++ b/async-queues/src/broadcast/widecast.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ratelimit::Ratelimiter; diff --git a/async-queues/src/lib.rs b/async-queues/src/lib.rs index c939099..051eeea 100644 --- a/async-queues/src/lib.rs +++ b/async-queues/src/lib.rs @@ -1,40 +1,14 @@ -use clap::{Parser, ValueEnum}; +use clap::ValueEnum; use histogram::SparseHistogram; use metriken::{metric, AtomicHistogram, Counter}; -use ratelimit::Ratelimiter; use std::future::Future; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::task::JoinHandle; -pub mod async_broadcast; -pub mod broadcaster_broadcast; -pub mod splaycast_broadcast; -pub mod tokio_broadcast; -pub mod widecast_broadcast; - -pub mod async_channel; -pub mod asyncstd_channel; -pub mod flume_channel; -pub mod kanal_channel; -pub mod postage_channel; - -#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] -#[clap(rename_all = "snake_case")] -pub enum Test { - AsyncBroadcast, - BroadcasterBroadcast, - SplaycastBroadcast, - TokioBroadcast, - WidecastBroadcast, - AsyncChannel, - AsyncstdChannel, - FlumeChannel, - KanalChannel, - PostageChannel, -} +pub mod broadcast; +pub mod mpmc; pub static RUNNING: AtomicBool = AtomicBool::new(true); @@ -71,144 +45,6 @@ pub static RECV_BYTES: Counter = Counter::new(); #[metric(name = "dropped")] pub static DROPPED: Counter = Counter::new(); -#[derive(Parser, Debug, Clone)] -#[command(author, version, about, long_about = None)] -pub struct Config { - #[arg(long)] - test: Test, - - #[arg(long, default_value_t = 60)] - duration: u64, - - #[arg(long, default_value_t = 128)] - queue_depth: usize, - - #[arg(long, default_value_t = false)] - split_runtime: bool, - - #[arg(long, default_value_t = 64)] - message_length: usize, - - #[arg(long, default_value_t = 1)] - threads: usize, - - #[arg(long, default_value_t = 61)] - global_queue_interval: u32, - - #[arg(long, default_value_t = 61)] - event_interval: u32, - - #[arg(long, default_value_t = 1)] - publishers: usize, - #[arg(long, default_value_t = 1)] - publisher_threads: usize, - - #[arg(long, default_value_t = 1000)] - publish_rate: u64, - - #[arg(long, default_value_t = 1)] - subscribers: usize, - #[arg(long, default_value_t = 1)] - subscriber_threads: usize, - - #[arg(long, default_value_t = 0)] - fanout: u8, - #[arg(long, default_value_t = 1)] - fanout_threads: usize, - - #[arg(long, default_value = None)] - histogram: Option, -} - -impl Config { - pub fn test(&self) -> Test { - self.test - } - - /// Create a ratelimiter for message sending based on the config - pub fn ratelimiter(&self) -> Arc> { - if self.publish_rate == 0 { - return Arc::new(None); - } - - let quanta = (self.publish_rate / 1_000_000) + 1; - let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.publish_rate; - - Arc::new(Some( - Ratelimiter::builder(quanta, Duration::from_nanos(delay)) - .max_tokens(quanta) - .build() - .unwrap(), - )) - } - - /// Return the queue depth to be used for the channel/queue - pub fn queue_depth(&self) -> usize { - self.queue_depth - } - - pub fn subscribers(&self) -> usize { - self.subscribers - } - - pub fn publishers(&self) -> usize { - self.publishers - } - - pub fn fanout(&self) -> u8 { - self.fanout - } - - pub fn message_length(&self) -> usize { - self.message_length - } - - /// Creates a collection of tokio runtimes, either one combined runtime or - /// a dual runtime depending on the configuration - pub fn runtime(&self) -> Runtime { - let combined = self._runtime(self.threads); - - let publisher = if !self.split_runtime { - None - } else { - Some(self._runtime(self.publisher_threads)) - }; - - let subscriber = if !self.split_runtime { - None - } else { - Some(self._runtime(self.subscriber_threads)) - }; - - let fanout = if !self.split_runtime { - None - } else { - Some(self._runtime(self.fanout_threads)) - }; - - Runtime { - config: self.clone(), - combined, - publisher, - subscriber, - fanout, - } - } - - /// Internal function to create a tokio runtime with a given number of - /// threads. This makes sure we use consistent configuration for each - /// runtime - fn _runtime(&self, threads: usize) -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(threads) - .enable_all() - .global_queue_interval(self.global_queue_interval) - .event_interval(self.event_interval) - .build() - .expect("failed to initialize runtime") - } -} - #[derive(Clone)] pub struct Message { timestamp: Instant, @@ -231,17 +67,26 @@ impl Message { } } +pub trait AsyncQueuesConfig { + fn duration(&self) -> Duration; + + fn histogram(&self) -> Option<&str>; + + fn global_queue_interval(&self) -> u32; + fn event_interval(&self) -> u32; +} + /// An abstraction for having either a combined runtime, or separate runtimes /// for publishers and subscribers -pub struct Runtime { - config: Config, +pub struct Runtime { + config: T, combined: tokio::runtime::Runtime, publisher: Option, subscriber: Option, fanout: Option, } -impl Runtime { +impl Runtime { pub fn spawn_publisher(&self, future: F) -> JoinHandle where F: Future + Send + 'static, @@ -270,7 +115,7 @@ impl Runtime { let start = std::time::Instant::now(); RUNNING.store(true, Ordering::Relaxed); - std::thread::sleep(Duration::from_secs(self.config.duration)); + std::thread::sleep(self.config.duration()); let stop = std::time::Instant::now(); RUNNING.store(false, Ordering::Relaxed); @@ -279,31 +124,10 @@ impl Runtime { println!( "global_queue_interval: {} event_interval: {}", - self.config.global_queue_interval, self.config.event_interval + self.config.global_queue_interval(), + self.config.event_interval() ); - if self.config.split_runtime { - println!("publishers: {} publisher_threads: {} subscribers: {} subscriber_threads: {} fanout: {} fanout_threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", - self.config.publishers, - self.config.publisher_threads, - self.config.subscribers, - self.config.subscriber_threads, - self.config.fanout, - self.config.fanout_threads, - self.config.publish_rate, - self.config.queue_depth, - SEND.value(), RECV_OK.value(), DROPPED.value()); - } else { - println!("publishers: {} subscribers: {} fanout: {} threads: {} publish_rate: {} queue_depth: {} send: {} recv: {} drop: {}", - self.config.publishers, - self.config.subscribers, - self.config.fanout, - self.config.threads, - self.config.publish_rate, - self.config.queue_depth, - SEND.value(), RECV_OK.value(), DROPPED.value()); - } - let elapsed = stop.duration_since(start).as_secs_f64(); println!( "publish/s: {:.2} receive/s: {:.2} drop/s: {:.2}", @@ -327,7 +151,7 @@ impl Runtime { latencies[6].1.end() / 1000, ); - if let Some(path) = self.config.histogram { + if let Some(path) = self.config.histogram() { let sparse = SparseHistogram::from(&histogram); let json = serde_json::to_string(&sparse).expect("failed to serialize"); let mut file = std::fs::File::create(path).expect("failed to create file"); diff --git a/async-queues/src/main.rs b/async-queues/src/main.rs deleted file mode 100644 index 24a4280..0000000 --- a/async-queues/src/main.rs +++ /dev/null @@ -1,39 +0,0 @@ -use async_queues::*; -use clap::Parser; - -fn main() { - let config = Config::parse(); - - match config.test() { - Test::AsyncBroadcast => { - let _ = async_broadcast::run(config); - } - Test::AsyncChannel => { - let _ = async_channel::run(config); - } - Test::AsyncstdChannel => { - let _ = asyncstd_channel::run(config); - } - Test::BroadcasterBroadcast => { - let _ = broadcaster_broadcast::run(config); - } - Test::FlumeChannel => { - let _ = flume_channel::run(config); - } - Test::KanalChannel => { - let _ = kanal_channel::run(config); - } - Test::PostageChannel => { - let _ = postage_channel::run(config); - } - Test::SplaycastBroadcast => { - let _ = splaycast_broadcast::run(config); - } - Test::TokioBroadcast => { - let _ = tokio_broadcast::run(config); - } - Test::WidecastBroadcast => { - let _ = widecast_broadcast::run(config); - } - }; -} diff --git a/async-queues/src/async_channel.rs b/async-queues/src/mpmc/async_channel.rs similarity index 94% rename from async-queues/src/async_channel.rs rename to async-queues/src/mpmc/async_channel.rs index df9b30e..1194067 100644 --- a/async-queues/src/async_channel.rs +++ b/async-queues/src/mpmc/async_channel.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use ::async_channel::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/asyncstd_channel.rs b/async-queues/src/mpmc/asyncstd.rs similarity index 95% rename from async-queues/src/asyncstd_channel.rs rename to async-queues/src/mpmc/asyncstd.rs index 04e0d64..0ee0a2e 100644 --- a/async-queues/src/asyncstd_channel.rs +++ b/async-queues/src/mpmc/asyncstd.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use async_std::channel::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/flume_channel.rs b/async-queues/src/mpmc/flume.rs similarity index 94% rename from async-queues/src/flume_channel.rs rename to async-queues/src/mpmc/flume.rs index 80dc977..4377630 100644 --- a/async-queues/src/flume_channel.rs +++ b/async-queues/src/mpmc/flume.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use flume::{Receiver, Sender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/kanal_channel.rs b/async-queues/src/mpmc/kanal.rs similarity index 95% rename from async-queues/src/kanal_channel.rs rename to async-queues/src/mpmc/kanal.rs index 546680d..6eb0df4 100644 --- a/async-queues/src/kanal_channel.rs +++ b/async-queues/src/mpmc/kanal.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use kanal::{AsyncReceiver, AsyncSender}; @@ -13,11 +14,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); } diff --git a/async-queues/src/mpmc/mod.rs b/async-queues/src/mpmc/mod.rs new file mode 100644 index 0000000..55d39ca --- /dev/null +++ b/async-queues/src/mpmc/mod.rs @@ -0,0 +1,184 @@ +use crate::*; +use clap::Parser; +use ratelimit::Ratelimiter; +use std::sync::Arc; + +pub mod async_channel; +pub mod asyncstd; +pub mod flume; +pub mod kanal; +pub mod postage; + +#[derive(ValueEnum, Copy, Clone, Debug, PartialEq)] +#[clap(rename_all = "snake_case")] +pub enum Test { + AsyncChannel, + Asyncstd, + Flume, + Kanal, + Postage, +} + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Config { + #[arg(long)] + test: Test, + + #[arg(long, default_value_t = 60)] + duration: u64, + + #[arg(long, default_value_t = 128)] + queue_depth: usize, + + #[arg(long, default_value_t = false)] + split_runtime: bool, + + #[arg(long, default_value_t = 64)] + message_length: usize, + + #[arg(long, default_value_t = 1)] + threads: usize, + + #[arg(long, default_value_t = 61)] + global_queue_interval: u32, + + #[arg(long, default_value_t = 61)] + event_interval: u32, + + #[arg(long, default_value_t = 1)] + producers: usize, + #[arg(long, default_value_t = 1)] + producer_threads: usize, + + #[arg(long, default_value_t = 1000)] + producer_rate: u64, + + #[arg(long, default_value_t = 1)] + consumers: usize, + #[arg(long, default_value_t = 1)] + consumer_threads: usize, + + #[arg(long, default_value = None)] + histogram: Option, +} + +impl Config { + pub fn test(&self) -> Test { + self.test + } + + /// Create a ratelimiter for message sending based on the config + pub fn ratelimiter(&self) -> Arc> { + if self.producer_rate == 0 { + return Arc::new(None); + } + + let quanta = (self.producer_rate / 1_000_000) + 1; + let delay = quanta * Duration::from_secs(1).as_nanos() as u64 / self.producer_rate; + + Arc::new(Some( + Ratelimiter::builder(quanta, Duration::from_nanos(delay)) + .max_tokens(quanta) + .build() + .unwrap(), + )) + } + + /// Return the queue depth to be used for the channel/queue + pub fn queue_depth(&self) -> usize { + self.queue_depth + } + + pub fn consumers(&self) -> usize { + self.consumers + } + + pub fn producers(&self) -> usize { + self.producers + } + + pub fn message_length(&self) -> usize { + self.message_length + } + + /// Creates a collection of tokio runtimes, either one combined runtime or + /// a dual runtime depending on the configuration + pub fn runtime(&self) -> Runtime { + let combined = self._runtime(self.threads); + + let publisher = if !self.split_runtime { + None + } else { + Some(self._runtime(self.producer_threads)) + }; + + let subscriber = if !self.split_runtime { + None + } else { + Some(self._runtime(self.consumer_threads)) + }; + + Runtime { + config: self.clone(), + combined, + publisher, + subscriber, + fanout: None, + } + } + + /// Internal function to create a tokio runtime with a given number of + /// threads. This makes sure we use consistent configuration for each + /// runtime + fn _runtime(&self, threads: usize) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(threads) + .enable_all() + .global_queue_interval(self.global_queue_interval) + .event_interval(self.event_interval) + .build() + .expect("failed to initialize runtime") + } +} + +impl AsyncQueuesConfig for Config { + fn duration(&self) -> std::time::Duration { + core::time::Duration::from_secs(self.duration) + } + + fn histogram(&self) -> std::option::Option<&str> { + self.histogram.as_deref() + } + + fn global_queue_interval(&self) -> u32 { + self.global_queue_interval + } + + fn event_interval(&self) -> u32 { + self.event_interval + } +} + +impl std::fmt::Display for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.split_runtime { + write!(f, "producers: {} producer_threads: {} consumers: {} consumer_threads: {} producer_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.producers, + self.producer_threads, + self.consumers, + self.consumer_threads, + self.producer_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } else { + write!(f, "producers: {} consumers: {} threads: {} producer_rate: {} queue_depth: {} send: {} recv: {} drop: {}", + self.producers, + self.consumers, + self.threads, + self.producer_rate, + self.queue_depth, + SEND.value(), RECV_OK.value(), DROPPED.value()) + } + } +} diff --git a/async-queues/src/postage_channel.rs b/async-queues/src/mpmc/postage.rs similarity index 95% rename from async-queues/src/postage_channel.rs rename to async-queues/src/mpmc/postage.rs index 4d9c0d9..9df78c1 100644 --- a/async-queues/src/postage_channel.rs +++ b/async-queues/src/mpmc/postage.rs @@ -1,3 +1,4 @@ +use super::Config; use crate::*; use postage::dispatch::{Receiver, Sender}; @@ -15,11 +16,11 @@ pub fn run(config: Config) -> Result<(), Box> { let runtime = config.runtime(); - for _ in 0..config.subscribers() { + for _ in 0..config.consumers() { runtime.spawn_subscriber(receiver(rx.clone())); } - for _ in 0..config.publishers() { + for _ in 0..config.producers() { runtime.spawn_publisher(sender(config.clone(), tx.clone(), ratelimiter.clone())); }