From 01411329e41f1f4986adecc26b598a7af8ac8321 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Tue, 23 Jul 2024 12:46:22 -0600 Subject: [PATCH] Cleanup --- edb/server/conn_pool/src/algo.rs | 17 +++++++++--- edb/server/conn_pool/src/conn.rs | 46 +++++++++++++++----------------- edb/server/conn_pool/src/pool.rs | 16 ++--------- 3 files changed, 37 insertions(+), 42 deletions(-) diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs index ba99311dc59..213a060ff69 100644 --- a/edb/server/conn_pool/src/algo.rs +++ b/edb/server/conn_pool/src/algo.rs @@ -199,6 +199,13 @@ pub enum RebalanceOp { Close(Name), } +/// Determines the shutdown plan based on the current pool state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ShutdownOp { + /// Garbage collect a block. + Close(Name), +} + /// Determines the acquire plan based on the current pool state. #[derive(Debug, Clone, PartialEq, Eq)] pub enum AcquireOp { @@ -288,7 +295,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { /// and there are waiting elements; otherwise, returns `None`. fn hunger_score(&self, will_release: bool) -> Option { let waiting = self.count(MetricVariant::Waiting); - let connecting = self.count(MetricVariant::Connecting); + let connecting = + self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting); let waiters = waiting.saturating_sub(connecting); let current = self.total() - if will_release { 1 } else { 0 }; let target = self.target(); @@ -331,7 +339,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { let idle = self.count(MetricVariant::Idle) + if will_release { 1 } else { 0 }; let current = self.total(); let target = self.target(); - let connecting = self.count(MetricVariant::Connecting); + let connecting = + self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting); let waiting = self.count(MetricVariant::Waiting); let waiters = waiting.saturating_sub(connecting); let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get()); @@ -539,14 +548,14 @@ impl PoolConstraints { } /// Plan a shutdown. - pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec { + pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec { let mut ops = vec![]; it.with_all(|name, block| { let idle = block.count(MetricVariant::Idle); let failed = block.count(MetricVariant::Failed); for _ in 0..(idle + failed) { - ops.push(RebalanceOp::Close(name.clone())); + ops.push(ShutdownOp::Close(name.clone())); } }); ops diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index c115a1cbd4a..66b258639e1 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -190,31 +190,29 @@ impl Conn { ); let res = match &mut *lock { - ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => match ready!(f.poll_unpin(cx)) { - Ok(c) => { - let elapsed = t.elapsed(); - debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle); - let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); - metrics.transition(from, to, elapsed); - if to == MetricVariant::Active { - *lock = ConnInner::Active(Instant::now(), c); - } else { - *lock = ConnInner::Idle(Instant::now(), c); + ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => { + match ready!(f.poll_unpin(cx)) { + Ok(c) => { + let elapsed = t.elapsed(); + debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle); + let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); + metrics.transition(from, to, elapsed); + if to == MetricVariant::Active { + *lock = ConnInner::Active(Instant::now(), c); + } else { + *lock = ConnInner::Idle(Instant::now(), c); + } + Ok(()) + } + Err(err) => { + let elapsed = t.elapsed(); + let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); + metrics.transition(from, MetricVariant::Failed, elapsed); + *lock = ConnInner::Failed(err); + Err(ConnError::TaskFailed) } - Ok(()) - } - Err(err) => { - let elapsed = t.elapsed(); - let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); - metrics.transition( - from, - MetricVariant::Failed, - elapsed, - ); - *lock = ConnInner::Failed(err); - Err(ConnError::TaskFailed) } - }, + } ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) { Ok(_) => { debug_assert_eq!(to, MetricVariant::Closed); @@ -267,7 +265,7 @@ impl Conn { ConnInner::Closed | ConnInner::Failed(..) => { metrics.remove(self.variant()); } - _ => unreachable!() + _ => unreachable!(), } } } diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index d9d5be92584..2af91a3c471 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -1,7 +1,7 @@ use crate::{ algo::{ AcquireOp, PoolAlgoTargetData, PoolAlgorithmDataBlock, PoolAlgorithmDataMetrics, - PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, VisitPoolAlgoData, + PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, ShutdownOp, VisitPoolAlgoData, }, block::{Blocks, Name}, conn::{ConnError, ConnHandle, ConnResult, Connector}, @@ -158,19 +158,7 @@ impl Pool { for op in self.config.constraints.plan_shutdown(&self.blocks) { trace!("Shutdown: {op:?}"); match op { - RebalanceOp::Transfer { from, to } => { - tokio::task::spawn_local(self.blocks.task_steal( - &self.connector, - &to, - &from, - )); - } - RebalanceOp::Create(name) => { - tokio::task::spawn_local( - self.blocks.task_create_one(&self.connector, &name), - ); - } - RebalanceOp::Close(name) => { + ShutdownOp::Close(name) => { tokio::task::spawn_local( self.blocks.task_close_one(&self.connector, &name), );