Skip to content

Commit

Permalink
Implement an explicit reconnection phase (#7584)
Browse files Browse the repository at this point in the history
We want to track reconnections for QoS and algorithm purposes -- we want to account for the time spent disconnecting and reconnecting when calculating the expense of switching connections.
  • Loading branch information
mmastrac authored Jul 23, 2024
1 parent 71ca4c9 commit 26ec907
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 42 deletions.
7 changes: 5 additions & 2 deletions edb/server/conn_pool/src/algo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
let waiting = self.count(MetricVariant::Waiting);
let waiters = waiting.saturating_sub(connecting);
let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get());
let connecting_ms = self.avg_ms(MetricVariant::Connecting).max(MIN_TIME.get());
let reconnecting_ms = self
.avg_ms(MetricVariant::Reconnecting)
.max(self.avg_ms(MetricVariant::Connecting) + self.avg_ms(MetricVariant::Disconnecting))
.max(MIN_TIME.get());
let youngest_ms = self.youngest_ms().max(MIN_TIME.get());

// If we have no idle connections, or we don't have enough connections we're not overfull.
Expand All @@ -348,7 +351,7 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
// `OVERFULL_CHANGE_WEIGHT_DIVIDEND` by that to give an overfullness
// "negative" penalty to blocks that have newly acquired a connection.
let youngest_score =
((OVERFULL_CHANGE_WEIGHT_DIVIDEND.get() * connecting_ms) / youngest_ms) as isize;
((OVERFULL_CHANGE_WEIGHT_DIVIDEND.get() * reconnecting_ms) / youngest_ms) as isize;
// The number of waiters and the amount of time we expect to spend
// active on these waiters also acts as a "negative" penalty.
let waiter_score = (waiters * OVERFULL_WAITER_WEIGHT.get()
Expand Down
56 changes: 34 additions & 22 deletions edb/server/conn_pool/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ impl<C: Connector> Conn<C> {
ConnInner::Idle(_t, conn, ..) | ConnInner::Active(_t, conn, ..) => {
from.inc_all_time(MetricVariant::Disconnecting);
from.inc_all_time(MetricVariant::Closed);
to.insert(MetricVariant::Connecting);
to.insert(MetricVariant::Reconnecting);
let f = connector.reconnect(conn, db).boxed_local();
ConnInner::Connecting(Instant::now(), f)
ConnInner::Reconnecting(Instant::now(), f)
}
_ => unreachable!(),
});
Expand Down Expand Up @@ -168,30 +168,38 @@ impl<C: Connector> Conn<C> {
to: MetricVariant,
) -> Poll<ConnResult<()>> {
let mut lock = self.inner.borrow_mut();
debug_assert!(
to == MetricVariant::Active || to == MetricVariant::Idle || to == MetricVariant::Closed
);

let res = match &mut *lock {
ConnInner::Idle(..) => Ok(()),
ConnInner::Connecting(t, f) => match ready!(f.poll_unpin(cx)) {
Ok(c) => {
debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle);
metrics.transition(MetricVariant::Connecting, to, t.elapsed());
if to == MetricVariant::Active {
*lock = ConnInner::Active(Instant::now(), c);
} else {
*lock = ConnInner::Idle(Instant::now(), c);
ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => {
match ready!(f.poll_unpin(cx)) {
Ok(c) => {
let elapsed = t.elapsed();
let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into();
metrics.transition(from, to, elapsed);
if to == MetricVariant::Active {
*lock = ConnInner::Active(Instant::now(), c);
} else if to == MetricVariant::Idle {
*lock = ConnInner::Idle(Instant::now(), c);
} else {
unreachable!()
}
Ok(())
}
Err(err) => {
metrics.transition(
MetricVariant::Connecting,
MetricVariant::Failed,
t.elapsed(),
);
*lock = ConnInner::Failed;
Err(err)
}
Ok(())
}
Err(err) => {
metrics.transition(
MetricVariant::Connecting,
MetricVariant::Failed,
t.elapsed(),
);
*lock = ConnInner::Failed;
Err(err)
}
},
}
ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) {
Ok(_) => {
debug_assert_eq!(to, MetricVariant::Closed);
Expand Down Expand Up @@ -239,7 +247,8 @@ impl<C: Connector> Conn<C> {
ConnInner::Active(t, _)
| ConnInner::Idle(t, _)
| ConnInner::Connecting(t, _)
| ConnInner::Disconnecting(t, _) => metrics.remove_time(self.variant(), t.elapsed()),
| ConnInner::Disconnecting(t, _)
| ConnInner::Reconnecting(t, _) => metrics.remove_time(self.variant(), t.elapsed()),
other => metrics.remove(other.into()),
}
}
Expand All @@ -257,6 +266,8 @@ enum ConnInner<C: Connector> {
Connecting(Instant, Pin<Box<dyn Future<Output = ConnResult<C::Conn>>>>),
/// Disconnecting connections hold a spot in the pool as they count towards quotas
Disconnecting(Instant, Pin<Box<dyn Future<Output = ConnResult<()>>>>),
/// Reconnecting connections hold a spot in the pool as they count towards quotas
Reconnecting(Instant, Pin<Box<dyn Future<Output = ConnResult<C::Conn>>>>),
/// The connection is alive, but it is not being held.
Idle(Instant, C::Conn),
/// The connection is alive, and is being held.
Expand All @@ -275,6 +286,7 @@ impl<C: Connector> From<&ConnInner<C>> for MetricVariant {
match val {
ConnInner::Connecting(..) => MetricVariant::Connecting,
ConnInner::Disconnecting(..) => MetricVariant::Disconnecting,
ConnInner::Reconnecting(..) => MetricVariant::Reconnecting,
ConnInner::Idle(..) => MetricVariant::Idle,
ConnInner::Active(..) => MetricVariant::Active,
ConnInner::Failed => MetricVariant::Failed,
Expand Down
1 change: 1 addition & 0 deletions edb/server/conn_pool/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::block::Name;
pub enum MetricVariant {
Connecting,
Disconnecting,
Reconnecting,
Idle,
Active,
Failed,
Expand Down
4 changes: 2 additions & 2 deletions edb/server/conn_pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ impl<C: Connector> Pool<C> {
if cfg!(debug_assertions) {
let all_time = &pool.metrics().all_time;
assert_eq!(
all_time[MetricVariant::Connecting],
all_time[MetricVariant::Connecting] + all_time[MetricVariant::Reconnecting],
all_time[MetricVariant::Disconnecting],
"Connecting != Disconnecting"
"Connecting + Reconnecting != Disconnecting"
);
assert_eq!(
all_time[MetricVariant::Disconnecting],
Expand Down
44 changes: 28 additions & 16 deletions edb/server/conn_pool/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Spec {
pub struct Scored {
pub description: String,
#[debug(skip)]
pub detailed_calculation: Box<dyn Fn(usize) -> String>,
pub detailed_calculation: Box<dyn Fn(usize) -> String + Send + Sync>,
pub raw_value: f64,
}

Expand Down Expand Up @@ -219,6 +219,7 @@ impl std::fmt::Debug for SuiteQoS {
}
s.field("qos", &self.qos());
s.field("qos_rms", &self.qos_rms_error());
s.field("qos_min", &self.qos_min());
s.finish()
}
}
Expand Down Expand Up @@ -252,6 +253,20 @@ impl SuiteQoS {
total
}
}

/// Return the minimum QoS across all entries, capped at 100. A minimum that
/// is zero, negative, or otherwise not a normal float is reported as 0.
pub fn qos_min(&self) -> f64 {
// Seed the fold with the best possible score so that an empty suite
// reports 100 and no entry can push the result above 100.
let min = self.values().map(|qos| qos.qos).fold(100.0_f64, f64::min);
if !min.is_normal() || min < 0.0 {
0.0
} else {
min
}
}
}

pub trait ScoringMethod {
Expand All @@ -264,18 +279,22 @@ pub struct Score {
pub v60: f64,
pub v0: f64,
pub weight: f64,
pub method: Box<dyn ScoringMethod>,
pub method: Box<dyn ScoringMethod + Send + Sync + 'static>,
}

impl Score {
pub fn new(weight: f64, scores: [f64; 4], method: impl ScoringMethod + 'static) -> Self {
pub fn new(
weight: f64,
scores: [f64; 4],
method: impl ScoringMethod + Send + Sync + 'static,
) -> Self {
Self {
weight,
v0: scores[0],
v60: scores[1],
v90: scores[2],
v100: scores[3],
method: method.into(),
method: Box::new(method),
}
}

Expand Down Expand Up @@ -369,20 +388,13 @@ impl<T: ScoringMethod + 'static> From<T> for Box<dyn ScoringMethod> {
pub struct ConnectionOverhead {}

impl ScoringMethod for ConnectionOverhead {
fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> Scored {
let disconnects = metrics.all_time[MetricVariant::Disconnecting];
// Calculate the GC
let max = config.constraints.max;
let total = metrics.pool.total;
let gc = max - total;
let disconnects_adj = disconnects.saturating_sub(gc);
fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, _config: &PoolConfig) -> Scored {
let reconnects = metrics.all_time[MetricVariant::Reconnecting];
let count = latencies.len();
let raw_value = disconnects_adj as f64 / count as f64;
let raw_value = reconnects as f64 / count as f64;
Scored {
description: "Num of disconnects/query".to_owned(),
detailed_calculation: Box::new(move |_precision| {
format!("({disconnects}-({max}-{total}))/{count}")
}),
description: "Num of re-connects/query".to_owned(),
detailed_calculation: Box::new(move |_precision| format!("{reconnects}/{count}")),
raw_value,
}
}
Expand Down

0 comments on commit 26ec907

Please sign in to comment.