Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit cce4f42

Browse files
committed
merge refinement
2 parents 2760fcc + 13488fa commit cce4f42

File tree

1 file changed

+26
-21
lines changed

1 file changed

+26
-21
lines changed

src/benchmark.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use csv;
21
use istziio_client::client_api::{StorageClient, StorageRequest};
3-
use std::collections::VecDeque;
42
use std::error::Error;
53
use std::path::PathBuf;
64
use std::thread::sleep;
@@ -22,51 +20,59 @@ pub enum ClientType {
2220
Client2(),
2321
}
2422

25-
pub fn parse_trace(trace_path: PathBuf) -> Result<VecDeque<TraceEntry>, Box<dyn Error>> {
26-
let mut trace: VecDeque<TraceEntry> = VecDeque::new();
23+
pub fn parse_trace(trace_path: PathBuf) -> Result<Vec<TraceEntry>, Box<dyn Error>> {
2724
let mut rdr = csv::Reader::from_path(trace_path)?;
25+
let mut traces = Vec::new();
2826
for result in rdr.records() {
29-
// The iterator yields Result<StringRecord, Error>, so we check the
30-
// error here.
3127
let record = result?;
32-
trace.push_back(TraceEntry {
28+
traces.push(TraceEntry {
3329
timestamp: record.get(0).unwrap().parse().unwrap(),
3430
request: StorageRequest::Table(record.get(1).unwrap().parse().unwrap()),
3531
});
3632
}
37-
Ok(trace)
33+
Ok(traces)
3834
}
35+
3936
pub async fn run_trace(
40-
mut trace: VecDeque<TraceEntry>,
37+
traces: Vec<TraceEntry>,
4138
client_builder: &dyn Fn() -> Box<dyn StorageClient>,
4239
) {
4340
let start_time = SystemTime::now();
44-
let request_num = trace.len();
41+
let request_num = traces.len();
4542
let (tx, mut rx) = mpsc::channel(32);
46-
while !trace.is_empty() {
47-
let next_entry = trace.pop_front().unwrap();
43+
for (i, trace) in traces.into_iter().enumerate() {
4844
if let Some(diff) =
49-
Duration::from_millis(next_entry.timestamp).checked_sub(start_time.elapsed().unwrap())
45+
Duration::from_millis(trace.timestamp).checked_sub(start_time.elapsed().unwrap())
5046
{
5147
sleep(diff);
5248
}
53-
println!("next trace: {}", next_entry.timestamp);
5449
let tx = tx.clone();
5550
let client = client_builder();
5651
tokio::spawn(async move {
57-
let table_id = match next_entry.request {
52+
let table_id = match trace.request {
5853
StorageRequest::Table(id) => id,
5954
_ => panic!("Invalid request type"),
6055
};
61-
println!("start thread reading {}", table_id);
56+
println!(
57+
"Trace {} sends request for table {} at timestamp {}",
58+
i, table_id, trace.timestamp
59+
);
6260
let client_start = Instant::now();
63-
let req = next_entry.request;
6461

65-
let res = client.request_data(req.clone()).await;
66-
if let Err(e) = res {
67-
println!("Error: {}", e);
62+
let res = client.request_data(trace.request).await;
63+
if res.is_err() {
64+
println!("Error: {}", res.as_ref().err().unwrap());
65+
}
66+
let mut rx = res.unwrap();
67+
let mut total_num_rows = 0;
68+
while let Some(rb) = rx.recv().await {
69+
total_num_rows += rb.num_rows();
6870
}
6971
let client_duration = client_start.elapsed();
72+
println!(
73+
"Trace {} gets {} rows from the client, latency is {:?}",
74+
i, total_num_rows, client_duration
75+
);
7076
tx.send(client_duration).await.unwrap();
7177
});
7278
}
@@ -75,7 +81,6 @@ pub async fn run_trace(
7581
let mut duration_sum = Duration::new(0, 0);
7682
for _ in 0..request_num {
7783
let client_duration = rx.recv().await.unwrap();
78-
println!("Client latency: {:?}", client_duration);
7984
duration_sum += client_duration;
8085
}
8186

0 commit comments

Comments
 (0)