Skip to content

Commit

Permalink
Port other tests over to spec tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Jul 5, 2024
1 parent a4bef97 commit 8d9e5f6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 74 deletions.
2 changes: 1 addition & 1 deletion edb/server/conn_pool/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl<C: Connector> VisitPoolAlgoData<Block<C, PoolAlgoTargetData>>
if block.is_empty() && block.data.demand() == 0 {
false
} else {
f(&name, &block);
f(name, block);
true
}
});
Expand Down
108 changes: 39 additions & 69 deletions edb/server/conn_pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ mod tests {
use crate::time::Instant;
use crate::{block::Name, test::*};
use anyhow::{Ok, Result};
use itertools::Itertools;
use rstest::rstest;
use std::rc::Rc;
use test_log::test;
Expand Down Expand Up @@ -279,70 +280,39 @@ mod tests {

#[test(tokio::test(flavor = "current_thread", start_paused = true))]
#[rstest]
#[case::one(1)]
#[case::small(10)]
#[case::medium(12)]
#[case::large(20)]
async fn test_pool(#[case] databases: usize) -> Result<()> {
let config = PoolConfig::suggested_default_for(10);
let spec = Spec {
name: format!("test_pool_{databases}").into(),
desc: "",
capacity: 10,
conn_cost: Triangle(0.05, 0.0),
score: vec![
Score::new(
0.8,
[2.0, 0.5, 0.25, 0.0],
LatencyDistribution {
group: 0..databases,
},
),
Score::new(0.2, [0.5, 0.2, 0.1, 0.0], ConnectionOverhead {}),
],
dbs: (0..databases)
.map(|db| DBSpec {
db,
start_at: 0.0,
end_at: 1.0,
qps: 1200,
query_cost: Triangle(0.001, 0.0),
})
.collect_vec(),
..Default::default()
};

let local = LocalSet::new();
let start = Instant::now();
const CONNECTIONS: usize = 10000;
info!("Starting tasks");
let real_time = std::time::Instant::now();
local
.run_until(async {
let mut tasks = vec![];
let pool = Rc::new(Pool::new(
config,
BasicConnector::delay(|disconnect| {
if disconnect {
Duration::from_millis(5)
} else {
Duration::from_millis(50)
}
}),
));
for i in 0..CONNECTIONS {
let pool = pool.clone();
let task = tokio::task::spawn_local(async move {
let db = Name::from(i % databases);
trace!("In local task for connection {i} (using {db})");
let conn = pool.acquire(&db).await?;
virtual_sleep(Duration::from_millis(500)).await;
drop(conn);
Ok(())
});
tasks.push(task);
}
let monitor = tokio::task::spawn_local(async move {
loop {
let mut s = "".to_owned();
for (name, block) in pool.metrics().blocks {
s += &format!("{name}={} ", block.total);
}
trace!(
"Blocks: {}/{} {s}",
pool.metrics().pool.total,
pool.config.constraints.max
);
virtual_sleep(Duration::from_millis(100)).await;
}
});
for task in tasks {
task.await??;
}
monitor.abort();
// let metrics = pool.metrics();
// info!("{metrics:?}");
Ok(())
})
.await?;
info!(
"Took {:?} of virtual time ({:?} real time) for {CONNECTIONS} connections to {databases} databases",
start.elapsed(), real_time.elapsed()
);
Ok(())
run(spec).await
}

async fn run(spec: Spec) -> Result<()> {
Expand Down Expand Up @@ -506,7 +476,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_1",
name: "test_connpool_1".into(),
desc: r#"
This is a test for Mode D, where 2 groups of blocks race for connections
in the pool with max capacity set to 6. The first group (0-5) has more
Expand Down Expand Up @@ -575,7 +545,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_2",
name: "test_connpool_2".into(),
desc: r#"
In this test, we have 6x1500qps connections that simulate fast
queries (0.001..0.006s), and 6x700qps connections that simulate
Expand Down Expand Up @@ -625,7 +595,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_3",
name: "test_connpool_3".into(),
desc: r#"
This test simply starts 6 same crazy requesters for 6 databases to
test the pool fairness in Mode C with max capacity of 100.
Expand Down Expand Up @@ -661,7 +631,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_4",
name: "test_connpool_4".into(),
desc: r#"
Similar to test 3, this test also has 6 requesters for 6 databases,
they have the same Q/s but with different query cost. In Mode C,
Expand Down Expand Up @@ -718,7 +688,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_5",
name: "test_connpool_5".into(),
desc: r#"
This is a mixed test with pool max capacity set to 6. Requests in
the first group (0-5) come and go alternatively as time goes on,
Expand Down Expand Up @@ -782,7 +752,7 @@ mod tests {
}

let spec = Spec {
name: "test_connpool_6",
name: "test_connpool_6".into(),
desc: r#"
This is a simple test for Mode A. In this case, we don't want to
have lots of reconnection overhead.
Expand All @@ -800,7 +770,7 @@ mod tests {
#[test(tokio::test(flavor = "current_thread", start_paused = true))]
async fn test_connpool_7() -> Result<()> {
let spec = Spec {
name: "test_connpool_7",
name: "test_connpool_7".into(),
desc: r#"
The point of this test is to have one connection "t1" that
just has crazy demand for connections. Then the "t2" connections
Expand Down Expand Up @@ -866,7 +836,7 @@ mod tests {
async fn test_connpool_8() -> Result<()> {
let base_load = 200;
let spec = Spec {
name: "test_connpool_8",
name: "test_connpool_8".into(),
desc: r#"
This test spec is to check the pool connection reusability with a
single block before the pool reaches its full capacity in Mode A.
Expand Down Expand Up @@ -909,7 +879,7 @@ mod tests {
async fn test_connpool_9() -> Result<()> {
let full_qps = 20000;
let spec = Spec {
name: "test_connpool_9",
name: "test_connpool_9".into(),
desc: r#"
This test spec is to check the pool performance with low traffic
between 3 pre-heated blocks in Mode B. t1 is a reference block,
Expand Down Expand Up @@ -996,7 +966,7 @@ mod tests {
async fn test_connpool_10() -> Result<()> {
let full_qps = 2000;
let spec = Spec {
name: "test_connpool_10",
name: "test_connpool_10".into(),
desc: r#"
This test spec is to check the pool garbage collection feature.
t1 is a constantly-running reference block, t2 starts in the middle
Expand Down
14 changes: 10 additions & 4 deletions edb/server/conn_pool/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Test utilities.
use std::{
cell::RefCell, collections::HashMap, future::Future, ops::Range, rc::Rc, time::Duration,
borrow::Cow, cell::RefCell, collections::HashMap, future::Future, ops::Range, rc::Rc,
time::Duration,
};

use itertools::Itertools;
Expand Down Expand Up @@ -91,7 +92,7 @@ fn m(v: &f64) -> Duration {
#[derive(derive_more::Debug)]
#[allow(unused)]
#[debug(
"%{{1,25,50,75,99}}: {:?}/{:?}/{:?}/{:?}/{:?}, x̄: {:?} Πx: {:?}",
"#{count} %{{1,25,50,75,99}}: {:?}/{:?}/{:?}/{:?}/{:?}, x̄: {:?} Πx: {:?}",
m(p01),
m(p25),
m(p50),
Expand Down Expand Up @@ -165,7 +166,7 @@ fn stats(data: &mut [f64]) -> Stats {

#[derive(smart_default::SmartDefault)]
pub struct Spec {
pub name: &'static str,
pub name: Cow<'static, str>,
pub desc: &'static str,
#[default = 30]
pub timeout: usize,
Expand Down Expand Up @@ -264,7 +265,12 @@ impl ScoringMethod for LatencyDistribution {
})
.collect_vec(),
);
decile.std_dev().unwrap() / decile.mean().unwrap()
let cv = decile.std_dev().unwrap_or_default() / decile.mean().unwrap_or_default();
if cv.is_normal() {
cv
} else {
0.0
}
})
.collect_vec();
let mean = values.iter().geometric_mean();
Expand Down

0 comments on commit 8d9e5f6

Please sign in to comment.