From 65298e89f7712c3bacb569f6c0d56b35129323ab Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 19 Jul 2024 12:59:47 -0600 Subject: [PATCH] Implement an explicit reconnection phase --- edb/server/conn_pool/src/algo.rs | 7 ++-- edb/server/conn_pool/src/conn.rs | 56 +++++++++++++++++------------ edb/server/conn_pool/src/metrics.rs | 1 + edb/server/conn_pool/src/pool.rs | 4 +-- edb/server/conn_pool/src/test.rs | 44 ++++++++++++++--------- 5 files changed, 70 insertions(+), 42 deletions(-) diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs index 61be7e07331..a06a5d31022 100644 --- a/edb/server/conn_pool/src/algo.rs +++ b/edb/server/conn_pool/src/algo.rs @@ -335,7 +335,10 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { let waiting = self.count(MetricVariant::Waiting); let waiters = waiting.saturating_sub(connecting); let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get()); - let connecting_ms = self.avg_ms(MetricVariant::Connecting).max(MIN_TIME.get()); + let reconnecting_ms = self + .avg_ms(MetricVariant::Reconnecting) + .max(self.avg_ms(MetricVariant::Connecting) + self.avg_ms(MetricVariant::Disconnecting)) + .max(MIN_TIME.get()); let youngest_ms = self.youngest_ms().max(MIN_TIME.get()); // If we have no idle connections, or we don't have enough connections we're not overfull. @@ -348,7 +351,7 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { // `OVERFULL_CHANGE_WEIGHT_DIVIDEND` by that to give an overfullness // "negative" penalty to blocks that have newly acquired a connection. let youngest_score = - ((OVERFULL_CHANGE_WEIGHT_DIVIDEND.get() * connecting_ms) / youngest_ms) as isize; + ((OVERFULL_CHANGE_WEIGHT_DIVIDEND.get() * reconnecting_ms) / youngest_ms) as isize; // The number of waiters and the amount of time we expect to spend // active on these waiters also acts as a "negative" penalty. let waiter_score = (waiters * OVERFULL_WAITER_WEIGHT.get() diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index b2459581ec5..3834553a3bf 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -135,9 +135,9 @@ impl Conn { ConnInner::Idle(_t, conn, ..) | ConnInner::Active(_t, conn, ..) => { from.inc_all_time(MetricVariant::Disconnecting); from.inc_all_time(MetricVariant::Closed); - to.insert(MetricVariant::Connecting); + to.insert(MetricVariant::Reconnecting); let f = connector.reconnect(conn, db).boxed_local(); - ConnInner::Connecting(Instant::now(), f) + ConnInner::Reconnecting(Instant::now(), f) } _ => unreachable!(), }); @@ -167,30 +167,38 @@ impl Conn { to: MetricVariant, ) -> Poll> { let mut lock = self.inner.borrow_mut(); + debug_assert!( + to == MetricVariant::Active || to == MetricVariant::Idle || to == MetricVariant::Closed + ); let res = match &mut *lock { ConnInner::Idle(..) => Ok(()), - ConnInner::Connecting(t, f) => match ready!(f.poll_unpin(cx)) { - Ok(c) => { - debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle); - metrics.transition(MetricVariant::Connecting, to, t.elapsed()); - if to == MetricVariant::Active { - *lock = ConnInner::Active(Instant::now(), c); - } else { - *lock = ConnInner::Idle(Instant::now(), c); + ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => { + match ready!(f.poll_unpin(cx)) { + Ok(c) => { + let elapsed = t.elapsed(); + let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); + metrics.transition(from, to, elapsed); + if to == MetricVariant::Active { + *lock = ConnInner::Active(Instant::now(), c); + } else if to == MetricVariant::Idle { + *lock = ConnInner::Idle(Instant::now(), c); + } else { + unreachable!() + } + Ok(()) + } + Err(err) => { + metrics.transition( + MetricVariant::Connecting, + MetricVariant::Failed, + t.elapsed(), + ); + *lock = ConnInner::Failed; + Err(err) } - Ok(()) - } - Err(err) => { - metrics.transition( - MetricVariant::Connecting, - MetricVariant::Failed, - t.elapsed(), - ); - *lock = ConnInner::Failed; - Err(err) } - }, + } ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) { Ok(_) => { debug_assert_eq!(to, MetricVariant::Closed); @@ -238,7 +246,8 @@ impl Conn { ConnInner::Active(t, _) | ConnInner::Idle(t, _) | ConnInner::Connecting(t, _) - | ConnInner::Disconnecting(t, _) => metrics.remove_time(self.variant(), t.elapsed()), + | ConnInner::Disconnecting(t, _) + | ConnInner::Reconnecting(t, _) => metrics.remove_time(self.variant(), t.elapsed()), other => metrics.remove(other.into()), } } @@ -256,6 +265,8 @@ enum ConnInner { Connecting(Instant, Pin>>>), /// Disconnecting connections hold a spot in the pool as they count towards quotas Disconnecting(Instant, Pin>>>), + /// Reconnecting hold a spot in the pool as they count towards quotas + Reconnecting(Instant, Pin>>>), /// The connection is alive, but it is not being held. Idle(Instant, C::Conn), /// The connection is alive, and is being held. @@ -274,6 +285,7 @@ impl From<&ConnInner> for MetricVariant { match val { ConnInner::Connecting(..) => MetricVariant::Connecting, ConnInner::Disconnecting(..) => MetricVariant::Disconnecting, + ConnInner::Reconnecting(..) => MetricVariant::Reconnecting, ConnInner::Idle(..) => MetricVariant::Idle, ConnInner::Active(..) => MetricVariant::Active, ConnInner::Failed => MetricVariant::Failed, diff --git a/edb/server/conn_pool/src/metrics.rs b/edb/server/conn_pool/src/metrics.rs index bd82a33f8d2..1d0c8bfff5e 100644 --- a/edb/server/conn_pool/src/metrics.rs +++ b/edb/server/conn_pool/src/metrics.rs @@ -12,6 +12,7 @@ use crate::block::Name; pub enum MetricVariant { Connecting, Disconnecting, + Reconnecting, Idle, Active, Failed, diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index 63729898246..4472c0940a0 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -298,9 +298,9 @@ impl Pool { if cfg!(debug_assertions) { let all_time = &pool.metrics().all_time; assert_eq!( - all_time[MetricVariant::Connecting], + all_time[MetricVariant::Connecting] + all_time[MetricVariant::Reconnecting], all_time[MetricVariant::Disconnecting], - "Connecting != Disconnecting" + "Connecting + Reconnecting != Disconnecting" ); assert_eq!( all_time[MetricVariant::Disconnecting], diff --git a/edb/server/conn_pool/src/test.rs b/edb/server/conn_pool/src/test.rs index 1e8069e41e5..cbfe46d0159 100644 --- a/edb/server/conn_pool/src/test.rs +++ b/edb/server/conn_pool/src/test.rs @@ -191,7 +191,7 @@ impl Spec { pub struct Scored { pub description: String, #[debug(skip)] - pub detailed_calculation: Box String>, + pub detailed_calculation: Box String + Send + Sync>, pub raw_value: f64, } @@ -219,6 +219,7 @@ impl std::fmt::Debug for SuiteQoS { } s.field("qos", &self.qos()); s.field("qos_rms", &self.qos_rms_error()); + s.field("qos_min", &self.qos_min()); s.finish() } } @@ -252,6 +253,20 @@ impl SuiteQoS { total } } + + /// Return the root-mean-square error QoS. The error between the QoS and 100 + /// is squared, averaged, and we subtract that from 100 for a final score. + pub fn qos_min(&self) -> f64 { + let mut min: f64 = 100.0; + for qos in self.values() { + min = min.min(qos.qos); + } + if !min.is_normal() || min < 0.0 { + 0.0 + } else { + min + } + } } pub trait ScoringMethod { @@ -264,18 +279,22 @@ pub struct Score { pub v60: f64, pub v0: f64, pub weight: f64, - pub method: Box, + pub method: Box, } impl Score { - pub fn new(weight: f64, scores: [f64; 4], method: impl ScoringMethod + 'static) -> Self { + pub fn new( + weight: f64, + scores: [f64; 4], + method: impl ScoringMethod + Send + Sync + 'static, + ) -> Self { Self { weight, v0: scores[0], v60: scores[1], v90: scores[2], v100: scores[3], - method: method.into(), + method: Box::new(method), } } @@ -369,20 +388,13 @@ impl From for Box { pub struct ConnectionOverhead {} impl ScoringMethod for ConnectionOverhead { - fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> Scored { - let disconnects = metrics.all_time[MetricVariant::Disconnecting]; - // Calculate the GC - let max = config.constraints.max; - let total = metrics.pool.total; - let gc = max - total; - let disconnects_adj = disconnects.saturating_sub(gc); + fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, _config: &PoolConfig) -> Scored { + let reconnects = metrics.all_time[MetricVariant::Reconnecting]; let count = latencies.len(); - let raw_value = disconnects_adj as f64 / count as f64; + let raw_value = reconnects as f64 / count as f64; Scored { - description: "Num of disconnects/query".to_owned(), - detailed_calculation: Box::new(move |_precision| { - format!("({disconnects}-({max}-{total}))/{count}") - }), + description: "Num of re-connects/query".to_owned(), + detailed_calculation: Box::new(move |_precision| format!("{reconnects}/{count}")), raw_value, } }