diff --git a/crossbeam-channel/benchmarks/Cargo.toml b/crossbeam-channel/benchmarks/Cargo.toml index 7b334b78c..fd201b6ae 100644 --- a/crossbeam-channel/benchmarks/Cargo.toml +++ b/crossbeam-channel/benchmarks/Cargo.toml @@ -14,6 +14,7 @@ flume = "0.11" futures = { version = "0.3", features = ["thread-pool"] } lockfree = "0.5.1" mpmc = "0.1.6" +orengine = "0.3.1" [[bin]] name = "atomicring" @@ -70,5 +71,10 @@ name = "mpmc" path = "mpmc.rs" doc = false +[[bin]] +name = "orengine" +path = "orengine.rs" +doc = false + [lints] workspace = true diff --git a/crossbeam-channel/benchmarks/orengine.rs b/crossbeam-channel/benchmarks/orengine.rs new file mode 100644 index 000000000..c5c876707 --- /dev/null +++ b/crossbeam-channel/benchmarks/orengine.rs @@ -0,0 +1,148 @@ +use orengine::{stop_all_executors, Executor}; +use orengine::sync::{global_scope, Channel}; + +mod message; + +const MESSAGES: usize = 5_000_000; +const THREADS: usize = 5; + +fn new(cap: Option) -> Channel { + match cap { + None => Channel::unbounded(), + Some(cap) => Channel::bounded(cap), + } +} + +fn seq(cap: Option) { + let cfg = orengine::runtime::Config::default() + .disable_io_worker() + .disable_work_sharing() + .set_numbers_of_thread_workers(0); + Executor::init_with_config(cfg).run_and_block_on_global(async move { + let chan = new(cap); + for i in 0..MESSAGES { + let _ = chan.send(message::new(i)).await; + } + + for _ in 0..MESSAGES { + let _ = chan.recv().await; + } + }).expect("failed to run"); +} + +fn spsc(cap: Option) { + let cfg = orengine::runtime::Config::default() + .disable_io_worker() + .disable_work_sharing() + .set_numbers_of_thread_workers(0); + Executor::init_with_config(cfg).run_and_block_on_global(async move { + let chan = new(cap); + + global_scope(|scope| async { + scope.spawn(async { + for i in 0..MESSAGES { + let _ = chan.send(message::new(i)).await; + } + }); + + for _ in 0..MESSAGES { + let _ = chan.recv().await; + } + }).await; + }).expect("failed to run"); +} + +fn mpsc(cap: Option) { + let cfg = orengine::runtime::Config::default() + .set_work_sharing_level(0) + .set_numbers_of_thread_workers(1); + let chan = new(cap); + crossbeam::scope(|s| { + for _ in 0..THREADS { + s.spawn(|_| { + Executor::init_with_config(cfg).run_with_global_future(async { + for i in 0..MESSAGES / THREADS { + let _ = chan.send(message::new(i)).await; + } + }) + }); + } + + Executor::init_with_config(cfg).run_with_global_future(async { + for _ in 0..MESSAGES { + let _ = chan.recv().await; + } + stop_all_executors(); + }); + }).expect("scope failed"); +} + +fn mpmc(cap: Option) { + let cfg = orengine::runtime::Config::default() + .set_work_sharing_level(1) + .set_numbers_of_thread_workers(0); + let chan = new(cap); + crossbeam::scope(|s| { + for _ in 0..THREADS { + s.spawn(|_| { + Executor::init_with_config(cfg).run_with_global_future(async { + for i in 0..MESSAGES / THREADS { + let _ = chan.send(message::new(i)).await; + } + }); + }); + } + + for _ in 0..THREADS { + s.spawn(|_| { + Executor::init_with_config(cfg).run_with_global_future(async { + for _ in 0..MESSAGES / THREADS { + let _ = chan.recv().await; + } + stop_all_executors(); + }); + }); + } + }).expect("scope failed"); +} + +fn main() { + macro_rules! run { + ($name:expr, $f:expr) => { + let mut total = 0; + for _ in 0..15 { + let now = ::std::time::Instant::now(); + $f; + let elapsed = now.elapsed(); + total += elapsed.as_nanos(); + } + + let avg = total as f64 / 15.0; + let elapsed = ::std::time::Duration::from_nanos(avg.round() as u64); + println!( + "{:25} {:15} {:7.3} sec", + $name, + "Rust orengine", + elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1e9 + ); + }; + } + + run!("bounded0_mpmc", mpmc(Some(0))); + run!("bounded0_mpsc", mpsc(Some(0))); + run!("bounded0_spsc", spsc(Some(0))); + + run!("bounded1_mpmc", mpmc(Some(1))); + run!("bounded1_mpsc", mpsc(Some(1))); + run!("bounded1_spsc", spsc(Some(1))); + + run!("bounded_mpmc", mpmc(Some(MESSAGES))); + run!("bounded_mpsc", mpsc(Some(MESSAGES))); + run!("bounded_seq", seq(Some(MESSAGES))); + run!("bounded_spsc", spsc(Some(MESSAGES))); + + run!("unbounded_mpmc", mpmc(None)); + run!("unbounded_mpsc", mpsc(None)); + run!("unbounded_seq", seq(None)); + run!("unbounded_spsc", spsc(None)); +} diff --git a/crossbeam-channel/benchmarks/plot.png b/crossbeam-channel/benchmarks/plot.png deleted file mode 100644 index f5ea13ac4..000000000 Binary files a/crossbeam-channel/benchmarks/plot.png and /dev/null differ diff --git a/crossbeam-channel/benchmarks/run.sh b/crossbeam-channel/benchmarks/run.sh index 32db1f0e9..ac8153a7e 100755 --- a/crossbeam-channel/benchmarks/run.sh +++ b/crossbeam-channel/benchmarks/run.sh @@ -4,7 +4,7 @@ IFS=$'\n\t' cd "$(dirname "$0")" cargo run --release --bin crossbeam-channel | tee crossbeam-channel.txt -cargo run --release --bin futures-channel | tee futures-channel.txt +cargo run --release --bin orengine | tee orengine.txt cargo run --release --bin mpsc | tee mpsc.txt cargo run --release --bin flume | tee flume.txt go run go.go | tee go.txt @@ -12,7 +12,7 @@ go run go.go | tee go.txt # These can also be run, but too many plot bars mess # up the plot (they start to overlap). So only 5 contenders # with the most tests are included by default. - +# cargo run --release --bin futures-channel | tee futures-channel.txt # cargo run --release --bin atomicringqueue | tee atomicringqueue.txt # cargo run --release --bin atomicring | tee atomicring.txt # cargo run --release --bin bus | tee bus.txt