Skip to content

Commit

Permalink
Finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Jul 22, 2024
1 parent 13f2e9f commit 639971a
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 96 deletions.
18 changes: 18 additions & 0 deletions edb/server/conn_pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,21 @@ more connections are waiting on this block, we return the connection to the
block to be re-used immediately. If no connections are waiting but the block is
hungry, we return it. If the block is satisfied or overfull and we have hungry
blocks waiting, we transfer it to a hungry block that has waiters.

## Error Handling

The pool will attempt to provide a connection where possible, but connection
operations may not always be reliable. If the pool detects that an acquire
operation has no other potential source for a connection, the error from a
failed connection attempt is routed to that acquire operation. Potential
sources for a connection include a currently-connecting connection, a
reconnecting connection, a connection actively held by another acquirer, or a
connection that is sitting idle.

If an acquire operation is queued for a connection, and a block's last
connection becomes invalid due to an error, and the operation is next in line,
that operation will fail, returning the error as the failure reason. The
remainder of the connections that are queued will continue to await a connection
and, if the error is persistent, will eventually return errors as well.

The pool does not currently retry, and retry logic should be included in the
connect operation.
214 changes: 141 additions & 73 deletions edb/server/conn_pool/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl std::borrow::Borrow<str> for Name {
pub struct Block<C: Connector, D: Default = ()> {
pub db_name: Name,
conns: Conns<C>,
pending_errors: RefCell<Vec<ConnError<C::Error>>>,
state: Rc<ConnState>,
/// Associated data for this block useful for statistics, quotas or other
/// information. This is provided by the algorithm in this crate.
Expand All @@ -123,7 +122,6 @@ impl<C: Connector, D: Default> Block<C, D> {
conns: Conns::default(),
state,
data: Default::default(),
pending_errors: Default::default(),
}
}

Expand Down Expand Up @@ -154,19 +152,26 @@ impl<C: Connector, D: Default> Block<C, D> {
"Should not have any closed connections"
);

let pending_errors = self.pending_errors.borrow().len();
// We should never have more pending errors than the number of
// waiters - the number of "live" connections that can eventually
// provide a connection (or potentially another error)
let pending_errors = self.conns.pending_error_len();
let waiting = self.state.metrics.count(MetricVariant::Waiting);
let live = self.state.metrics.sum_all(MetricVariant::live());
if self.conns.is_empty() && waiting == 0 {
assert_eq!(
pending_errors, 0,
"Pending errors must be empty if there are no connections and no errors"
)
}
assert!(
pending_errors == 0 || !self.state.metrics.has(MetricVariant::live()),
"Either pending_errors must be empty or conns must be empty! {} {:?}",
pending_errors,
self.state.metrics.counts()
);
assert!(
pending_errors <= waiting,
"Pending errors must be less than or equal to number of waiters: {} > {}",
// Note the inequality is changed to avoid usize underflow
pending_errors == 0 || (pending_errors + live <= waiting),
"Pending errors must be less than or equal to number of waiters - live: {} > {} - {} ({:?})",
pending_errors,
waiting
waiting,
live,
self.state.metrics
);
assert_eq!(
self.len(),
Expand Down Expand Up @@ -200,7 +205,6 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.conns.insert(conn.clone());
self.pending_errors.borrow_mut().clear();
conn
};
async move {
Expand All @@ -223,8 +227,8 @@ impl<C: Connector, D: Default> Block<C, D> {
self: Rc<Self>,
connector: &C,
) -> impl Future<Output = ConnResult<ConnHandle<C>, C::Error>> {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(Ok(self.conn(conn))));
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(res.map(|c| self.conn(c))));
}
Either::Right(self.create(connector))
}
Expand All @@ -233,13 +237,9 @@ impl<C: Connector, D: Default> Block<C, D> {
fn queue(self: Rc<Self>) -> impl Future<Output = ConnResult<ConnHandle<C>, C::Error>> {
// If someone else is waiting, we have to queue, even if there's a connection
if self.state.waiters.is_empty() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
return Either::Left(ready(Ok(self.conn(conn))));
}
if let Some(err) = self.pending_errors.borrow_mut().pop() {
trace!("Got an error");
return Either::Left(ready(Err(err)));
consistency_check!(self);
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(res.map(|c| self.conn(c))));
}
}
// Update the metrics now before we actually queue
Expand All @@ -254,19 +254,18 @@ impl<C: Connector, D: Default> Block<C, D> {
.remove_time(MetricVariant::Waiting, now.elapsed());
});
Either::Right(async move {
consistency_check!(self);
loop {
if self.conns.is_empty() {
if let Some(err) = self.pending_errors.borrow_mut().pop() {
trace!("Got an error");
drop(guard);
return Err(err);
}
}
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
consistency_check!(self);
if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
// Important note: we _must_ pop an error here because every
// error must have a corresponding waiter, and if there is
// one, the waiter that we're satisfying was supporting the
// existence of an error. This does not happen in the "fast
// case" if we can acquire a connection without waiting.
self.conns.pop_error();

drop(guard);
return Ok(self.conn(conn));
return res.map(|c| self.conn(c));
}
trace!("Queueing for a connection");
self.state.waiters.queue().await;
Expand All @@ -283,7 +282,6 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.conns.insert(conn.clone());
self.pending_errors.borrow_mut().clear();
conn
};
self.poll_to_idle(conn)
Expand Down Expand Up @@ -329,7 +327,6 @@ impl<C: Connector, D: Default> Block<C, D> {
.try_take_idle_lru()
.expect("Could not acquire a connection");
to.conns.insert(conn.clone());
to.pending_errors.borrow_mut().clear();
conn.transfer(
connector,
&from.state.metrics,
Expand All @@ -356,7 +353,6 @@ impl<C: Connector, D: Default> Block<C, D> {
let conn = conn.into_inner();
from.conns.remove(&conn);
to.conns.insert(conn.clone());
to.pending_errors.borrow_mut().clear();
conn.transfer(
connector,
&from.state.metrics,
Expand Down Expand Up @@ -410,49 +406,40 @@ impl<C: Connector, D: Default> Block<C, D> {
}

async fn poll_to_idle(self: Rc<Self>, conn: Conn<C>) -> ConnResult<(), C::Error> {
consistency_check!(self);
let mut res =
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await;
if let Err(err) = res {
// If we have an error, we need to decide where the error goes...
// Note that we may try to route a connection to a waiter, but that
// waiter may end up grabbing a returned connection.

let mut route_to_waiter = false;

if !self.state.waiters.is_empty() {
// If waiters may be serviced by active or idle connections,
// we'll return the error here. If there are too few
// connections, one will eventually get created and we'll still
// make forward progress.
if !self.state.metrics.has(MetricVariant::live()) {
let waiters = self.state.metrics.count(MetricVariant::Waiting);
let failed = self.pending_errors.borrow().len();

// If there are less errors than waiters, we can route the
// error to a waiter.
if waiters > failed {
route_to_waiter = true;
let res = poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await;
{
let waiters = self.state.waiters.len();
let live = self.state.metrics.sum_all(MetricVariant::live());

if let Err(err) = &res {
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);

// If we have an error, we need to decide whether to keep it or not.
// The formula we use is `pending_errors < waiters - live`. We will
// only wake waiters if there are no live connections left.

let mut pending_errors = self.conns.pending_error_len();

if !self.state.waiters.is_empty() {
if pending_errors + live < waiters {
self.conns.push_error(&err);
pending_errors += 1
}
}
}

self.conns.remove(&conn);
conn.untrack(&self.state.metrics);

if route_to_waiter {
res = Err(ConnError::TaskFailed);
self.pending_errors.borrow_mut().push(err);
self.state.waiters.trigger();
if live == 0 {
for _ in 0..waiters.min(pending_errors) {
self.state.waiters.trigger();
}
}
} else {
res = Err(err);
// No error: wake a waiter (if we have one). Trim any stale pending
// errors down to the number of waiters that live connections cannot
// eventually serve.
self.conns.trim_errors_to(waiters.saturating_sub(live));
self.state.waiters.trigger();
}
} else {
// No error, wake a waiter (if we have one)
self.pending_errors.borrow_mut().clear();
self.state.waiters.trigger();
res
}
res
}
}

Expand Down Expand Up @@ -932,6 +919,25 @@ mod tests {
Ok(())
}

/// This test shows that when every connection attempt fails, each queued
/// waiter is handed one of the pending connection errors rather than
/// waiting forever.
#[test(tokio::test)]
async fn test_block_fails_queue_multiple_2() -> Result<()> {
let connector = BasicConnector::no_delay();
let block = Rc::new(Block::<BasicConnector>::new(Name::from("db"), None));
// Two acquires queue up before any connection exists.
let queue1 = block.clone().queue();
let queue2 = block.clone().queue();
let local = LocalSet::new();

// Force both connection attempts to fail; with two waiters and no live
// connections, each waiter should receive one of the two errors.
connector.fail_next_connect();
local.spawn_local(block.clone().task_create(&connector));
connector.fail_next_connect();
local.spawn_local(block.clone().task_create(&connector));
local.await;
queue1.await.expect_err("Expected this to fail");
queue2.await.expect_err("Expected this to fail");
Ok(())
}

#[test(tokio::test)]
async fn test_steal() -> Result<()> {
let connector = BasicConnector::no_delay();
Expand Down Expand Up @@ -1008,6 +1014,35 @@ mod tests {
Ok(())
}

/// Moving a connection between blocks may fail mid-transfer. A failed move
/// surfaces its error to the caller, while a subsequent successful move
/// still satisfies a waiter queued on the destination block.
#[test(tokio::test)]
async fn test_move_fail() -> Result<()> {
let connector = BasicConnector::no_delay();
let blocks = Blocks::<_, ()>::default();
assert_eq!(0, blocks.block_count());
// Seed "db" with three idle connections.
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
assert_block!(blocks "db" has 3 Idle);
// Hold two of them so they can be moved to another block.
let conn1 = blocks.queue("db").await?;
let conn2 = blocks.queue("db").await?;
// Reset high-water-mark metrics so the `has max` assertions below only
// reflect activity from this point on.
blocks.metrics("db").reset_max();
blocks.metrics("db2").reset_max();
connector.fail_next_connect();
// Waiter on "db2": the first (failing) move must not satisfy it.
let queue = blocks.queue("db2");
blocks
.task_move_to(&connector, conn1, "db2")
.await
.expect_err("Expected this to fail");
// The second move succeeds and its connection serves the waiter.
blocks.task_move_to(&connector, conn2, "db2").await?;
queue.await?;
assert_eq!(2, blocks.block_count());
assert_block!(blocks "db" has 1 Idle);
assert_block!(blocks "db2" has 1 Idle);
assert_block!(blocks "db" has max 2 Active, 1 Idle);
assert_block!(blocks "db2" has max 1 Active, 1 Connecting, 1 Idle, 1 Failed, 1 Waiting);
Ok(())
}

#[test(tokio::test)]
async fn test_close() -> Result<()> {
let connector = BasicConnector::no_delay();
Expand Down Expand Up @@ -1045,6 +1080,39 @@ mod tests {
Ok(())
}

/// When a reopen fails and the block's last connection goes away, exactly
/// one queued waiter receives the error; the remaining waiter keeps
/// waiting and is served once a later connection attempt succeeds.
#[test(tokio::test)]
async fn test_reopen_error() -> Result<()> {
let connector = BasicConnector::no_delay();
let blocks = Blocks::<_, ()>::default();
assert_eq!(0, blocks.block_count());
let local = LocalSet::new();

// Success
let conn = blocks.create(&connector, "db").await?;
// This one gets the error
let acq1 = blocks.queue("db");
// This one doesn't
let acq2 = local.spawn_local(blocks.queue("db"));

assert_block!(blocks "db" has 1 Active, 2 Waiting);

// Reopening the only live connection fails: the error propagates both
// to the reopen itself and to the first queued waiter.
connector.fail_next_connect();
blocks
.task_reopen(&connector, conn)
.await
.expect_err("Expected a failure");
acq1.await.expect_err("Expected a failure");

// A fresh, successful connection satisfies the surviving waiter.
local.spawn_local(blocks.task_create_one(&connector, "db"));
local.await;
_ = acq2.await?;

assert_block!(blocks "db" has 1 Idle);
// Lifetime totals across the whole scenario, including the failed
// reopen (1 Failed, 1 Closed) and both waiters.
assert_block!(blocks "db" has all time
3 Connecting, 1 Disconnecting, 2 Idle, 2 Active, 1 Failed, 1 Closed, 2 Waiting);
Ok(())
}

#[test(tokio::test)]
async fn test_reopen_fails() -> Result<()> {
let connector = BasicConnector::no_delay();
Expand Down
Loading

0 comments on commit 639971a

Please sign in to comment.