From 3a844b395f5c6e155ef82ebd9bbc2326633948f3 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Thu, 4 Jul 2024 17:30:33 -0600 Subject: [PATCH] Tests passing --- edb/server/conn_pool/src/algo.rs | 112 +++++++++++++++++++++++------- edb/server/conn_pool/src/block.rs | 31 +++++++-- edb/server/conn_pool/src/conn.rs | 12 +++- edb/server/conn_pool/src/pool.rs | 25 ++++--- edb/server/conn_pool/src/test.rs | 21 ++++-- 5 files changed, 150 insertions(+), 51 deletions(-) diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs index c3da44849903..eba542a21fed 100644 --- a/edb/server/conn_pool/src/algo.rs +++ b/edb/server/conn_pool/src/algo.rs @@ -20,6 +20,8 @@ const DEMAND_WEIGHT_WAITING: usize = 1; const DEMAND_WEIGHT_ACTIVE: usize = 1; /// The historical length of data we'll maintain for demand. const DEMAND_HISTORY_LENGTH: usize = 4; +/// The minimum non-zero demand. +const DEMAND_MINIMUM: usize = 16; /// The maximum-minimum connection count we'll allocate to connections if there /// is more capacity than backends. const MAXIMUM_SHARED_TARGET: usize = 1; @@ -46,6 +48,8 @@ pub enum RebalanceOp { Transfer { to: Name, from: Name }, /// Create a block Create(Name), + /// Garbage collect a block. + Close(Name), } #[derive(Debug, Clone, PartialEq, Eq)] @@ -105,11 +109,12 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { /// Returns an `Option` containing the hunger score if the current state is below the target /// and there are waiting elements; otherwise, returns `None`. fn hunger_score(&self, will_release: bool) -> Option { - let waiters = self - .count(MetricVariant::Waiting) - .saturating_sub(self.count(MetricVariant::Connecting)); + let waiting = self.count(MetricVariant::Waiting); + let connecting = self.count(MetricVariant::Connecting); + let waiters = waiting.saturating_sub(connecting); let current = self.total() - if will_release { 1 } else { 0 }; let target = self.target(); + // If we have more connections than our target, we are not hungry. We // may still be hungry if current == target. if current > target { @@ -195,46 +200,64 @@ pub struct PoolConstraints { impl PoolConstraints { /// Adjust the quota targets for each block within the pool - pub fn adjust<'a, 'b, T, U>(&self, it: &'a U) + pub fn adjust<'a, 'b, T, U>(&self, it: &'a U, db: Option<&str>) where U: VisitPoolAlgoData, T: 'b, T: PoolAlgorithmDataBlock, { // Once we've adjusted the constraints, reset the max settings - defer!(it.reset_max()); + defer!(if db.is_none() { + it.reset_max() + }); // First, compute the overall request load and number of backend targets let mut total_demand = 0; let mut total_target = 0; + let mut s = "".to_owned(); - it.with_all(|_name, data| { + it.with_all(|name, data| { // We calculate demand based on the estimated connection active time // multiplied by the active + waiting counts. This gives us an // estimated database time statistic we can use for relative // weighting. - let demand = data.avg_ms(MetricVariant::Active).max(MIN_ACTIVE_TIME) - * (data.max(MetricVariant::Waiting) * DEMAND_WEIGHT_WAITING - + data.max(MetricVariant::Active) * DEMAND_WEIGHT_ACTIVE); - data.insert_demand(demand as _); - let demand = data.demand(); + let active = data.max(MetricVariant::Active); + let active_ms = data.avg_ms(MetricVariant::Active).max(MIN_ACTIVE_TIME); + let waiting = data.max(MetricVariant::Waiting); + let idle = active == 0 && waiting == 0; + let demand = if Some(name.as_ref()) == db { + DEMAND_MINIMUM + } else if idle { + 0 + } else { + // Note that we add DEMAND_HISTORY_LENGTH to ensure the average is non-zero + let demand = (active_ms + * (waiting * DEMAND_WEIGHT_WAITING + active * DEMAND_WEIGHT_ACTIVE)) + .max(DEMAND_MINIMUM); + demand + }; + if db.is_none() || Some(name.as_ref()) == db { + data.insert_demand(demand as _); + } + let demand_avg = data.demand(); + + if tracing::enabled!(tracing::Level::TRACE) { + s += &format!( + "{name}={demand_avg}/{demand} (a={},w={},t={}ms) ", + active, waiting, active_ms + ); + } - total_demand += demand; - total_target += 1; + total_demand += demand_avg; + if demand_avg > 0 { + total_target += 1; + } else { + data.set_target(0); + } }); if tracing::enabled!(tracing::Level::TRACE) { - let mut s = "".to_owned(); - it.with_all(|name, data| { - let demand = data.demand(); - s += &format!( - "{name}={demand} (a={},w={},t={}ms) ", - data.max(MetricVariant::Active), - data.max(MetricVariant::Waiting), - data.avg_ms(MetricVariant::Active) - ); - }); - trace!("Demand: {}", s); + trace!("Demand: {total_target} {}", s); } // Empty pool, no math @@ -251,6 +274,9 @@ impl PoolConstraints { it.with_all(|_name, data| { let demand = data.demand(); + if demand == 0 { + return; + } // Give everyone what they requested, plus a share of the spare // capacity. If there is leftover spare capacity, that capacity @@ -279,12 +305,16 @@ impl PoolConstraints { let mut allocate = vec![]; let mut made_changes = false; - for _ in 0..MAX_REBALANCE_CREATE { + for i in 0..MAX_REBALANCE_CREATE { it.with_all(|name, block| { if block.target() > block.total() && current_pool_size < max_pool_size { allocate.push(RebalanceOp::Create(name.clone())); current_pool_size += 1; made_changes = true; + } else if block.total() > block.target() && block.count(MetricVariant::Idle) > i + { + allocate.push(RebalanceOp::Close(name.clone())); + made_changes = true; } }); if !made_changes { @@ -300,14 +330,27 @@ impl PoolConstraints { let mut overloaded = vec![]; let mut hungriest = vec![]; + let mut s1 = "".to_owned(); + let mut s2 = "".to_owned(); + it.with_all(|name, block| { if let Some(value) = block.hunger_score(false) { + if tracing::enabled!(tracing::Level::TRACE) { + s1 += &format!("{name}={value} "); + } hungriest.push((value, name.clone())) } else if let Some(value) = block.overfull_score(false) { + if tracing::enabled!(tracing::Level::TRACE) { + s2 += &format!("{name}={value} "); + } overloaded.push((value, name.clone())) } }); + if tracing::enabled!(tracing::Level::TRACE) { + trace!("Hunger: {s1}"); + trace!("Overfullness: {s2}"); + } overloaded.sort(); hungriest.sort(); @@ -316,6 +359,11 @@ impl PoolConstraints { // TODO: rebalance more than one? loop { let Some((_, to)) = hungriest.pop() else { + // TODO: close more than one? + while let Some((_, from)) = overloaded.pop() { + tasks.push(RebalanceOp::Close(from.clone())); + break; + } break; }; @@ -337,7 +385,7 @@ impl PoolConstraints { { // If the block is new, we need to perform an initial adjustment. if it.ensure_block(db) { - self.adjust(it); + self.adjust(it, Some(db)); } let target_block_size = it.target(db); @@ -398,6 +446,7 @@ impl PoolConstraints { trace!("Block {db} is overfull ({overfull}), trying to release"); let mut max = 0; let mut which = None; + let mut s = "".to_owned(); it.with_all(|name, block| { let is_self = &**name == db; if let Some(hunger) = block.hunger_score(is_self) { @@ -406,6 +455,10 @@ impl PoolConstraints { if is_self { hunger += SELF_HUNGER_BOOST_FOR_RELEASE; } + + if tracing::enabled!(tracing::Level::TRACE) { + s += &format!("{name}={hunger} "); + } // If this current block has equal hunger to the hungriest, it takes priority if hunger > max { which = if is_self { None } else { Some(name.clone()) }; @@ -414,6 +467,10 @@ impl PoolConstraints { } }); + if tracing::enabled!(tracing::Level::TRACE) { + trace!("Hunger: {s}"); + } + match which { Some(name) => { trace!("Releasing to {name:?} with score {max}"); @@ -488,7 +545,7 @@ mod tests { .await } - #[test(tokio::test(flavor = "current_thread"))] + #[test(tokio::test(flavor = "current_thread", start_paused = true))] async fn test_pool_starved() -> Result<()> { LocalSet::new() .run_until(async { @@ -496,6 +553,7 @@ mod tests { let config = PoolConfig::suggested_default_for(10); let blocks = Blocks::default(); let futures = FuturesUnordered::new(); + // Room for these for i in 0..5 { let db = format!("{i}"); assert_eq!( diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs index eb591e1ab876..9c0e1847570f 100644 --- a/edb/server/conn_pool/src/block.rs +++ b/edb/server/conn_pool/src/block.rs @@ -117,7 +117,8 @@ impl Block { assert_eq!( self.conn_count(), self.conns.borrow().len(), - "Blocks failed consistency check. Total connection count was wrong." + "Block {} failed consistency check. Total connection count was wrong.", + self.db_name ); let conn_metrics = MetricsAccum::default(); for conn in &*self.conns.borrow() { @@ -142,11 +143,20 @@ impl Block { None } + fn try_get_used(&self) -> Option> { + for conn in &*self.conns.borrow() { + if conn.variant() == MetricVariant::Idle { + return Some(conn.clone()); + } + } + None + } + fn try_take_used(&self) -> Option> { let mut lock = self.conns.borrow_mut(); let pos = lock .iter() - .position(|conn| conn.try_lock(&self.state.metrics)); + .position(|conn| conn.variant() == MetricVariant::Idle); if let Some(index) = pos { let conn = lock.remove(index); return Some(conn); @@ -233,9 +243,7 @@ impl Block { fn task_close_one(self: Rc, connector: &C) -> impl Future> { let conn = { consistency_check!(self); - let conn = self - .try_acquire_used() - .expect("Could not acquire a connection"); + let conn = self.try_get_used().expect("Could not acquire a connection"); conn.close(connector, &self.state.metrics); conn }; @@ -726,6 +734,8 @@ mod tests { blocks.create(&connector, "db").await?; blocks.create(&connector, "db").await?; blocks.create(&connector, "db").await?; + blocks.metrics("db").reset_max(); + blocks.metrics("db2").reset_max(); assert_eq!(1, blocks.block_count()); assert_block!(blocks "db" has 3 Idle); assert_block!(blocks "db2" is empty); @@ -736,6 +746,9 @@ mod tests { assert_eq!(2, blocks.block_count()); assert_block!(blocks "db" is empty); assert_block!(blocks "db2" has 3 Idle); + // Should not activate a connection to steal it + assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active)); + assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active)); Ok(()) } @@ -747,6 +760,8 @@ mod tests { blocks.create(&connector, "db").await?; blocks.create(&connector, "db").await?; blocks.create(&connector, "db").await?; + blocks.metrics("db").reset_max(); + blocks.metrics("db2").reset_max(); assert_eq!(1, blocks.block_count()); assert_block!(blocks "db" has 3 Idle); assert_block!(blocks "db2" is empty); @@ -755,6 +770,9 @@ mod tests { assert_eq!(2, blocks.block_count()); assert_block!(blocks "db" has 2 Idle); assert_block!(blocks "db2" has 1 Idle); + // Should not activate a connection to move it + assert_eq!(1, blocks.metrics("db").max(MetricVariant::Active)); + assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active)); Ok(()) } @@ -765,6 +783,7 @@ mod tests { assert_eq!(0, blocks.block_count()); blocks.create(&connector, "db").await?; blocks.create(&connector, "db").await?; + blocks.metrics("db").reset_max(); assert_eq!(1, blocks.block_count()); assert_block!(blocks "db" has 2 Idle); blocks.task_close_one(&connector, "db").await?; @@ -772,6 +791,8 @@ mod tests { assert_block!(blocks "db" is empty); // Hasn't GC'd yet assert_eq!(1, blocks.block_count()); + // Should not activate a connection to close it + assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active)); Ok(()) } } diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index 3630a78f32cb..7963ecd71774 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -100,9 +100,9 @@ impl Conn { pub fn close(&self, connector: &C, metrics: &MetricsAccum) { self.transition(|inner| match inner { - ConnInner::Active(t, conn, ..) => { + ConnInner::Idle(t, conn, ..) => { metrics.transition( - MetricVariant::Active, + MetricVariant::Idle, MetricVariant::Disconnecting, t.elapsed(), ); @@ -116,7 +116,13 @@ impl Conn { pub fn transfer(&self, connector: &C, from: &MetricsAccum, to: &MetricsAccum, db: &str) { self.untrack(from); self.transition(|inner| match inner { - ConnInner::Active(_, conn, ..) => { + ConnInner::Idle(t, conn, ..) => { + from.inc_all_time(MetricVariant::Disconnecting); + to.insert(MetricVariant::Connecting); + let f = connector.reconnect(conn, db).boxed_local(); + ConnInner::Connecting(Instant::now(), f) + } + ConnInner::Active(t, conn, ..) => { from.inc_all_time(MetricVariant::Disconnecting); to.insert(MetricVariant::Connecting); let f = connector.reconnect(conn, db).boxed_local(); diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index c33b256a55ea..dc18504aa3e5 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -161,11 +161,11 @@ impl Pool { /// and other important async tasks. This should happen only if something has changed in /// the pool. pub fn run_once(&self) { - if !self.dirty.take() { - return; - } + // if !self.dirty.take() { + // return; + // } - self.config.constraints.adjust(&self.blocks); + self.config.constraints.adjust(&self.blocks, None); let mut s = String::new(); self.blocks.with_all(|name, block| { s += &format!("{name}={} ", block.target()); @@ -180,6 +180,9 @@ impl Pool { RebalanceOp::Create(name) => { tokio::task::spawn_local(self.blocks.task_create_one(&self.connector, &name)); } + RebalanceOp::Close(name) => { + tokio::task::spawn_local(self.blocks.task_close_one(&self.connector, &name)); + } } } } @@ -327,7 +330,11 @@ mod tests { for (name, block) in pool.metrics().blocks { s += &format!("{name}={} ", block.total); } - trace!("Blocks: {s}"); + trace!( + "Blocks: {}/{} {s}", + pool.metrics().pool.total, + pool.config.constraints.max + ); virtual_sleep(Duration::from_millis(100)).await; } }); @@ -426,7 +433,7 @@ mod tests { s += &format!("{name}={} ", block.total); } if !s.is_empty() && s != orig { - trace!("Blocks: {s}"); + trace!("Blocks: {}/{} {s}", pool.metrics().pool.total, pool.config.constraints.max); orig = s; } virtual_sleep(Duration::from_millis(10)).await; @@ -456,7 +463,7 @@ mod tests { let metrics = pool.metrics(); let mut qos = 0.0; for score in spec.score { - qos += score.score(&latencies, &metrics); + qos += score.score(&latencies, &metrics, &pool.config); } info!("[{}] QoS = {qos:0.02}", spec.name); @@ -1112,11 +1119,11 @@ mod tests { #[test(tokio::test(flavor = "current_thread", start_paused = true))] async fn test_connpool_10() -> Result<()> { - let full_qps = 10000; + let full_qps = 2000; let spec = Spec { name: "test_connpool_10", timeout: 10, - duration: 1.1, + duration: 2.0, capacity: 100, conn_cost: Triangle(0.01, 0.005), score: vec![EndingCapacity { diff --git a/edb/server/conn_pool/src/test.rs b/edb/server/conn_pool/src/test.rs index 555f74ddede8..6120ce5a1155 100644 --- a/edb/server/conn_pool/src/test.rs +++ b/edb/server/conn_pool/src/test.rs @@ -11,6 +11,7 @@ use tracing::info; use crate::{ conn::{ConnResult, Connector}, metrics::{MetricVariant, PoolMetrics}, + PoolConfig, }; #[derive(derive_more::Debug)] @@ -158,7 +159,7 @@ pub struct Spec { } pub trait ScoringMethod { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64; + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64; } pub struct Score { @@ -205,7 +206,7 @@ pub struct LatencyDistribution { } impl ScoringMethod for LatencyDistribution { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64 { let dbs = self.group.clone().map(|t| format!("t{t}")).collect_vec(); let mut data = latencies.data.borrow_mut(); // Calculates the average CV (coefficient of variation) of the given @@ -249,7 +250,7 @@ pub struct ConnectionOverhead { } impl ScoringMethod for ConnectionOverhead { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64 { let count = metrics.all_time[MetricVariant::Disconnecting] as f64 / latencies.len() as f64; info!( "Num of disconnects/query = {count} ({})", @@ -267,7 +268,7 @@ pub struct LatencyRatio { } impl ScoringMethod for LatencyRatio { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64 { let mut data = latencies.data.borrow_mut(); let dbs = self.divisor.clone().map(|t| format!("t{t}")).collect_vec(); let divisor = dbs @@ -302,8 +303,14 @@ pub struct EndingCapacity { } impl ScoringMethod for EndingCapacity { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { - self.score.weight + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64 { + info!( + "Ending capacity = {} ({})", + (metrics.pool.total), + self.score.calculate((metrics.pool.total) as _) + ); + + self.score.calculate((metrics.pool.total) as _) * self.score.weight } } @@ -314,7 +321,7 @@ pub struct AbsoluteLatency { } impl ScoringMethod for AbsoluteLatency { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics) -> f64 { + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> f64 { let mut data = latencies.data.borrow_mut(); let dbs = self.group.clone().map(|t| format!("t{t}")).collect_vec(); let value = dbs