Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
refine benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx committed Apr 30, 2024
1 parent c18e33e commit b1f7ac8
Showing 1 changed file with 42 additions and 29 deletions.
71 changes: 42 additions & 29 deletions src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,80 @@
use istziio_client::client_api::{StorageRequest, StorageClient};
use csv;
use istziio_client::client_api::{StorageClient, StorageRequest};
use std::error::Error;
use std::path::PathBuf;
use std::thread::sleep;
use std::time::Instant;
use tokio::sync::mpsc;
use std::time::{Duration, SystemTime};
use std::thread::sleep;
use std::collections::VecDeque;
use std::error::Error;
use csv;
use tokio::sync::mpsc;

// This scans the bench_files dir to figure out which test files are present,
// then builds a map of TableId -> filename to init storage client(only when catalog is not available)
// and also generates workload based on table ids. Finally it runs the workload



pub struct TraceEntry {
pub timestamp: u64,
pub request: StorageRequest
pub request: StorageRequest,
}

pub enum ClientType {
Client1(),
Client2()
Client2(),
}

pub fn parse_trace(trace_path: PathBuf) -> Result<VecDeque<TraceEntry>, Box<dyn Error>> {
let mut trace: VecDeque<TraceEntry> = VecDeque::new();
pub fn parse_trace(trace_path: PathBuf) -> Result<Vec<TraceEntry>, Box<dyn Error>> {
let mut rdr = csv::Reader::from_path(trace_path)?;
let mut traces = Vec::new();
for result in rdr.records() {
// The iterator yields Result<StringRecord, Error>, so we check the
// error here.
let record = result?;
trace.push_back(TraceEntry{timestamp: record.get(0).unwrap().parse().unwrap(), request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap())});
traces.push(TraceEntry {
timestamp: record.get(0).unwrap().parse().unwrap(),
request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap()),
});
}
Ok(trace)
Ok(traces)
}
pub async fn run_trace(mut trace: VecDeque<TraceEntry>, client_builder: &dyn Fn() -> Box<dyn StorageClient>) {

pub async fn run_trace(
traces: Vec<TraceEntry>,
client_builder: &dyn Fn() -> Box<dyn StorageClient>,
) {
let start_time = SystemTime::now();
let request_num = trace.len();
let request_num = traces.len();
let (tx, mut rx) = mpsc::channel(32);
while !trace.is_empty() {
let next_entry = trace.pop_front().unwrap();
if let Some(diff) = Duration::from_millis(next_entry.timestamp).checked_sub(start_time.elapsed().unwrap()) {
for (i, trace) in traces.into_iter().enumerate() {
if let Some(diff) =
Duration::from_millis(trace.timestamp).checked_sub(start_time.elapsed().unwrap())
{
sleep(diff);
}
println!("next trace: {}", next_entry.timestamp);
let tx = tx.clone();
let client = client_builder();
tokio::spawn(async move {
let table_id = match next_entry.request {
let table_id = match trace.request {
StorageRequest::Table(id) => id,
_ => panic!("Invalid request type"),
};
println!("start thread reading {}", table_id);
println!(
"Trace {} sends request for table {} at timestamp {}",
i, table_id, trace.timestamp
);
let client_start = Instant::now();
let req = next_entry.request;
let req = trace.request;

let res = client.request_data_sync(req.clone()).await;
if let Err(e) = res {
println!("Error: {}", e);
let res = client.request_data(req.clone()).await;
if res.is_err() {
println!("Error: {}", res.as_ref().err().unwrap());
}
let mut rx = res.unwrap();
let mut total_num_rows = 0;
while let Some(rb) = rx.recv().await {
total_num_rows += rb.num_rows();
}
let client_duration = client_start.elapsed();
println!(
"Trace {} gets {} rows from the client, latency is {:?}",
i, total_num_rows, client_duration
);
tx.send(client_duration).await.unwrap();
});
}
Expand All @@ -69,7 +83,6 @@ pub async fn run_trace(mut trace: VecDeque<TraceEntry>, client_builder: &dyn Fn(
let mut duration_sum = Duration::new(0, 0);
for _ in 0..request_num {
let client_duration = rx.recv().await.unwrap();
println!("Client latency: {:?}", client_duration);
duration_sum += client_duration;
}

Expand Down

0 comments on commit b1f7ac8

Please sign in to comment.