diff --git a/Cargo.lock b/Cargo.lock index 9d5ce2d..13eddda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,7 +199,9 @@ checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-targets", ] @@ -964,6 +966,7 @@ name = "scylla-perf" version = "1.0.0" dependencies = [ "anyhow", + "chrono", "clap", "comfy-table", "histogram 0.11.1", @@ -972,6 +975,7 @@ dependencies = [ "rand", "scylla", "tokio", + "tokio_schedule", ] [[package]] @@ -1148,6 +1152,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio_schedule" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c291c554da3518d6ef69c76ea35aabc78f736185a16b6017f6d1c224dac2e0" +dependencies = [ + "chrono", + "tokio", +] + [[package]] name = "tracing" version = "0.1.41" diff --git a/Cargo.toml b/Cargo.toml index 5b95366..2f8c821 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,5 @@ parse_duration = "2.1.1" comfy-table = "7.1.3" histogram = "0.11.1" human_format = "1.1.0" +tokio_schedule = "0.3.2" +chrono = "0.4.38" diff --git a/src/executor.rs b/src/executor.rs index be2c488..ede7143 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,4 +1,4 @@ -use crate::reporter::{QueryType, Reporter}; +use crate::reporter::{QueryType, Reporter, SimpleReporter}; use anyhow::Result; use rand::distributions::{Alphanumeric, DistString}; use rand::{random, Rng}; @@ -14,9 +14,9 @@ use tokio::sync::{broadcast, oneshot, Mutex}; pub struct Executor { concurrency: usize, reads_percentage: f32, - reporter: Arc>>, + reporter: Arc, key_values_range: Vec, - drop_test_keyspace: bool, + dont_drop_test_keyspace: bool, } pub struct KeyValue(String, Vec); @@ -34,8 +34,8 @@ impl Executor { value_blob_size: usize, reads_percentage: f32, total_keys: usize, - reporter: Box, - drop_test_keyspace: bool, + reporter: Arc, + dont_drop_test_keyspace: bool, ) -> Executor { if reads_percentage < 0.0 || reads_percentage > 1.0 { panic!("Reads percentage must be between 0.0 and 1.0"); @@ -48,8 +48,8 @@ impl Executor { key_string_length, value_blob_size, ), - reporter: Arc::new(Mutex::new(reporter)), - drop_test_keyspace, + reporter: reporter, + dont_drop_test_keyspace, } } @@ -71,12 +71,6 @@ impl Executor { .await?; let (tx_stop_coordinator, mut rx_stop_coordinator): (Sender<()>, Receiver<()>) = oneshot::channel(); - let (tx_stop_metrics_reporter, mut rx_stop_metrics_reporter): (Sender<()>, Receiver<()>) = - oneshot::channel(); - let (tx_metrics, mut rx_metrics): ( - broadcast::Sender<(QueryType, Duration)>, - broadcast::Receiver<(QueryType, Duration)>, - ) = broadcast::channel(10000); println!("Inserting initial key-value pairs..."); for kv in &self.key_values_range { perform_write(session.clone(), prepared_write.clone(), kv.clone()).await?; @@ -87,20 +81,7 @@ impl Executor { let key_values_range = self.key_values_range.clone(); let reads_percentage = self.reads_percentage.clone(); let reporter_clone = self.reporter.clone(); - let _ = tokio::task::spawn(async move { - let mut reporter = reporter_clone.lock().await; - loop { - if rx_stop_metrics_reporter.try_recv().is_ok() { - println!("Stopping metrics reporter..."); - break; - } - while let Ok(res) = rx_metrics.try_recv() { - (*reporter).report_results(res.0, res.1); - } - } - () - }); - let drop_test_keyspace_clone = self.drop_test_keyspace.clone(); + let dont_drop_test_keyspace_clone = self.dont_drop_test_keyspace.clone(); let coordinator_thread = tokio::task::spawn(async move { let current_concurrency = Arc::new(AtomicUsize::new(0)); loop { @@ -111,9 +92,7 @@ impl Executor { while current_concurrency.load(std::sync::atomic::Ordering::Relaxed) > 0 { tokio::time::sleep(Duration::from_millis(100)).await; } - println!("All tasks finished, stopping metrics reporter..."); - tx_stop_metrics_reporter.send(()).unwrap(); - if drop_test_keyspace_clone { + if !dont_drop_test_keyspace_clone { println!("Dropping test keyspace..."); let res = session.query_unpaged("DROP KEYSPACE test", &[]).await; if res.is_err() { @@ -127,12 +106,12 @@ impl Executor { while current_concurrency.load(std::sync::atomic::Ordering::Relaxed) < concurrency { current_concurrency.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let session_clone = session.clone(); - let tx_metrics_clone = tx_metrics.clone(); let kvs = key_values_range.clone(); let reads_percentage = reads_percentage.clone(); let pread = prepared_read.clone(); let pwrite = prepared_write.clone(); let current_concurrency_clone = Arc::clone(¤t_concurrency); + let reporter_clone_clone = reporter_clone.clone(); tokio::spawn(async move { let kv = kvs.get(rand::thread_rng().gen_range(0..kvs.len())).unwrap(); let rng = random::(); @@ -152,15 +131,7 @@ impl Executor { } else { let q_type = res.0; let elapsed = res.1.unwrap(); - loop { - let result = tx_metrics_clone.send((q_type, elapsed)); - if result.is_ok() { - break; - } else { - println!("Error sending metrics: {:?}", result.err()); - break; - } - } + reporter_clone_clone.report_results(q_type, elapsed); } current_concurrency_clone .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); diff --git a/src/main.rs b/src/main.rs index 19e7396..46fc287 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,7 @@ use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; #[derive(Parser, Debug)] #[command( @@ -24,10 +25,18 @@ struct Args { short, long, default_value = "1000", - help = "Number of concurrent requests at any given moment of time" + help = "Number of concurrent requests at any given moment of time per executor", )] pub concurrency: usize, + #[arg( + short, + long, + default_value = "1", + help = "Number of executors to run in parallel." + )] + pub executors_count: usize, + #[arg(short, long, value_parser = parse, default_value = "10s", help = "Duration of the benchmark" )] pub duration: Duration, @@ -107,13 +116,13 @@ struct Args { default_value = "true", help = "Drop the keyspace after the benchmark" )] - pub drop_test_keyspace: bool, + pub dont_drop_test_keyspace: bool, } -fn reporter_mode(mode: String, period: Duration) -> Box { - let reporter: Box = match mode.as_str() { - "simple" => Box::new(SimpleReporter::new(period)), - "percentile" => Box::new(PercentileReporter::new(period)), +fn reporter_mode(mode: String, period: Duration) -> SimpleReporter { + let reporter: SimpleReporter = match mode.as_str() { + "simple" => SimpleReporter::new(period), + // "percentile" => Box::new(PercentileReporter::new(period)), _ => panic!("Invalid mode: {}", mode), }; reporter @@ -131,7 +140,12 @@ async fn main() -> Result<()> { .collect::(); let pass = pass_first4 + &pass_after4; println!( - "Args: duration: {}s, scylla_host: {}, pool_size: {}, user: {}, password: {}, key_string_length: {}, value_blob_size: {}, reads_percentage: {}, total_keys: {}, report_mode: {}, report_period: {}, drop_test_keyspace: {}", + "Args: \ + duration: {}s, scylla_host: {}, pool_size: {},\n\ + user: {}, password: {}, key_string_length: {},\n\ + value_blob_size: {}, reads_percentage: {}, total_keys: {},\n\ + report_mode: {}, report_period: {}s, drop_test_keyspace: {}\ + executors: {}", args.duration.as_secs_f64(), args.scylla_hosts, args.pool_size, @@ -143,7 +157,8 @@ async fn main() -> Result<()> { args.total_keys, args.report_mode, args.report_period.as_secs_f64(), - args.drop_test_keyspace + args.dont_drop_test_keyspace, + args.executors_count ); let hosts_split = args.scylla_hosts.split(","); let mut builder = SessionBuilder::new() @@ -156,21 +171,42 @@ async fn main() -> Result<()> { builder = builder.known_node(host); } let session: Arc> = Arc::new(builder.build().await?); - let reporter = reporter_mode(args.report_mode, args.report_period); - let mut executor = executor::Executor::new( - args.concurrency, - args.key_string_length, - args.value_blob_size, - args.reads_percentage, - args.total_keys, - reporter, - args.drop_test_keyspace, - ); - let (stop_sender, executor_thread) = executor.start(session).await?; - tokio::time::sleep(args.duration).await; - if let Err(e) = stop_sender.send(()) { - println!("Error sending stop signal: {:?}", e); + let reporter = Arc::new(reporter_mode(args.report_mode, args.report_period)); + let reporter_clone_for_thread = reporter.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(args.report_period).await; + reporter_clone_for_thread.print_report(); + } + }); + let mut handles = Vec::new(); + for i in 0..args.executors_count { + let i_clone = i.clone(); + let reporter_clone = reporter.clone(); + let session_clone = session.clone(); + let handle = tokio::spawn(async move { + let mut executor = executor::Executor::new( + args.concurrency, + args.key_string_length, + args.value_blob_size, + args.reads_percentage, + args.total_keys, + reporter_clone, + args.dont_drop_test_keyspace, + ); + let (stop_sender, executor_thread) = executor.start(session_clone).await.unwrap(); + tokio::time::sleep(args.duration).await; + println!("Requesting stop since the duration has passed"); + if let Err(e) = stop_sender.send(()) { + println!("Error sending stop signal: {:?}", e); + } + executor_thread.await.unwrap(); + println!("Executor #{} done", i_clone +1); + }); + handles.push(handle); + } + for handle in handles { + handle.await?; } - executor_thread.await?; Ok(()) } diff --git a/src/reporter.rs b/src/reporter.rs index c220424..7462084 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -4,15 +4,20 @@ use histogram::Histogram; use human_format::Formatter; use std::collections::BTreeMap; use std::fmt; -use std::time::Duration; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use tokio::spawn; +use tokio::sync::Mutex; use tokio::time::Instant; +use tokio_schedule::{every, Job}; +use std::time::Duration; pub trait Reporter { /// Create a new reporter with a given period between reports fn new(period: Duration) -> Self where Self: Sized; - fn report_results(&mut self, query_type: QueryType, time: Duration) -> (); + fn report_results(&self, query_type: QueryType, time: Duration) -> (); } #[derive(Hash, Eq, PartialEq, Copy, Clone, Debug, Ord, PartialOrd)] @@ -22,11 +27,10 @@ pub enum QueryType { Write, } -#[derive(Clone)] pub struct SimpleReporter { period: Duration, - request_counts: usize, - request_durations: Duration, + request_counts: AtomicUsize, + request_durations_micros: AtomicUsize, first_reported_at: Instant, last_reported_at: Instant, } @@ -40,39 +44,40 @@ pub struct PercentileReporter { last_reported_at: Instant, } -unsafe impl Send for SimpleReporter { - -} - -unsafe impl Send for PercentileReporter { - -} +unsafe impl Send for SimpleReporter {} +unsafe impl Send for PercentileReporter {} +unsafe impl Sync for SimpleReporter {} -impl Reporter for SimpleReporter { - fn new(period: Duration) -> Self { - SimpleReporter { +impl SimpleReporter { + pub fn new(period: Duration) -> Self { + let res = SimpleReporter { period, - request_counts: 0, - request_durations: Duration::from_secs(0), + request_counts: AtomicUsize::new(0), + request_durations_micros: AtomicUsize::new(0), last_reported_at: Instant::now(), first_reported_at: Instant::now(), - } + }; + res } - fn report_results(&mut self, _: QueryType, latency: Duration) -> () { - self.request_counts += 1; - self.request_durations += latency; - if self.last_reported_at.elapsed() > self.period { - let rps = self.request_counts as f64 / self.first_reported_at.elapsed().as_secs_f64(); - let avg_latency = self.request_durations.as_secs_f64() / self.request_counts as f64; - println!( - "Total: {} reqs, {:.2} req/s, avg latency: {:.2} ms", - self.request_counts, - rps, - avg_latency * 1000.0 - ); - self.last_reported_at = Instant::now(); - } + pub fn print_report(&self) { + let request_counts = self.request_counts.load(std::sync::atomic::Ordering::Relaxed); + let request_durations_micros = self.request_durations_micros.load(std::sync::atomic::Ordering::Relaxed); + let rps = request_counts as f64 / self.first_reported_at.elapsed().as_secs_f64(); + let avg_latency = request_durations_micros as f64 / request_counts as f64; + println!( + "Total requests: {}, RPS: {:.2}, Avg latency: {:.2} ms", + request_counts, rps, avg_latency / 1000.0 + ); + } + + pub fn report_results(&self, _: QueryType, latency: Duration) -> () { + self.request_counts + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.request_durations_micros.fetch_add( + latency.as_micros() as usize, + std::sync::atomic::Ordering::Relaxed, + ); } } @@ -95,7 +100,7 @@ impl fmt::Display for QueryType { } } -impl Reporter for PercentileReporter { +impl PercentileReporter { fn new(period: Duration) -> Self { PercentileReporter { period,