Final pass
mmastrac committed Jul 23, 2024
1 parent 4df9331 commit 138528c
Showing 5 changed files with 217 additions and 204 deletions.
14 changes: 14 additions & 0 deletions edb/server/conn_pool/src/algo.rs
@@ -535,6 +535,20 @@ impl PoolConstraints {
);
}

/// Plan a shutdown.
pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
let mut ops = vec![];
it.with_all(|name, block| {
let idle = block.count(MetricVariant::Idle);
let failed = block.count(MetricVariant::Failed);

for _ in 0..(idle + failed) {
ops.push(RebalanceOp::Close(name.clone()));
}
});
ops
}

/// Plan a rebalance to better match the target quotas of the blocks in the
/// pool.
pub fn plan_rebalance(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
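A minimal sketch of how a caller might consume these ops, assuming the internal `PoolConstraints`, `RebalanceOp`, and `VisitPoolAlgoData` types from algo.rs; `drain_for_shutdown` and `close_one` are hypothetical names, not part of this commit:

/// Hypothetical shutdown driver: apply every op from `plan_shutdown`.
fn drain_for_shutdown(constraints: &PoolConstraints, data: &impl VisitPoolAlgoData) {
    for op in constraints.plan_shutdown(data) {
        match op {
            // One Close op is emitted per idle or failed connection,
            // so applying all of them drains every block.
            RebalanceOp::Close(name) => close_one(&name),
            // plan_shutdown never emits the other RebalanceOp variants;
            // those come from plan_rebalance.
            _ => {}
        }
    }
}

/// Hypothetical helper: close one connection in the named block.
fn close_one(name: &str) { /* connector-specific close */ }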
147 changes: 43 additions & 104 deletions edb/server/conn_pool/src/block.rs
@@ -141,38 +141,12 @@ impl<C: Connector, D: Default> Block<C, D> {
pub fn check_consistency(&self) {
if cfg!(debug_assertions) {
// These should never be non-zero during the consistency check
assert_eq!(
self.state.metrics.count(MetricVariant::Failed),
0,
"Should not have any failed connections"
);
assert_eq!(
self.state.metrics.count(MetricVariant::Closed),
0,
"Should not have any closed connections"
);

// We should never have more pending errors than the number of
// waiters - the number of "live" connections that can eventually
// provide a connection (or potentially another error)
let pending_errors = self.conns.pending_error_len();
let waiting = self.state.metrics.count(MetricVariant::Waiting);
let live = self.state.metrics.sum_all(MetricVariant::live());
if self.conns.is_empty() && waiting == 0 {
assert_eq!(
pending_errors, 0,
"Pending errors must be empty if there are no connections and no errors"
)
}
assert!(
// Note the inequality is changed to avoid usize underflow
pending_errors == 0 || (pending_errors + live <= waiting),
"Pending errors must be less than or equal to number of waiters - live: {} > {} - {} ({:?})",
pending_errors,
waiting,
live,
self.state.metrics
);
assert_eq!(
self.len(),
self.conns.len(),
@@ -214,8 +188,7 @@ impl<C: Connector, D: Default> Block<C, D> {
if res.is_ok() {
Ok(self.conn(conn))
} else {
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
let res = self.conns.remove(conn, &self.state.metrics);
Err(res.err().unwrap())
}
}
@@ -227,6 +200,8 @@ impl<C: Connector, D: Default> Block<C, D> {
self: Rc<Self>,
connector: &C,
) -> impl Future<Output = ConnResult<ConnHandle<C>, C::Error>> {
use futures::future::Either;
use std::future::ready;
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(res.map(|c| self.conn(c))));
}
@@ -235,13 +210,10 @@

/// Awaits a connection from this block.
fn queue(self: Rc<Self>) -> impl Future<Output = ConnResult<ConnHandle<C>, C::Error>> {
// If someone else is waiting, we have to queue, even if there's a connection
if self.state.waiters.is_empty() {
consistency_check!(self);
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(res.map(|c| self.conn(c))));
}
}
// Note that we cannot skip the waiting queue because this makes it
// difficult to test and manage metrics -- if we skipped the queue then
// there would never be an apparent waiter.

// Update the metrics now before we actually queue
self.state.waiters.lock();
self.state.metrics.insert(MetricVariant::Waiting);
@@ -253,24 +225,17 @@
.metrics
.remove_time(MetricVariant::Waiting, now.elapsed());
});
Either::Right(async move {
async move {
loop {
consistency_check!(self);
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
// Important note: we _must_ pop an error here because every
// error must have a corresponding waiter, and if there is
// one, the waiter that we're satisfying was supporting the
// existence of an error. This does not happen in the "fast
// case" if we can acquire a connection without waiting.
self.conns.pop_error();

drop(guard);
return res.map(|c| self.conn(c));
}
trace!("Queueing for a connection");
self.state.waiters.queue().await;
}
})
}
}

/// Creates a connection from this block.
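The queue above now always registers a waiter, which keeps the Waiting metric honest. A sketch of the observable effect, assuming the internal APIs declared in this file and a `block: Rc<Block<C, D>>` in scope:

// Sketch only: queue() records the Waiting metric synchronously,
// before the returned future is first polled.
let fut = block.clone().queue();
assert_eq!(block.state.metrics.count(MetricVariant::Waiting), 1);
// On first poll the waiter is satisfied from the idle MRU connection
// (or parks until one is returned), and the guard clears the metric.
let conn = fut.await?;
assert_eq!(block.state.metrics.count(MetricVariant::Waiting), 0);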
@@ -300,12 +265,18 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(self);
let conn = self
.conns
.try_get_idle_lru()
.try_get_idle(&self.state.metrics, false)
.expect("Could not acquire a connection");
let conn = match conn {
Ok(conn) => conn,
Err(err) => {
return Either::Left(ready(Err(err)));
}
};
conn.close(connector, &self.state.metrics);
conn
};
self.poll_to_close(conn)
Either::Right(self.poll_to_close(conn))
}

/// Steals a connection from one block to another.
@@ -324,15 +295,10 @@ impl<C: Connector, D: Default> Block<C, D> {

let conn = from
.conns
.try_take_idle_lru()
.try_take_idle_lru(&from.state.metrics)
.expect("Could not acquire a connection");
let conn = Conn::new(connector.reconnect(conn, &to.db_name), &to.state.metrics);
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
&to.state.metrics,
&to.db_name,
);
conn
};
to.poll_to_idle(conn)
@@ -351,14 +317,13 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(to);

let conn = conn.into_inner();
from.conns.remove(&conn);
let conn = from
.conns
.remove(conn, &from.state.metrics)
.unwrap()
.unwrap();
let conn = Conn::new(connector.reconnect(conn, &to.db_name), &to.state.metrics);
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
&to.state.metrics,
&to.db_name,
);
conn
};
to.poll_to_idle(conn)
@@ -400,46 +365,19 @@ impl<C: Connector, D: Default> Block<C, D> {
// as there will never be a waiter who will pick it up
let res =
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await;
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
res
let res2 = self.conns.remove(conn, &self.state.metrics);
debug_assert_eq!(res.is_ok(), res2.is_ok());
let conn = res2?;
debug_assert!(conn.is_none());
Ok(())
}

/// Once a connection has completed the `Connecting` phase, we will route
/// the results to a waiter, assuming there is one.
async fn poll_to_idle(self: Rc<Self>, conn: Conn<C>) -> ConnResult<(), C::Error> {
let res = poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await;
{
let waiters = self.state.waiters.len();
let live = self.state.metrics.sum_all(MetricVariant::live());

if let Err(err) = &res {
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);

// If we have an error, we need to decide whether to keep it or not.
// The formula we use is `pending_errors < waiters - live`. We will
// only wake waiters if there are no live connections left.

let mut pending_errors = self.conns.pending_error_len();

if !self.state.waiters.is_empty() {
if pending_errors + live < waiters {
self.conns.push_error(&err);
pending_errors += 1
}
}

if live == 0 {
for _ in 0..waiters.min(pending_errors) {
self.state.waiters.trigger();
}
}
} else {
// No error, wake a waiter (if we have one). Trim the pending
// errors to the number of waiters not covered by live connections.
self.conns.trim_errors_to(waiters.saturating_sub(live));
self.state.waiters.trigger();
}
res
}
self.state.waiters.trigger();
res.map_err(|_| ConnError::TaskFailed)
}
}
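A sketch of the simplified failure path above, assuming a caller that drives `poll_to_idle` as a task; the match arms are illustrative, and only `ConnError::TaskFailed` is actually produced on error:

// Sketch only: drive a Connecting conn toward Idle.
match block.clone().poll_to_idle(conn).await {
    // A waiter (if any) was woken and can pick up the Idle conn.
    Ok(()) => {}
    Err(ConnError::TaskFailed) => {
        // The underlying error stays with the connection, which remains
        // tracked as MetricVariant::Failed until something reaps it
        // (for example the Close ops produced by plan_shutdown).
    }
    Err(other) => return Err(other), // not produced by this path
}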

@@ -601,7 +539,8 @@ impl<C: Connector, D: Default> Blocks<C, D> {
assert_eq!(
total,
self.metrics.total(),
"Blocks failed consistency check. Total connection count was wrong."
"Blocks failed consistency check. Total connection count was wrong. {:?}",
self.metrics
);
}
}
@@ -978,9 +917,9 @@ mod tests {
.await
.expect_err("Expected to fail");
assert_block!(blocks "db" has 1 Idle);
assert_block!(blocks "db2" is empty);
assert_block!(blocks "db2" has 1 Failed);
let queue = blocks.queue("db2");
assert_block!(blocks "db2" has 1 Waiting);
assert_block!(blocks "db2" has 1 Waiting, 1 Failed);
connector.fail_next_connect();
blocks
.task_steal(&connector, "db2", "db")
@@ -1009,7 +948,7 @@ mod tests {
assert_block!(blocks "db" has 2 Idle);
assert_block!(blocks "db2" has 1 Idle);
// Should not activate a connection to move it
assert_block!(blocks "db" has max 1 Active, 3 Idle);
assert_block!(blocks "db" has max 1 Active, 3 Idle, 1 Waiting);
assert_block!(blocks "db2" has max 0 Active, 1 Connecting, 1 Idle);
Ok(())
}
@@ -1037,7 +976,7 @@ mod tests {
queue.await?;
assert_eq!(2, blocks.block_count());
assert_block!(blocks "db" has 1 Idle);
assert_block!(blocks "db2" has 1 Idle);
assert_block!(blocks "db2" has 1 Idle, 1 Failed);
assert_block!(blocks "db" has max 2 Active, 1 Idle);
assert_block!(blocks "db2" has max 1 Active, 1 Connecting, 1 Idle, 1 Failed, 1 Waiting);
Ok(())
@@ -1081,7 +1020,7 @@ mod tests {
}

#[test(tokio::test)]
async fn test_reopen_error() -> Result<()> {
async fn test_reopen_fails() -> Result<()> {
let connector = BasicConnector::no_delay();
let blocks = Blocks::<_, ()>::default();
assert_eq!(0, blocks.block_count());
@@ -1114,7 +1053,7 @@ mod tests {
}

#[test(tokio::test)]
async fn test_reopen_fails() -> Result<()> {
async fn test_reopen_fails_2() -> Result<()> {
let connector = BasicConnector::no_delay();
let blocks = Blocks::<_, ()>::default();
assert_eq!(0, blocks.block_count());
@@ -1125,7 +1064,7 @@ mod tests {
.task_reopen(&connector, conn)
.await
.expect_err("Expected a failure");
assert_block!(blocks "db" is empty);
assert_block!(blocks "db" has 1 Failed);
assert_block!(blocks "db" has all time 2 Connecting, 1 Disconnecting, 1 Failed, 1 Active, 1 Closed);
Ok(())
}