Skip to content

Commit

Permalink
executor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Sammers21 committed Dec 4, 2024
1 parent 753498b commit 14f6e9c
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 96 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ parse_duration = "2.1.1"
comfy-table = "7.1.3"
histogram = "0.11.1"
human_format = "1.1.0"
tokio_schedule = "0.3.2"
chrono = "0.4.38"
51 changes: 11 additions & 40 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::reporter::{QueryType, Reporter};
use crate::reporter::{QueryType, Reporter, SimpleReporter};

Check warning on line 1 in src/executor.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `Reporter`
use anyhow::Result;
use rand::distributions::{Alphanumeric, DistString};
use rand::{random, Rng};
Expand All @@ -14,9 +14,9 @@ use tokio::sync::{broadcast, oneshot, Mutex};
pub struct Executor {
concurrency: usize,
reads_percentage: f32,
reporter: Arc<Mutex<Box<dyn Reporter + Send>>>,
reporter: Arc<SimpleReporter>,
key_values_range: Vec<KeyValue>,
drop_test_keyspace: bool,
dont_drop_test_keyspace: bool,
}

pub struct KeyValue(String, Vec<u8>);
Expand All @@ -34,8 +34,8 @@ impl Executor {
value_blob_size: usize,
reads_percentage: f32,
total_keys: usize,
reporter: Box<dyn Reporter + Send>,
drop_test_keyspace: bool,
reporter: Arc<SimpleReporter>,
dont_drop_test_keyspace: bool,
) -> Executor {
if reads_percentage < 0.0 || reads_percentage > 1.0 {
panic!("Reads percentage must be between 0.0 and 1.0");
Expand All @@ -48,8 +48,8 @@ impl Executor {
key_string_length,
value_blob_size,
),
reporter: Arc::new(Mutex::new(reporter)),
drop_test_keyspace,
reporter: reporter,
dont_drop_test_keyspace,
}
}

