diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index 6d2d57bf154d..18a664eff22c 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -734,4 +734,402 @@ mod tests { run(spec).await } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_5() -> Result<()> { + let mut dbs = vec![]; + + for i in 0..6 { + dbs.push(DBSpec { + db: format!("t{i}"), + start_at: 0.0 + i as f64 / 10.0, + end_at: 0.5 + i as f64 / 10.0, + qps: 150, + query_cost: Triangle(0.020, 0.005), + }); + } + for i in 6..12 { + dbs.push(DBSpec { + db: format!("t{i}"), + start_at: 0.3, + end_at: 0.7, + qps: 50, + query_cost: Triangle(0.008, 0.003), + }); + } + for i in 0..6 { + dbs.push(DBSpec { + db: format!("t{i}"), + start_at: 0.6, + end_at: 0.8, + qps: 50, + query_cost: Triangle(0.003, 0.002), + }); + } + + let spec = Spec { + capacity: 6, + conn_cost: Triangle(0.15, 0.05), + score: vec![ + LatencyDistribution { + score: Score { + v100: 0.0, + v90: 0.4, + v60: 0.8, + v0: 2.0, + weight: 0.05, + }, + group: 0..6, + } + .into(), + LatencyDistribution { + score: Score { + v100: 0.0, + v90: 0.4, + v60: 0.8, + v0: 2.0, + weight: 0.25, + }, + group: 6..12, + } + .into(), + LatencyRatio { + score: Score { + v100: 30.0, + v90: 5.0, + v60: 2.0, + v0: 1.0, + weight: 0.45, + }, + percentile: 75, + dividend: 0..6, + divisor: 6..12, + } + .into(), + ConnectionOverhead { + score: Score { + v100: 0.0, + v90: 0.1, + v60: 0.2, + v0: 0.5, + weight: 0.15, + }, + } + .into(), + EndingCapacity { + score: Score { + v100: 6.0, + v90: 5.0, + v60: 4.0, + v0: 3.0, + weight: 0.10, + }, + } + .into(), + ], + dbs, + ..Default::default() + }; + + run(spec).await + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_6() -> Result<()> { + let mut dbs = vec![]; + + for i in 0..6 { + dbs.push(DBSpec { + db: "t0".into(), + start_at: 0.0 + i as f64 / 10.0, + end_at: 0.5 + i as f64 / 10.0, + qps: 150, + query_cost: Triangle(0.020, 0.005), + }); + } + + let spec = Spec { + capacity: 6, + conn_cost: Triangle(0.15, 0.05), + score: vec![ConnectionOverhead { + score: Score { + v100: 0.0, + v90: 0.1, + v60: 0.2, + v0: 0.5, + weight: 1.0, + }, + } + .into()], + dbs, + ..Default::default() + }; + + run(spec).await + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_7() -> Result<()> { + let mut dbs = vec![]; + + dbs.push(DBSpec { + db: "t1".into(), + start_at: 0.0, + end_at: 1.0, + qps: 500, + query_cost: Triangle(0.040, 0.005), + }); + + dbs.push(DBSpec { + db: "t2".into(), + start_at: 0.1, + end_at: 0.3, + qps: 30, + query_cost: Triangle(0.030, 0.005), + }); + + dbs.push(DBSpec { + db: "t2".into(), + start_at: 0.6, + end_at: 0.9, + qps: 30, + query_cost: Triangle(0.010, 0.005), + }); + + let spec = Spec { + capacity: 6, + conn_cost: Triangle(0.15, 0.05), + score: vec![ + LatencyRatio { + percentile: 99, + dividend: 1..2, + divisor: 2..3, + score: Score { + v100: 100.0, + v90: 50.0, + v60: 10.0, + v0: 1.0, + weight: 0.2, + }, + } + .into(), + LatencyRatio { + percentile: 75, + dividend: 1..2, + divisor: 2..3, + score: Score { + v100: 200.0, + v90: 100.0, + v60: 20.0, + v0: 1.0, + weight: 0.4, + }, + } + .into(), + ConnectionOverhead { + score: Score { + v100: 0.0, + v90: 0.1, + v60: 0.2, + v0: 0.5, + weight: 0.4, + }, + } + .into(), + ], + dbs, + ..Default::default() + }; + + run(spec).await + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_8() -> Result<()> { + let mut dbs = vec![]; + let base_load = 200; + dbs.push(DBSpec { + db: "t1".into(), + start_at: 0.0, + end_at: 0.1, + qps: base_load / 4, + query_cost: Triangle(0.01, 0.0), + }); + + dbs.push(DBSpec { + db: "t1".into(), + start_at: 0.1, + end_at: 0.2, + qps: base_load / 2, + query_cost: Triangle(0.01, 0.0), + }); + + dbs.push(DBSpec { + db: "t1".into(), + start_at: 0.2, + end_at: 0.6, + qps: base_load, + query_cost: Triangle(0.01, 0.0), + }); + + let spec = Spec { + capacity: 100, + conn_cost: Triangle(0.0, 0.0), + score: vec![ConnectionOverhead { + score: Score { + v100: 0.0, + v90: 0.1, + v60: 0.2, + v0: 0.5, + weight: 1.0, + }, + } + .into()], + dbs, + ..Default::default() + }; + + run(spec).await + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_9() -> Result<()> { + let full_qps = 10000; + let spec = Spec { + capacity: 100, + conn_cost: Triangle(0.01, 0.005), + score: vec![ + LatencyDistribution { + score: Score { + weight: 0.1, + v100: 0.2, + v90: 0.5, + v60: 1.0, + v0: 2.0, + }, + group: 1..4, + } + .into(), + AbsoluteLatency { + score: Score { + weight: 0.1, + v100: 0.001, + v90: 0.002, + v60: 0.004, + v0: 0.05, + }, + group: 1..4, + percentile: 99, + } + .into(), + AbsoluteLatency { + score: Score { + weight: 0.2, + v100: 0.0001, + v90: 0.0002, + v60: 0.0004, + v0: 0.005, + }, + group: 1..4, + percentile: 75, + } + .into(), + ConnectionOverhead { + score: Score { + v100: 0.0, + v90: 0.1, + v60: 0.2, + v0: 0.5, + weight: 0.6, + }, + } + .into(), + ], + dbs: vec![ + DBSpec { + db: String::from("t1"), + start_at: 0.0, + end_at: 0.1, + qps: (full_qps / 32), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t1"), + start_at: 0.1, + end_at: 0.4, + qps: (full_qps / 16), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t2"), + start_at: 0.5, + end_at: 0.6, + qps: (full_qps / 32), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t2"), + start_at: 0.6, + end_at: 1.0, + qps: (full_qps / 16), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t3"), + start_at: 0.7, + end_at: 0.8, + qps: (full_qps / 16), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t3"), + start_at: 0.8, + end_at: 0.9, + qps: (full_qps / 8), + query_cost: Triangle(0.01, 0.005), + }, + ], + ..Default::default() + }; + run(spec).await + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn test_connpool_10() -> Result<()> { + let full_qps = 10000; + let spec = Spec { + timeout: 10, + duration: 1.1, + capacity: 100, + conn_cost: Triangle(0.01, 0.005), + score: vec![EndingCapacity { + score: Score { + weight: 1.0, + v100: 10.0, + v90: 20.0, + v60: 40.0, + v0: 100.0, + }, + } + .into()], + dbs: vec![ + DBSpec { + db: String::from("t1"), + start_at: 0.0, + end_at: 1.0, + qps: (full_qps / 32), + query_cost: Triangle(0.01, 0.005), + }, + DBSpec { + db: String::from("t2"), + start_at: 0.4, + end_at: 0.6, + qps: ((full_qps / 32) * 31), + query_cost: Triangle(0.01, 0.005), + }, + ], + ..Default::default() + }; + run(spec).await + } } diff --git a/edb/server/conn_pool/src/test.rs b/edb/server/conn_pool/src/test.rs index a5a787709982..e6ca9c3f8ccb 100644 --- a/edb/server/conn_pool/src/test.rs +++ b/edb/server/conn_pool/src/test.rs @@ -275,6 +275,65 @@ impl ScoringMethod for ConnectionOverhead { } } +pub struct LatencyRatio { + pub score: Score, + pub percentile: u8, + pub dividend: Range, + pub divisor: Range, +} + +impl ScoringMethod for LatencyRatio { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + let mut data = latencies.data.borrow_mut(); + let dbs = self.divisor.clone().map(|t| format!("t{t}")).collect_vec(); + let divisor = dbs + .iter() + .map(|db| { + let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + data.percentile(self.percentile as _) + }) + .mean(); + let dbs = self.dividend.clone().map(|t| format!("t{t}")).collect_vec(); + let dividend = dbs + .iter() + .map(|db| { + let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + data.percentile(self.percentile as _) + }) + .mean(); + info!( + "P{} ratio {:?}/{:?} {dividend}/{divisor} = {}", + self.percentile, + self.dividend, + self.divisor, + self.score.calculate(dividend / divisor) + ); + self.score.calculate(dividend / divisor) * self.score.weight + } +} + +pub struct EndingCapacity { + pub score: Score, +} + +impl ScoringMethod for EndingCapacity { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + self.score.weight + } +} + +pub struct AbsoluteLatency { + pub score: Score, + pub percentile: u8, + pub group: Range, +} + +impl ScoringMethod for AbsoluteLatency { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + self.score.weight + } +} + #[derive(Debug)] pub struct DBSpec { pub db: String,