Merge branch 'main' into group2-multi-instance-benchmark
J-HowHuang authored May 1, 2024
2 parents 7e757f0 + a1b0841 commit f354468
Showing 4 changed files with 171 additions and 54 deletions.
80 changes: 80 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -19,3 +19,4 @@ rocket = { version = "0.5.0", features = ["json"] }
 reqwest = { version = "0.11", features = ["stream", "json"] }
 csv = "1.3.0"
 istziio-client = "0.1.6"
+prettytable = "0.10.0"
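
The new prettytable dependency drives the results table added in src/benchmark.rs below. As a minimal standalone sketch (not part of this commit) of the three calls the benchmark relies on:

// Sketch only: mirrors the Table/Row/Cell usage added in run_trace below.
use prettytable::{Cell, Row, Table};

fn main() {
    let mut table = Table::new();
    // Header row, then one data row, in the same style as the benchmark.
    table.add_row(Row::new(vec![Cell::new("Trace ID"), Cell::new("Wait Time")]));
    table.add_row(Row::new(vec![Cell::new("0"), Cell::new("42")]));
    // Renders an ASCII table to stdout.
    table.printstd();
}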
102 changes: 70 additions & 32 deletions src/benchmark.rs
@@ -1,78 +1,116 @@
-use istziio_client::client_api::{StorageRequest, StorageClient};
+use istziio_client::client_api::{StorageClient, StorageRequest};
+use prettytable::{Cell, Row, Table};
+use std::error::Error;
 use std::path::PathBuf;
+use std::thread::sleep;
 use std::time::Instant;
-use tokio::sync::mpsc;
 use std::time::{Duration, SystemTime};
-use std::thread::sleep;
-use std::collections::VecDeque;
-use std::error::Error;
-use csv;
+use tokio::sync::mpsc;
 
 // This scans the bench_files dir to figure out which test files are present,
 // then builds a map of TableId -> filename to init storage client(only when catalog is not available)
 // and also generates workload based on table ids. Finally it runs the workload
 
-
-
 pub struct TraceEntry {
     pub timestamp: u64,
-    pub request: StorageRequest
+    pub request: StorageRequest,
 }
 
 pub enum ClientType {
     Client1(),
-    Client2()
+    Client2(),
 }
 
-pub fn parse_trace(trace_path: PathBuf) -> Result<VecDeque<TraceEntry>, Box<dyn Error>> {
-    let mut trace: VecDeque<TraceEntry> = VecDeque::new();
+pub fn parse_trace(trace_path: PathBuf) -> Result<Vec<TraceEntry>, Box<dyn Error>> {
     let mut rdr = csv::Reader::from_path(trace_path)?;
+    let mut traces = Vec::new();
     for result in rdr.records() {
         // The iterator yields Result<StringRecord, Error>, so we check the
         // error here.
         let record = result?;
-        trace.push_back(TraceEntry{timestamp: record.get(0).unwrap().parse().unwrap(), request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap())});
+        traces.push(TraceEntry {
+            timestamp: record.get(0).unwrap().parse().unwrap(),
+            request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap()),
+        });
     }
-    Ok(trace)
+    Ok(traces)
 }
-pub async fn run_trace(mut trace: VecDeque<TraceEntry>, client_builder: &dyn Fn() -> Box<dyn StorageClient>) {
+
+pub async fn run_trace(
+    traces: Vec<TraceEntry>,
+    client_builder: &dyn Fn() -> Box<dyn StorageClient>,
+) {
     let start_time = SystemTime::now();
-    let request_num = trace.len();
+    let request_num = traces.len();
     let (tx, mut rx) = mpsc::channel(32);
-    while !trace.is_empty() {
-        let next_entry = trace.pop_front().unwrap();
-        if let Some(diff) = Duration::from_millis(next_entry.timestamp).checked_sub(start_time.elapsed().unwrap()) {
+    for (i, trace) in traces.into_iter().enumerate() {
+        if let Some(diff) =
+            Duration::from_millis(trace.timestamp).checked_sub(start_time.elapsed().unwrap())
+        {
             sleep(diff);
         }
-        println!("next trace: {}", next_entry.timestamp);
        let tx = tx.clone();
        let client = client_builder();
        tokio::spawn(async move {
-            let table_id = match next_entry.request {
+            let table_id = match trace.request {
                StorageRequest::Table(id) => id,
                _ => panic!("Invalid request type"),
            };
            println!("start thread reading {}", table_id);
            let client_start = Instant::now();
-            let req = next_entry.request;
-
-            let res = client.request_data_sync(req.clone()).await;
-            if let Err(e) = res {
-                println!("Error: {}", e);
+            let res = client.request_data(trace.request).await;
+            if res.is_err() {
+                println!("Error: {}", res.as_ref().err().unwrap());
            }
+            let mut rx = res.unwrap();
+            let mut total_num_rows = 0;
+            while let Some(rb) = rx.recv().await {
+                total_num_rows += rb.num_rows();
+            }
            let client_duration = client_start.elapsed();
-            tx.send(client_duration).await.unwrap();
+            tx.send((
+                i,
+                table_id,
+                total_num_rows,
+                trace.timestamp,
+                client_duration,
+            ))
+            .await
+            .unwrap();
        });
    }
 
    // Collect and print client latencies
    let mut duration_sum = Duration::new(0, 0);
+    let mut rows = Vec::new();
    for _ in 0..request_num {
-        let client_duration = rx.recv().await.unwrap();
-        println!("Client latency: {:?}", client_duration);
-        duration_sum += client_duration;
+        let tuple = rx.recv().await.unwrap();
+        rows.push(tuple);
+        duration_sum += tuple.4;
    }
+
+    // Sort rows based on the first element of the tuple
+    rows.sort_by(|a, b| a.0.cmp(&b.0));
+
+    // Construct a table to print the results
+    let mut table = Table::new();
+    table.add_row(Row::new(vec![
+        Cell::new("Trace ID"),
+        Cell::new("File ID"),
+        Cell::new("Num Rows"),
+        Cell::new("Arrival Time"),
+        Cell::new("Wait Time"),
+    ]));
+
+    for row in rows {
+        table.add_row(Row::new(vec![
+            Cell::new(&row.0.to_string()),
+            Cell::new(&row.1.to_string()),
+            Cell::new(&row.2.to_string()),
+            Cell::new(&row.3.to_string()),
+            Cell::new(&row.4.as_millis().to_string()),
+        ]));
+    }
 
    let avg_duration = duration_sum.div_f32(request_num as f32);
+    table.printstd();
    println!("Average duration: {:?}", avg_duration);
 }
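
For context, a driver that wires the two functions above together might look like the sketch below. The main function, trace path, and make_client helper are hypothetical stand-ins (the commit itself defines only parse_trace and run_trace); make_client represents whatever StorageClient implementation the benchmark binary actually constructs.

// Hypothetical driver, not part of this diff: replay one CSV trace.
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse "timestamp,file_index" rows into TraceEntry values.
    let traces = parse_trace(PathBuf::from("traces/trace_100m.csv"))?;
    // Replay the trace, building a fresh client per request.
    // `make_client() -> Box<dyn StorageClient>` is assumed, not shown here.
    run_trace(traces, &|| make_client()).await;
    Ok(())
}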
42 changes: 20 additions & 22 deletions traces/trace_100m.csv
@@ -1,23 +1,21 @@
 timestamp,file_index
-0,14
-282,10
-724,12
-999,10
-1483,10
-1881,10
-1974,10
-2427,19
-2555,10
-2943,17
-3412,12
-3702,10
-4011,10
-4491,11
-4827,12
-4912,11
-5376,11
-5547,10
-5579,11
-5788,14
-
-
+0,11
+8,10
+365,10
+688,10
+1123,10
+1203,10
+1213,11
+1406,11
+1854,10
+2200,14
+2483,10
+2660,10
+2999,10
+3017,10
+3428,10
+3769,16
+3861,10
+4354,10
+4538,12
+4572,11
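
Each row pairs a millisecond arrival timestamp with the file index to request; run_trace above sleeps until the timestamp, then issues the request. As a sketch of the mapping using the types above, the first row of the updated trace parses to:

// "0,11" becomes: arrive at t = 0 ms, request table 11.
TraceEntry {
    timestamp: 0,
    request: StorageRequest::Table(11),
}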