Expand All @@ -71,12 +71,6 @@ impl Executor {
.await?;
let (tx_stop_coordinator, mut rx_stop_coordinator): (Sender<()>, Receiver<()>) =
oneshot::channel();
let (tx_stop_metrics_reporter, mut rx_stop_metrics_reporter): (Sender<()>, Receiver<()>) =
oneshot::channel();
let (tx_metrics, mut rx_metrics): (
broadcast::Sender<(QueryType, Duration)>,
broadcast::Receiver<(QueryType, Duration)>,
) = broadcast::channel(10000);
println!("Inserting initial key-value pairs...");
for kv in &self.key_values_range {
perform_write(session.clone(), prepared_write.clone(), kv.clone()).await?;
Expand All @@ -87,20 +81,7 @@ impl Executor {
let key_values_range = self.key_values_range.clone();
let reads_percentage = self.reads_percentage.clone();
let reporter_clone = self.reporter.clone();
let _ = tokio::task::spawn(async move {
let mut reporter = reporter_clone.lock().await;
loop {
if rx_stop_metrics_reporter.try_recv().is_ok() {
println!("Stopping metrics reporter...");
break;
}
while let Ok(res) = rx_metrics.try_recv() {
(*reporter).report_results(res.0, res.1);
}
}
()
});
let drop_test_keyspace_clone = self.drop_test_keyspace.clone();
let dont_drop_test_keyspace_clone = self.dont_drop_test_keyspace.clone();
let coordinator_thread = tokio::task::spawn(async move {
let current_concurrency = Arc::new(AtomicUsize::new(0));
loop {
Expand All @@ -111,9 +92,7 @@ impl Executor {
while current_concurrency.load(std::sync::atomic::Ordering::Relaxed) > 0 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
println!("All tasks finished, stopping metrics reporter...");
tx_stop_metrics_reporter.send(()).unwrap();
if drop_test_keyspace_clone {
if !dont_drop_test_keyspace_clone {
println!("Dropping test keyspace...");
let res = session.query_unpaged("DROP KEYSPACE test", &[]).await;
if res.is_err() {
Expand All @@ -127,12 +106,12 @@ impl Executor {
while current_concurrency.load(std::sync::atomic::Ordering::Relaxed) < concurrency {
current_concurrency.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let session_clone = session.clone();
let tx_metrics_clone = tx_metrics.clone();
let kvs = key_values_range.clone();
let reads_percentage = reads_percentage.clone();
let pread = prepared_read.clone();
let pwrite = prepared_write.clone();
let current_concurrency_clone = Arc::clone(&current_concurrency);
let reporter_clone_clone = reporter_clone.clone();
tokio::spawn(async move {
let kv = kvs.get(rand::thread_rng().gen_range(0..kvs.len())).unwrap();
let rng = random::<f32>();
Expand All @@ -152,15 +131,7 @@ impl Executor {
} else {
let q_type = res.0;
let elapsed = res.1.unwrap();
loop {
let result = tx_metrics_clone.send((q_type, elapsed));
if result.is_ok() {
break;
} else {
println!("Error sending metrics: {:?}", result.err());
break;
}
}
reporter_clone_clone.report_results(q_type, elapsed);
}
current_concurrency_clone
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
82 changes: 59 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

Check warning on line 15 in src/main.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `tokio::sync::Mutex`

#[derive(Parser, Debug)]
#[command(
Expand All @@ -24,10 +25,18 @@ struct Args {
short,
long,
default_value = "1000",
help = "Number of concurrent requests at any given moment of time"
help = "Number of concurrent requests at any given moment of time per executor",
)]
pub concurrency: usize,

#[arg(
short,
long,
default_value = "1",
help = "Number of executors to run in parallel."
)]
pub executors_count: usize,

#[arg(short, long, value_parser = parse, default_value = "10s", help = "Duration of the benchmark"
)]
pub duration: Duration,
Expand Down Expand Up @@ -107,13 +116,13 @@ struct Args {
default_value = "true",
help = "Drop the keyspace after the benchmark"
)]
pub drop_test_keyspace: bool,
pub dont_drop_test_keyspace: bool,
}

fn reporter_mode(mode: String, period: Duration) -> Box<dyn Reporter + Send + 'static> {
let reporter: Box<dyn Reporter + Send + 'static> = match mode.as_str() {
"simple" => Box::new(SimpleReporter::new(period)),
"percentile" => Box::new(PercentileReporter::new(period)),
fn reporter_mode(mode: String, period: Duration) -> SimpleReporter {
let reporter: SimpleReporter = match mode.as_str() {
"simple" => SimpleReporter::new(period),
// "percentile" => Box::new(PercentileReporter::new(period)),
_ => panic!("Invalid mode: {}", mode),
};
reporter
Expand All @@ -131,7 +140,12 @@ async fn main() -> Result<()> {
.collect::<String>();
let pass = pass_first4 + &pass_after4;
println!(
"Args: duration: {}s, scylla_host: {}, pool_size: {}, user: {}, password: {}, key_string_length: {}, value_blob_size: {}, reads_percentage: {}, total_keys: {}, report_mode: {}, report_period: {}, drop_test_keyspace: {}",
"Args: \
duration: {}s, scylla_host: {}, pool_size: {},\n\
user: {}, password: {}, key_string_length: {},\n\
value_blob_size: {}, reads_percentage: {}, total_keys: {},\n\
report_mode: {}, report_period: {}s, drop_test_keyspace: {}\
executors: {}",
args.duration.as_secs_f64(),
args.scylla_hosts,
args.pool_size,
Expand All @@ -143,7 +157,8 @@ async fn main() -> Result<()> {
args.total_keys,
args.report_mode,
args.report_period.as_secs_f64(),
args.drop_test_keyspace
args.dont_drop_test_keyspace,
args.executors_count
);
let hosts_split = args.scylla_hosts.split(",");
let mut builder = SessionBuilder::new()
Expand All @@ -156,21 +171,42 @@ async fn main() -> Result<()> {
builder = builder.known_node(host);
}
let session: Arc<GenericSession<CurrentDeserializationApi>> = Arc::new(builder.build().await?);
let reporter = reporter_mode(args.report_mode, args.report_period);
let mut executor = executor::Executor::new(
args.concurrency,
args.key_string_length,
args.value_blob_size,
args.reads_percentage,
args.total_keys,
reporter,
args.drop_test_keyspace,
);
let (stop_sender, executor_thread) = executor.start(session).await?;
tokio::time::sleep(args.duration).await;
if let Err(e) = stop_sender.send(()) {
println!("Error sending stop signal: {:?}", e);
let reporter = Arc::new(reporter_mode(args.report_mode, args.report_period));
let reporter_clone_for_thread = reporter.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(args.report_period).await;
reporter_clone_for_thread.print_report();
}
});
let mut handles = Vec::new();
for i in 0..args.executors_count {
let i_clone = i.clone();
let reporter_clone = reporter.clone();
let session_clone = session.clone();
let handle = tokio::spawn(async move {
let mut executor = executor::Executor::new(
args.concurrency,
args.key_string_length,
args.value_blob_size,
args.reads_percentage,
args.total_keys,
reporter_clone,
args.dont_drop_test_keyspace,
);
let (stop_sender, executor_thread) = executor.start(session_clone).await.unwrap();
tokio::time::sleep(args.duration).await;
println!("Requesting stop since the duration has passed");
if let Err(e) = stop_sender.send(()) {
println!("Error sending stop signal: {:?}", e);
}
executor_thread.await.unwrap();
println!("Executor #{} done", i_clone +1);
});
handles.push(handle);
}
for handle in handles {
handle.await?;
}
executor_thread.await?;
Ok(())
}
Loading

0 comments on commit 14f6e9c

Please sign in to comment.