diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs
index f6be10b93ddd..8aed60ac046b 100644
--- a/edb/server/conn_pool/src/algo.rs
+++ b/edb/server/conn_pool/src/algo.rs
@@ -1,9 +1,9 @@
-use scopeguard::defer;
 use std::cell::{Cell, RefCell};
 use tracing::trace;
 
 use crate::{
     block::Name,
+    drain::Drain,
     metrics::{MetricVariant, RollingAverageU32},
 };
 
@@ -199,13 +199,6 @@ pub enum RebalanceOp {
     Close(Name),
 }
 
-/// Determines the shutdown plan based on the current pool state.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum ShutdownOp {
-    /// Garbage collect a block.
-    Close(Name),
-}
-
 /// Determines the acquire plan based on the current pool state.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum AcquireOp {
@@ -255,8 +248,6 @@ pub trait VisitPoolAlgoData: PoolAlgorithmDataPool {
     fn with_all(&self, f: impl FnMut(&Name, &Self::Block));
     /// Retreives a single block, returning `None` if the block doesn't exist.
     fn with<T>(&self, db: &str, f: impl Fn(&Self::Block) -> T) -> Option<T>;
-    fn in_shutdown(&self) -> bool;
-    fn is_draining(&self, db: &str) -> bool;
 
     #[inline]
     fn target(&self, db: &str) -> usize {
@@ -386,6 +377,7 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
     /// estimated database time statistic we can use for relative
     /// weighting.
     fn demand_score(&self) -> usize {
+        // This gives us an approximate count of incoming connection load during the last period
         let active = self.max(MetricVariant::Active);
         let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get());
         let waiting = self.max(MetricVariant::Waiting);
@@ -436,23 +428,42 @@ pub struct PoolConstraints {
     pub max: usize,
 }
 
-impl PoolConstraints {
+/// The algorithm runs against these data structures.
+pub struct AlgoState<'a, V: VisitPoolAlgoData> {
+    pub drain: &'a Drain,
+    pub it: &'a V,
+    pub constraints: &'a PoolConstraints,
+}
+
+impl<'a, V: VisitPoolAlgoData> AlgoState<'a, V> {
     /// Recalculate the quota targets for each block within the pool/
-    pub fn recalculate_shares(&self, it: &impl VisitPoolAlgoData) {
+    fn recalculate_shares(&self, update_demand: bool) {
         // First, compute the overall request load and number of backend targets
         let mut total_demand = 0;
         let mut total_target = 0;
         let mut s = "".to_owned();
 
-        it.with_all(|name, data| {
-            let demand_avg = data.demand();
+        self.it.with_all(|name, data| {
+            // Draining targets act like they have demand zero
+            if self.drain.is_draining(name) {
+                if tracing::enabled!(tracing::Level::TRACE) {
+                    s += &format!("{name}=drain ");
+                }
+                return;
+            }
+
+            if update_demand {
+                let demand = data.demand_score();
+                data.insert_demand(demand as _);
+            }
+            let demand = data.demand();
 
             if tracing::enabled!(tracing::Level::TRACE) {
-                s += &format!("{name}={demand_avg} ",);
+                s += &format!("{name}={demand} ");
             }
 
-            total_demand += demand_avg as usize;
-            if demand_avg > 0 {
+            total_demand += demand as usize;
+            if demand > 0 {
                 total_target += 1;
             } else {
                 data.set_target(0);
@@ -463,70 +474,44 @@ impl PoolConstraints {
             trace!("Demand: {total_target} {}", s);
         }
 
-        self.allocate_demand(it, total_target, total_demand);
+        self.allocate_demand(total_target, total_demand);
     }
 
     /// Adjust the quota targets for each block within the pool.
-    pub fn adjust(&self, it: &impl VisitPoolAlgoData) {
-        // Once we've adjusted the constraints, reset the max settings
-        defer!(it.reset_max());
+    pub fn adjust(&self) {
+        self.recalculate_shares(true);
 
-        // First, compute the overall request load and number of backend targets
-        let mut total_demand = 0_usize;
-        let mut total_target = 0;
-        let mut s = "".to_owned();
-
-        it.with_all(|name, data| {
-            let demand = data.demand_score();
-            data.insert_demand(demand as _);
-            let demand_avg = data.demand();
-
-            if tracing::enabled!(tracing::Level::TRACE) {
-                s += &format!("{name}={demand_avg}/{demand}",);
-            }
-
-            total_demand += demand_avg as usize;
-            if demand_avg > 0 {
-                total_target += 1;
-            } else {
-                data.set_target(0);
-            }
-        });
-
-        if tracing::enabled!(tracing::Level::TRACE) {
-            trace!("Demand: {total_target} {}", s);
-        }
-
-        self.allocate_demand(it, total_target, total_demand);
+        // Once we've adjusted the constraints, reset the max settings
+        self.it.reset_max();
     }
 
     /// Allocate the calculated demand to target quotas.
-    fn allocate_demand(
-        &self,
-        it: &impl VisitPoolAlgoData,
-        total_target: usize,
-        total_demand: usize,
-    ) {
+    fn allocate_demand(&self, total_target: usize, total_demand: usize) {
         // Empty pool, no math
         if total_target == 0 || total_demand == 0 {
+            self.it.with_all(|_name, data| {
+                data.set_target(0);
+            });
             return;
         }
 
         let mut allocated = 0;
+        let max = self.constraints.max;
 
         // This is the minimum number of connections we'll allocate to any particular
         // backend regardless of demand if there are less backends than the capacity.
-        let min = (self.max / total_target).min(MAXIMUM_SHARED_TARGET.get());
+        let min = (max / total_target).min(MAXIMUM_SHARED_TARGET.get());
         // The remaining capacity after we allocated the `min` value above.
-        let capacity = self.max - min * total_target;
+        let capacity = max - min * total_target;
 
         if min == 0 {
-            it.with_all(|_name, data| {
+            self.it.with_all(|_name, data| {
                 data.set_target(0);
             });
         } else {
-            it.with_all(|_name, data| {
+            self.it.with_all(|name, data| {
                 let demand = data.demand();
-                if demand == 0 {
+                if demand == 0 || self.drain.is_draining(name) {
+                    data.set_target(0);
                     return;
                 }
 
@@ -541,23 +526,31 @@ impl PoolConstraints {
             });
         }
 
+        if tracing::enabled!(tracing::Level::TRACE) {
+            let mut s = String::new();
+            self.it.with_all(|name, block| {
+                s += &format!("{name}={}/{} ", block.target(), block.total());
+            });
+            trace!("Targets: {s}");
+        }
+
         debug_assert!(
-            allocated <= self.max,
+            allocated <= max,
             "Attempted to allocate more than we were allowed: {allocated} > {} \
             (req={total_demand}, target={total_target})",
-            self.max
+            max
         );
     }
 
     /// Plan a shutdown.
-    pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec<ShutdownOp> {
+    fn plan_shutdown(&self) -> Vec<RebalanceOp> {
         let mut ops = vec![];
-        it.with_all(|name, block| {
+        self.it.with_all(|name, block| {
             let idle = block.count(MetricVariant::Idle);
             let failed = block.count(MetricVariant::Failed);
             for _ in 0..(idle + failed) {
-                ops.push(ShutdownOp::Close(name.clone()));
+                ops.push(RebalanceOp::Close(name.clone()));
             }
         });
         ops
@@ -565,34 +558,38 @@ impl PoolConstraints {
     /// Plan a rebalance to better match the target quotas of the blocks in the
     /// pool.
-    pub fn plan_rebalance(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
+    pub fn plan_rebalance(&self) -> Vec<RebalanceOp> {
         // In shutdown, we just want to close all idle connections where possible.
-        if it.in_shutdown() {
-            let mut ops = vec![];
-            it.with_all(|name, block| {
-                for _ in 0..block.count(MetricVariant::Idle) {
-                    ops.push(RebalanceOp::Close(name.clone()))
-                }
-            });
-            return ops;
+        if self.drain.in_shutdown() {
+            return self.plan_shutdown();
         }
 
-        let mut current_pool_size = it.total();
-        let max_pool_size = self.max;
+        let mut current_pool_size = self.it.total();
+        let max_pool_size = self.constraints.max;
+        let mut tasks = vec![];
+
+        // TODO: These could potentially be transferred instead, but
+        // drain/shutdown are pretty unlikely.
+        if self.drain.are_any_draining() {
+            self.it.with_all(|name, data| {
+                if self.drain.is_draining(name) {
+                    for _ in 0..data.count(MetricVariant::Idle) + data.count(MetricVariant::Failed)
+                    {
+                        tasks.push(RebalanceOp::Close(name.clone()))
+                    }
+                }
+            })
+        }
 
         // If there's room in the pool, we can be more aggressive in
         // how we allocate.
         if current_pool_size < max_pool_size {
-            let mut changes = vec![];
             let mut made_changes = false;
 
             for i in 0..MAX_REBALANCE_OPS.get() {
-                it.with_all(|name, block| {
-                    // TODO: We should swap the order of iteration here so we
-                    // can drain the whole block
-                    if it.is_draining(name) && block.count(MetricVariant::Idle) > i {
-                        changes.push(RebalanceOp::Close(name.clone()));
-                        made_changes = true;
+                self.it.with_all(|name, block| {
+                    // Drains are handled at the start of this function
+                    if self.drain.is_draining(name) {
                         return;
                     }
 
@@ -605,7 +602,7 @@ impl PoolConstraints {
                         > (block.total() + i)
                             .saturating_sub(MIN_REBALANCE_HEADROOM_TO_CREATE.get())
                     {
-                        changes.push(RebalanceOp::Create(name.clone()));
+                        tasks.push(RebalanceOp::Create(name.clone()));
                         current_pool_size += 1;
                         made_changes = true;
                     } else if block.total() > block.target()
@@ -617,7 +614,7 @@ impl PoolConstraints {
                         // around, we'll try to keep a few excess connections if
                         // nobody else wants them. Otherwise, we'll just try to close
                         // all the idle connections over time.
-                        changes.push(RebalanceOp::Close(name.clone()));
+                        tasks.push(RebalanceOp::Close(name.clone()));
                         made_changes = true;
                     }
                 });
@@ -626,7 +623,7 @@ impl PoolConstraints {
                }
            }
 
-            return changes;
+            return tasks;
         }
 
         // For any block with less connections than its quota that has
@@ -638,15 +635,9 @@ impl PoolConstraints {
         let mut s1 = "".to_owned();
         let mut s2 = "".to_owned();
 
-        let mut tasks = vec![];
-
-        it.with_all(|name, block| {
-            // TODO: These could potentially be transferred instead, but
-            // drain/shutdown are pretty unlikely.
-            if it.is_draining(name) {
-                for _ in 0..block.count(MetricVariant::Idle) {
-                    tasks.push(RebalanceOp::Close(name.clone()))
-                }
+        self.it.with_all(|name, block| {
+            // Drains are handled at the start of this function
+            if self.drain.is_draining(name) {
                 return;
             }
 
@@ -698,21 +689,24 @@ impl PoolConstraints {
     }
 
     /// Plan a connection acquisition.
-    pub fn plan_acquire(&self, db: &str, it: &impl VisitPoolAlgoData) -> AcquireOp {
-        if it.in_shutdown() {
+    pub fn plan_acquire(&self, db: &str) -> AcquireOp {
+        if self.drain.in_shutdown() {
             return AcquireOp::FailInShutdown;
         }
 
         // If the block is new, we need to perform an initial adjustment to
         // ensure this block gets some capacity.
-        if it.ensure_block(db, DEMAND_MINIMUM.get() * DEMAND_HISTORY_LENGTH) {
-            self.recalculate_shares(it);
+        if self
+            .it
+            .ensure_block(db, DEMAND_MINIMUM.get() * DEMAND_HISTORY_LENGTH)
+        {
+            self.recalculate_shares(false);
         }
 
-        let target_block_size = it.target(db);
-        let current_block_size = it.with(db, |data| data.total()).unwrap_or_default();
-        let current_pool_size = it.total();
-        let max_pool_size = self.max;
+        let target_block_size = self.it.target(db);
+        let current_block_size = self.it.with(db, |data| data.total()).unwrap_or_default();
+        let current_pool_size = self.it.total();
+        let max_pool_size = self.constraints.max;
         let pool_is_full = current_pool_size >= max_pool_size;
 
         if !pool_is_full {
@@ -725,7 +719,7 @@ impl PoolConstraints {
         if pool_is_full && block_has_room {
             let mut max = isize::MIN;
             let mut which = None;
-            it.with_all(|name, block| {
+            self.it.with_all(|name, block| {
                 if let Some(overfullness) = block.overfull_score(false) {
                     if overfullness > max {
                         which = Some(name.clone());
@@ -745,13 +739,8 @@ impl PoolConstraints {
     }
 
     /// Plan a connection release.
-    pub fn plan_release(
-        &self,
-        db: &str,
-        release_type: ReleaseType,
-        it: &impl VisitPoolAlgoData,
-    ) -> ReleaseOp {
-        if it.is_draining(db) {
+    pub fn plan_release(&self, db: &str, release_type: ReleaseType) -> ReleaseOp {
+        if self.drain.is_draining(db) {
             return ReleaseOp::Discard;
         }
 
@@ -759,20 +748,20 @@ impl PoolConstraints {
             return ReleaseOp::Reopen;
         }
 
-        let current_pool_size = it.total();
-        let max_pool_size = self.max;
+        let current_pool_size = self.it.total();
+        let max_pool_size = self.constraints.max;
         if current_pool_size < max_pool_size {
             trace!("Pool has room, keeping connection");
             return ReleaseOp::Release;
         }
 
         // We only want to consider a release elsewhere if this block is overfull
-        if let Some(Some(overfull)) = it.with(db, |block| block.overfull_score(true)) {
+        if let Some(Some(overfull)) = self.it.with(db, |block| block.overfull_score(true)) {
             trace!("Block {db} is overfull ({overfull}), trying to release");
             let mut max = isize::MIN;
             let mut which = None;
             let mut s = "".to_owned();
-            it.with_all(|name, block| {
+            self.it.with_all(|name, block| {
                 let is_self = &**name == db;
                 if let Some(mut hunger) = block.hunger_score(is_self) {
                     // Penalize switching by boosting the current database's relative hunger here
@@ -827,29 +816,34 @@ mod tests {
         let connector = BasicConnector::no_delay();
         let config = PoolConfig::suggested_default_for(10);
         let blocks = Blocks::default();
-        let algo = &config.constraints;
+        let drain = Drain::default();
+        let algo = AlgoState {
+            constraints: &config.constraints,
+            drain: &drain,
+            it: &blocks,
+        };
 
         let futures = FuturesUnordered::new();
-        for i in (0..algo.max).map(Name::from) {
-            assert_eq!(algo.plan_acquire(&i, &blocks), AcquireOp::Create);
+        for i in (0..algo.constraints.max).map(Name::from) {
+            assert_eq!(algo.plan_acquire(&i), AcquireOp::Create);
             futures.push(blocks.create_if_needed(&connector, &i));
         }
         let conns: Vec<_> = futures.collect().await;
         let futures = FuturesUnordered::new();
-        for i in (0..algo.max).map(Name::from) {
-            assert_eq!(algo.plan_acquire(&i, &blocks), AcquireOp::Wait);
+        for i in (0..algo.constraints.max).map(Name::from) {
+            assert_eq!(algo.plan_acquire(&i), AcquireOp::Wait);
             futures.push(blocks.queue(&i));
         }
         for conn in conns {
             assert_eq!(
-                algo.plan_release(&conn?.state.db_name, ReleaseType::Normal, &blocks),
+                algo.plan_release(&conn?.state.db_name, ReleaseType::Normal),
                 ReleaseOp::Release
             );
         }
         let conns: Vec<_> = futures.collect().await;
         for conn in conns {
             assert_eq!(
-                algo.plan_release(&conn?.state.db_name, ReleaseType::Normal, &blocks),
+                algo.plan_release(&conn?.state.db_name, ReleaseType::Normal),
                 ReleaseOp::Release
             );
         }
@@ -866,25 +860,30 @@ mod tests {
     let future = async {
         let connector = BasicConnector::no_delay();
         let config = PoolConfig::suggested_default_for(10);
-        let algo = &config.constraints;
         let blocks = Blocks::default();
+        let drain = Drain::default();
+        let algo = AlgoState {
+            constraints: &config.constraints,
+            drain: &drain,
+            it: &blocks,
+        };
 
         // Room for these
         let futures = FuturesUnordered::new();
         for db in (0..5).map(Name::from) {
-            assert_eq!(algo.plan_acquire(&db, &blocks), AcquireOp::Create);
+            assert_eq!(algo.plan_acquire(&db), AcquireOp::Create);
             futures.push(blocks.create_if_needed(&connector, &db));
         }
         // ... and these
         let futures2 = FuturesUnordered::new();
         for db in (5..10).map(Name::from) {
-            assert_eq!(algo.plan_acquire(&db, &blocks), AcquireOp::Create);
+            assert_eq!(algo.plan_acquire(&db), AcquireOp::Create);
             futures2.push(blocks.create_if_needed(&connector, &db));
         }
         // But not these (yet)
         let futures3 = FuturesUnordered::new();
         for db in (10..15).map(Name::from) {
-            assert_eq!(algo.plan_acquire(&db, &blocks), AcquireOp::Wait);
+            assert_eq!(algo.plan_acquire(&db), AcquireOp::Wait);
             futures3.push(blocks.queue(&db));
         }
         let conns: Vec<_> = futures.collect().await;
@@ -892,7 +891,7 @@ mod tests {
         // These are released to 10..15
         for conn in conns {
             let conn = conn?;
-            let res = algo.plan_release(&conn.state.db_name, ReleaseType::Normal, &blocks);
+            let res = algo.plan_release(&conn.state.db_name, ReleaseType::Normal);
             let ReleaseOp::ReleaseTo(to) = res else {
                 panic!("Wrong release: {res:?}");
             };
@@ -901,7 +900,7 @@ mod tests {
         // These don't have anywhere to go
         for conn in conns2 {
             let conn = conn?;
-            let res = algo.plan_release(&conn.state.db_name, ReleaseType::Normal, &blocks);
+            let res = algo.plan_release(&conn.state.db_name, ReleaseType::Normal);
             let ReleaseOp::Release = res else {
                 panic!("Wrong release: {res:?}");
             };
diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs
index fad76d4c4c1e..e95fa2c68f8b 100644
--- a/edb/server/conn_pool/src/block.rs
+++ b/edb/server/conn_pool/src/block.rs
@@ -4,13 +4,14 @@ use crate::{
         PoolAlgorithmDataPool, VisitPoolAlgoData,
     },
     conn::*,
+    drain::Drain,
     metrics::{MetricVariant, MetricsAccum, PoolMetrics},
     time::Instant,
     waitqueue::WaitQueue,
 };
 use futures::future::Either;
 use std::{
-    cell::{Cell, RefCell},
+    cell::RefCell,
     collections::HashMap,
     future::{poll_fn, ready, Future},
     rc::Rc,
@@ -384,7 +385,6 @@ impl Block {
 pub struct Blocks {
     map: RefCell<HashMap<Name, Rc<Block<C, D>>>>,
     metrics: Rc<MetricsAccum>,
-    pub drain: Drain
 }
 
 impl Default for Blocks {
@@ -392,7 +392,6 @@
         Self {
             map: RefCell::new(HashMap::default()),
             metrics: Rc::new(MetricsAccum::default()),
-            drain: Default::default()
         }
     }
 }
@@ -510,14 +509,6 @@ impl VisitPoolAlgoData for Blocks {
             }
         });
     }
-
-    fn in_shutdown(&self) -> bool {
-        self.drain.shutdown.get()
-    }
-
-    fn is_draining(&self, db: &str) -> bool {
-        self.drain.is_draining(db)
-    }
 }
 
 impl Blocks {
@@ -710,79 +701,6 @@ impl Blocks {
         }
     }
 }
-
-impl AsRef for Blocks {
-    fn as_ref(&self) -> &Drain {
-        &self.drain
-    }
-}
-
-/// Holds the current drainage and shutdown state for the `Pool`.
-#[derive(Default, Debug)]
-pub struct Drain {
-    drain_all: Cell<usize>,
-    drain: RefCell<HashMap<Name, usize>>,
-    pub shutdown: Cell<bool>,
-}
-
-impl Drain {
-    pub fn shutdown(&self) {
-        self.shutdown.set(true)
-    }
-
-    /// Lock all connections for draining.
-    pub fn lock_all<T: AsRef<Drain>>(this: T) -> DrainLock<T> {
-        let drain = this.as_ref();
-        drain.drain_all.set(drain.drain_all.get() + 1);
-        DrainLock {
-            db: None,
-            has_drain: this,
-        }
-    }
-
-    // Lock a specific connection for draining.
-    pub fn lock<T: AsRef<Drain>>(this: T, db: Name) -> DrainLock<T> {
-        {
-            let mut drain = this.as_ref().drain.borrow_mut();
-            drain.entry(db.clone()).and_modify(|v| *v += 1).or_default();
-        }
-        DrainLock {
-            db: Some(db),
-            has_drain: this,
-        }
-    }
-
-    /// Is this connection draining?
-    fn is_draining(&self, db: &str) -> bool {
-        self.drain_all.get() > 0 || self.drain.borrow().contains_key(db) || self.shutdown.get()
-    }
-}
-
-/// Provides a RAII lock for a db- or whole-pool drain operation.
-pub struct DrainLock<T: AsRef<Drain>> {
-    db: Option<Name>,
-    has_drain: T,
-}
-
-impl<T: AsRef<Drain>> Drop for DrainLock<T> {
-    fn drop(&mut self) {
-        if let Some(name) = self.db.take() {
-            let mut drain = self.has_drain.as_ref().drain.borrow_mut();
-            if let Some(count) = drain.get_mut(&name) {
-                if *count > 1 {
-                    *count -= 1;
-                } else {
-                    drain.remove(&name);
-                }
-            } else {
-                unreachable!()
-            }
-        } else {
-            let this = self.has_drain.as_ref();
-            this.drain_all.set(this.drain_all.get() - 1);
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/edb/server/conn_pool/src/drain.rs b/edb/server/conn_pool/src/drain.rs
new file mode 100644
index 000000000000..4f54f83095c8
--- /dev/null
+++ b/edb/server/conn_pool/src/drain.rs
@@ -0,0 +1,81 @@
+use crate::block::Name;
+use std::{
+    cell::{Cell, RefCell},
+    collections::HashMap,
+};
+
+/// Holds the current drainage and shutdown state for the `Pool`.
+#[derive(Default, Debug)]
+pub struct Drain {
+    drain_all: Cell<usize>,
+    drain: RefCell<HashMap<Name, usize>>,
+    shutdown: Cell<bool>,
+}
+
+impl Drain {
+    pub fn shutdown(&self) {
+        self.shutdown.set(true)
+    }
+
+    pub fn in_shutdown(&self) -> bool {
+        self.shutdown.get()
+    }
+
+    /// Lock all connections for draining.
+    pub fn lock_all<T: AsRef<Drain>>(this: T) -> DrainLock<T> {
+        let drain = this.as_ref();
+        drain.drain_all.set(drain.drain_all.get() + 1);
+        DrainLock {
+            db: None,
+            has_drain: this,
+        }
+    }
+
+    // Lock a specific connection for draining.
+    pub fn lock<T: AsRef<Drain>>(this: T, db: Name) -> DrainLock<T> {
+        {
+            let mut drain = this.as_ref().drain.borrow_mut();
+            drain.entry(db.clone()).and_modify(|v| *v += 1).or_default();
+        }
+        DrainLock {
+            db: Some(db),
+            has_drain: this,
+        }
+    }
+
+    /// Is this connection draining?
+    pub fn is_draining(&self, db: &str) -> bool {
+        self.drain_all.get() > 0 || self.drain.borrow().contains_key(db) || self.shutdown.get()
+    }
+
+    /// Are any connections draining?
+    pub fn are_any_draining(&self) -> bool {
+        !self.drain.borrow().is_empty() || self.shutdown.get() || self.drain_all.get() > 0
+    }
+}
+
+/// Provides a RAII lock for a db- or whole-pool drain operation.
+pub struct DrainLock<T: AsRef<Drain>> {
+    db: Option<Name>,
+    has_drain: T,
+}
+
+impl<T: AsRef<Drain>> Drop for DrainLock<T> {
+    fn drop(&mut self) {
+        if let Some(name) = self.db.take() {
+            let mut drain = self.has_drain.as_ref().drain.borrow_mut();
+            if let Some(count) = drain.get_mut(&name) {
+                if *count > 1 {
+                    *count -= 1;
+                } else {
+                    drain.remove(&name);
+                }
+            } else {
+                unreachable!()
+            }
+        } else {
+            let this = self.has_drain.as_ref();
+            this.drain_all.set(this.drain_all.get() - 1);
+        }
+    }
+}
diff --git a/edb/server/conn_pool/src/lib.rs b/edb/server/conn_pool/src/lib.rs
index 36aa76f4fa84..c88a931d3a11 100644
--- a/edb/server/conn_pool/src/lib.rs
+++ b/edb/server/conn_pool/src/lib.rs
@@ -1,6 +1,7 @@
 pub(crate) mod algo;
 pub(crate) mod block;
 pub(crate) mod conn;
+pub(crate) mod drain;
 pub(crate) mod metrics;
 pub(crate) mod pool;
 pub(crate) mod waitqueue;
diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs
index a2b7c5f22ec8..3380954c7c26 100644
--- a/edb/server/conn_pool/src/pool.rs
+++ b/edb/server/conn_pool/src/pool.rs
@@ -1,19 +1,16 @@
 use crate::{
     algo::{
-        AcquireOp, PoolAlgoTargetData, PoolAlgorithmDataBlock, PoolAlgorithmDataMetrics,
-        PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, ShutdownOp, VisitPoolAlgoData,
+        AcquireOp, AlgoState, PoolAlgoTargetData, PoolAlgorithmDataBlock, PoolAlgorithmDataMetrics,
+        PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, VisitPoolAlgoData,
     },
-    block::{Blocks, Drain},
+    block::Blocks,
     conn::{ConnError, ConnHandle, ConnResult, Connector},
+    drain::Drain,
     metrics::{MetricVariant, PoolMetrics},
 };
 use consume_on_drop::{Consume, ConsumeOnDrop};
 use derive_more::Debug;
-use std::{
-    cell::Cell,
-    rc::Rc,
-    time::Duration,
-};
+use std::{cell::Cell, rc::Rc, time::Duration};
 use tracing::trace;
 
 #[derive(Debug)]
@@ -117,6 +114,7 @@ pub struct Pool {
     connector: C,
     config: PoolConfig,
     blocks: Blocks,
+    drain: Drain,
     /// If the pool has been dirtied by acquiring or releasing a connection
     dirty: Rc<Cell<bool>>,
 }
@@ -129,11 +127,20 @@ impl Pool {
             blocks: Default::default(),
             connector,
             dirty: Default::default(),
+            drain: Drain::default(),
         })
     }
 }
 
 impl Pool {
+    fn algo<'a>(&'a self) -> AlgoState<'a, Blocks> {
+        AlgoState {
+            drain: &self.drain,
+            it: &self.blocks,
+            constraints: &self.config.constraints,
+        }
+    }
+
     /// Runs the required async task that takes care of quota management, garbage collection,
     /// and other important async tasks. This should happen only if something has changed in
     /// the pool.
@@ -151,27 +158,8 @@ impl Pool {
             return;
         }
 
-        if self.blocks.drain.shutdown.get() {
-            for op in self.config.constraints.plan_shutdown(&self.blocks) {
-                trace!("Shutdown: {op:?}");
-                match op {
-                    ShutdownOp::Close(name) => {
-                        tokio::task::spawn_local(
-                            self.blocks.task_close_one(&self.connector, &name),
-                        );
-                    }
-                }
-            }
-            return;
-        }
-
-        self.config.constraints.adjust(&self.blocks);
-        let mut s = String::new();
-        self.blocks.with_all(|name, block| {
-            s += &format!("{name}={}/{} ", block.target(), block.len());
-        });
-        trace!("Targets: {s}");
-        for op in self.config.constraints.plan_rebalance(&self.blocks) {
+        self.algo().adjust();
+        for op in self.algo().plan_rebalance() {
             trace!("Rebalance: {op:?}");
             match op {
                 RebalanceOp::Transfer { from, to } => {
@@ -192,7 +180,7 @@ impl Pool {
     /// back into the pool.
     pub async fn acquire(self: &Rc, db: &str) -> ConnResult, C::Error> {
         self.dirty.set(true);
-        let plan = self.config.constraints.plan_acquire(db, &self.blocks);
+        let plan = self.algo().plan_acquire(db);
         trace!("Acquire {db}: {plan:?}");
         match plan {
             AcquireOp::Create => {
@@ -201,7 +189,7 @@ impl Pool {
             AcquireOp::Steal(from) => {
                 tokio::task::spawn_local(self.blocks.task_steal(&self.connector, db, &from));
             }
-            AcquireOp::Wait => {},
+            AcquireOp::Wait => {}
             AcquireOp::FailInShutdown => {
                 return Err(ConnError::Shutdown);
             }
@@ -220,10 +208,7 @@ impl Pool {
         } else {
             ReleaseType::Normal
         };
-        let plan = self
-            .config
-            .constraints
-            .plan_release(db, release_type, &self.blocks);
+        let plan = self.algo().plan_release(db, release_type);
         trace!("Release: {conn:?} {plan:?}");
         match plan {
             ReleaseOp::Release => {}
@@ -315,7 +300,7 @@ impl Pool {
     /// Shuts this pool down safely. Dropping this future does not cancel
     /// the shutdown operation.
     pub async fn shutdown(mut self: Rc<Self>) {
-        self.blocks.drain.shutdown();
+        self.drain.shutdown();
         let pool = loop {
             match Rc::try_unwrap(self) {
                 Ok(pool) => break pool,
@@ -347,7 +332,7 @@ impl Pool {
 
 impl<C: Connector> AsRef<Drain> for Rc<Pool<C>> {
     fn as_ref(&self) -> &Drain {
-        self.blocks.as_ref()
+        &self.drain
     }
 }
diff --git a/edb/server/conn_pool/src/python.rs b/edb/server/conn_pool/src/python.rs
index 5974d22b6437..837514c84d6a 100644
--- a/edb/server/conn_pool/src/python.rs
+++ b/edb/server/conn_pool/src/python.rs
@@ -4,14 +4,17 @@ use crate::{
     PoolHandle,
 };
 use futures::future::poll_fn;
-use pyo3::{
-    exceptions::PyException, prelude::*
-};
+use pyo3::{exceptions::PyException, prelude::*};
 use std::{
+    cell::{Cell, RefCell},
+    collections::BTreeMap,
+    os::fd::IntoRawFd,
     pin::Pin,
-    cell::{Cell, RefCell}, collections::BTreeMap, os::fd::IntoRawFd, rc::Rc, thread, time::Duration
+    rc::Rc,
+    thread,
+    time::Duration,
 };
-use tokio::{task::LocalSet, io::AsyncWrite};
+use tokio::{io::AsyncWrite, task::LocalSet};
 use tracing::{error, info, subscriber::DefaultGuard, trace};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 
@@ -26,6 +29,19 @@ enum RustToPythonMessage {
     PerformReconnect(ConnHandleId, String),
 }
 
+impl ToPyObject for RustToPythonMessage {
+    fn to_object(&self, py: Python<'_>) -> PyObject {
+        use RustToPythonMessage::*;
+        match self {
+            Acquired(a, b) => (0, a, b).to_object(py),
+            PerformConnect(conn, s) => (1, conn, s).to_object(py),
+            PerformDisconnect(conn) => (2, conn).to_object(py),
+            PerformReconnect(conn, s) => (3, conn, s).to_object(py),
+            Pruned(conn) => (4, conn).to_object(py),
+        }
+    }
+}
+
 #[derive(Debug)]
 enum PythonToRustMessage {
     /// Acquire a connection.
@@ -52,7 +68,7 @@ struct RpcPipe {
     python_to_rust: RefCell>,
     handles: RefCell>>>,
     next_id: Cell,
-    async_ops: RefCell>>
+    async_ops: RefCell>>,
 }
 
 impl std::fmt::Debug for RpcPipe {
@@ -68,7 +84,8 @@ impl RpcPipe {
             let pipe = &mut *self.rust_to_python_notify.borrow_mut();
             let this = Pin::new(pipe);
             this.poll_write(cx, &[0])
-        }).await;
+        })
+        .await;
     }
 }
 
@@ -76,7 +93,12 @@ impl Connector for Rc {
     type Conn = ConnHandleId;
     type Error = String;
 
-    fn connect(&self, db: &str) -> impl futures::Future<Output = ConnResult<<Self as Connector>::Conn, <Self as Connector>::Error>> + 'static {
+    fn connect(
+        &self,
+        db: &str,
+    ) -> impl futures::Future<
+        Output = ConnResult<<Self as Connector>::Conn, <Self as Connector>::Error>,
+    > + 'static {
         let id = self.next_id.get();
         self.next_id.set(id + 1);
         let (tx, rx) = tokio::sync::oneshot::channel();
@@ -91,12 +113,16 @@ impl Connector for Rc {
         }
     }
 
-    fn disconnect(&self, conn: Self::Conn) -> impl futures::Future<Output = ConnResult<(), <Self as Connector>::Error>> + 'static {
+    fn disconnect(
+        &self,
+        conn: Self::Conn,
+    ) -> impl futures::Future<Output = ConnResult<(), <Self as Connector>::Error>> + 'static {
         let (tx, rx) = tokio::sync::oneshot::channel();
         self.async_ops.borrow_mut().insert(conn, tx);
         let this = self.clone();
         async move {
-            this.write(RustToPythonMessage::PerformDisconnect(conn)).await;
+            this.write(RustToPythonMessage::PerformDisconnect(conn))
+                .await;
             // TODO(error)
             _ = rx.await;
             Ok(())
@@ -104,10 +130,12 @@ impl Connector for Rc {
         }
     }
 
     fn reconnect(
-      &self,
-      conn: Self::Conn,
-      db: &str,
-    ) -> impl futures::Future<Output = ConnResult<<Self as Connector>::Conn, <Self as Connector>::Error>> + 'static {
+        &self,
+        conn: Self::Conn,
+        db: &str,
+    ) -> impl futures::Future<
+        Output = ConnResult<<Self as Connector>::Conn, <Self as Connector>::Error>,
+    > + 'static {
         let (tx, rx) = tokio::sync::oneshot::channel();
         self.async_ops.borrow_mut().insert(conn, tx);
         let msg = RustToPythonMessage::PerformReconnect(conn, db.to_owned());
@@ -139,12 +167,9 @@ fn internal_error(message: &str) -> PyErr {
     InternalError::new_err(())
 }
 
-async fn run_and_block(
-    max_capacity: usize,
-    rpc_pipe: RpcPipe,
-) {
+async fn run_and_block(max_capacity: usize, rpc_pipe: RpcPipe) {
     let rpc_pipe = Rc::new(rpc_pipe);
-    
+
     let pool = Pool::new(
         PoolConfig::suggested_default_for(max_capacity),
         rpc_pipe.clone(),
@@ -161,7 +186,8 @@ async fn run_and_block(
     };
 
     loop {
-        let Some(rpc) = poll_fn(|cx| rpc_pipe.python_to_rust.borrow_mut().poll_recv(cx)).await else {
+        let Some(rpc) = poll_fn(|cx| rpc_pipe.python_to_rust.borrow_mut().poll_recv(cx)).await
+        else {
            info!("ConnPool shutting down");
            pool_task.abort();
            pool.shutdown().await;
@@ -180,21 +206,28 @@ async fn run_and_block(
                 };
                 let handle = conn.handle();
                 rpc_pipe.handles.borrow_mut().insert(conn_id, conn);
-                rpc_pipe.write(RustToPythonMessage::Acquired(conn_id, handle)).await;
-            },
+                rpc_pipe
+                    .write(RustToPythonMessage::Acquired(conn_id, handle))
+                    .await;
+            }
             Release(conn_id) => {
                 rpc_pipe.handles.borrow_mut().remove(&conn_id);
-            },
+            }
             Discard(conn_id) => {
-                rpc_pipe.handles.borrow_mut().remove(&conn_id).unwrap().poison();
-            },
+                rpc_pipe
+                    .handles
+                    .borrow_mut()
+                    .remove(&conn_id)
+                    .unwrap()
+                    .poison();
+            }
             Prune(conn_id, db) => {
                 pool.drain_idle(&db).await;
                 rpc_pipe.write(RustToPythonMessage::Pruned(conn_id)).await;
             }
             CompletedAsync(handle_id) => {
                 rpc_pipe.async_ops.borrow_mut().remove(&handle_id);
-            },
+            }
         }
     });
 }
@@ -211,7 +244,7 @@ impl ConnPool {
         let (txrp, rxrp) = std::sync::mpsc::channel();
         let (txpr, rxpr) = tokio::sync::mpsc::unbounded_channel();
         let (txfd, rxfd) = std::sync::mpsc::channel();
-        
+
         thread::spawn(move || {
             info!("Rust-side ConnPool thread booted");
             let rt = tokio::runtime::Builder::new_current_thread()
@@ -243,7 +276,7 @@ impl ConnPool {
                 rust_to_python: rxrp,
                 notify_fd,
             }
-            })
+        })
     }
 
     #[getter]
@@ -252,44 +285,54 @@ impl ConnPool {
     }
 
     fn _acquire(&self, id: u64, db: &str) -> PyResult<()> {
-        self.python_to_rust.send(PythonToRustMessage::Acquire(id, db.to_owned())).map_err(|_| internal_error("In shutdown"))
+        self.python_to_rust
+            .send(PythonToRustMessage::Acquire(id, db.to_owned()))
+            .map_err(|_| internal_error("In shutdown"))
     }
 
     fn _release(&self, id: u64) -> PyResult<()> {
-        self.python_to_rust.send(PythonToRustMessage::Release(id)).map_err(|_| internal_error("In shutdown"))
+        self.python_to_rust
+            .send(PythonToRustMessage::Release(id))
+            .map_err(|_| internal_error("In shutdown"))
     }
 
     fn _discard(&self, id: u64) -> PyResult<()> {
-        self.python_to_rust.send(PythonToRustMessage::Discard(id)).map_err(|_| internal_error("In shutdown"))
+        self.python_to_rust
+            .send(PythonToRustMessage::Discard(id))
+            .map_err(|_| internal_error("In shutdown"))
    }
 
     fn _completed(&self, id: u64) -> PyResult<()> {
-        self.python_to_rust.send(PythonToRustMessage::CompletedAsync(id)).map_err(|_| internal_error("In shutdown"))
+        self.python_to_rust
+            .send(PythonToRustMessage::CompletedAsync(id))
+            .map_err(|_| internal_error("In shutdown"))
     }
 
     fn _prune(&self, id: u64, db: &str) -> PyResult<()> {
-        self.python_to_rust.send(PythonToRustMessage::Prune(id, db.to_owned())).map_err(|_| internal_error("In shutdown"))
+        self.python_to_rust
+            .send(PythonToRustMessage::Prune(id, db.to_owned()))
+            .map_err(|_| internal_error("In shutdown"))
     }
 
     fn _read<'py>(&self, py: Python<'py>) -> Py<PyAny> {
         let Ok(msg) = self.rust_to_python.recv() else {
             return py.None();
         };
-        use RustToPythonMessage::*;
-        match msg {
-            Acquired(a, b) => (0, a, b).to_object(py),
-            PerformConnect(conn, s) => (1, conn, s).to_object(py),
-            PerformDisconnect(conn) => (2, conn).to_object(py),
-            PerformReconnect(conn, s) => (3, conn, s).to_object(py),
-            Pruned(conn) => (4, conn).to_object(py),
-        }
+        msg.to_object(py)
+    }
+
+    fn _try_read<'py>(&self, py: Python<'py>) -> Py<PyAny> {
+        let Ok(msg) = self.rust_to_python.try_recv() else {
+            return py.None();
+        };
+        msg.to_object(py)
     }
 }
 
 /// Ensure that logging does not outlive the Python runtime.
 #[pyclass]
 struct LoggingGuard {
-    guard: DefaultGuard
+    guard: DefaultGuard,
 }
 
 #[pymethods]
 impl LoggingGuard {
@@ -311,7 +354,11 @@ impl LoggingGuard {
         }
 
         impl tracing_subscriber::Layer for PythonSubscriber {
-            fn on_event(&self, event: &tracing::Event, _ctx: tracing_subscriber::layer::Context) {
+            fn on_event(
+                &self,
+                event: &tracing::Event,
+                _ctx: tracing_subscriber::layer::Context,
+            ) {
                 let mut message = format!("[{}] ", event.metadata().target());
                 #[derive(Default)]
                 struct Visitor(String);
@@ -370,7 +417,7 @@ impl LoggingGuard {
             .set_default();
 
         tracing::info!("ConnPool initialized (level = {level})");
-        Ok(LoggingGuard{ guard })
+        Ok(LoggingGuard { guard })
     }
 }
diff --git a/edb/server/connpool/pool2.py b/edb/server/connpool/pool2.py
index b07b03b39346..372d8f7f5a43 100644
--- a/edb/server/connpool/pool2.py
+++ b/edb/server/connpool/pool2.py
@@ -79,6 +79,8 @@ class Pool(typing.Generic[C]):
     _prunes: dict[int, asyncio.Future[None]]
     _conns: dict[int, C]
    _conns_held: dict[C, int]
+    _loop: asyncio.AbstractEventLoop
+    _skip_reads: int
 
     def __init__(self, *, connect: Connector[C],
                  disconnect: Disconnector[C],
@@ -97,9 +99,10 @@ def __init__(self, *, connect: Connector[C],
         self._conns = {}
         self._conns_held = {}
         self._prunes = {}
+        self._skip_reads = 0
 
-        loop = asyncio.get_running_loop()
-        self._task = loop.create_task(self._boot(loop))
+        self._loop = asyncio.get_running_loop()
+        self._task = self._loop.create_task(self._boot(self._loop))
 
         self._failed_connects = 0
         self._failed_disconnects = 0
@@ -134,6 +137,9 @@ async def _boot(self, loop: asyncio.AbstractEventLoop) -> None:
         while len(await reader.read(1)) == 1:
             if not self._pool:
                 break
+            if self._skip_reads > 0:
+                self._skip_reads -= 1
+                continue
             msg = self._pool._read()
             if not msg:
                 break
@@ -148,6 +154,20 @@ async def _boot(self, loop: asyncio.AbstractEventLoop) -> None:
             elif msg[0] == 4:
                 loop.create_task(self._perform_prune(msg[1]))
 
+    def _try_read(self) -> None:
+        while msg := self._pool._try_read():
+            self._skip_reads += 1
+            if msg[0] == 0:
+                self._acquires[msg[1]].set_result(msg[2])
+            elif msg[0] == 1:
+                self._loop.create_task(self._perform_connect(msg[1], msg[2]))
+            elif msg[0] == 2:
+                self._loop.create_task(self._perform_disconnect(msg[1]))
+            elif msg[0] == 3:
+                self._loop.create_task(self._perform_reconnect(msg[1], msg[2]))
+            elif msg[0] == 4:
+                self._loop.create_task(self._perform_prune(msg[1]))
+
     async def _perform_connect(self, id: int, db: str) -> None:
         self._cur_capacity += 1
         self._conns[id] = await self._connect(db)
@@ -181,6 +201,7 @@ async def acquire(self, dbname: str) -> C:
         self._next_conn_id += 1
         self._acquires[id] = asyncio.Future()
         self._pool._acquire(id, dbname)
+        self._try_read()
         conn = await self._acquires[id]
         del self._acquires[id]
         c = self._conns[conn]
@@ -196,6 +217,7 @@ def release(self, dbname: str, conn: C, discard: bool = False) -> None:
             self._pool._discard(id)
         else:
             self._pool._release(id)
+        self._try_read()
 
     async def prune_inactive_connections(self, dbname: str) -> None:
         id = self._next_conn_id