Skip to content

Commit

Permalink
Fix leaky connections
Browse files Browse the repository at this point in the history
It's possible to trigger more approvals than are necessary, in turn
grabbing more connections than we need. This happens when we drop a
connection. The drop produces a notify, which doesn't get used until the
pool is empty. The first `Pool::get()` call on an empty pool will spawn
an connect task, immediately complete `notify.notified().await`, then
spawn a second connect task. Both will connect and we'll end up with 1
more connection than we need.

Rather than address the notify issue directly, this fix introduces some
bookkeeping that tracks the number of open `pool.get()` requests we have
waiting on connections. If the number of pending connections >= the
number of pending gets, we will not spawn any additional connect tasks.
  • Loading branch information
tneely authored and djc committed Oct 17, 2024
1 parent b198221 commit bce0b0a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 12 deletions.
3 changes: 2 additions & 1 deletion bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ where
let mut wait_time_start = None;

let future = async {
let getting = self.inner.start_get();
loop {
let (conn, approvals) = self.inner.pop();
let (conn, approvals) = getting.get();
self.spawn_replenishing_approvals(approvals);

// Cancellation safety: make sure to wrap the connection in a `PooledConnection`
Expand Down
54 changes: 43 additions & 11 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,6 @@ where
}
}

pub(crate) fn pop(&self) -> (Option<Conn<M::Connection>>, ApprovalIter) {
let mut locked = self.internals.lock();
let conn = locked.conns.pop_front().map(|idle| idle.conn);
let approvals = match &conn {
Some(_) => locked.wanted(&self.statics),
None => locked.approvals(&self.statics, 1),
};

(conn, approvals)
}

pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
let mut locked = self.internals.lock();
let mut approvals = locked.approvals(&self.statics, 1);
Expand All @@ -67,6 +56,10 @@ where
iter
}

pub(crate) fn start_get(self: &Arc<Self>) -> Getting<M> {
Getting::new(self.clone())
}

pub(crate) fn forward_error(&self, err: M::Error) {
self.statics.error_sink.sink(err);
}
Expand All @@ -81,6 +74,7 @@ where
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
in_flight: u32,
}

impl<M> PoolInternals<M>
Expand Down Expand Up @@ -202,6 +196,7 @@ where
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
in_flight: 0,
}
}
}
Expand Down Expand Up @@ -236,6 +231,43 @@ pub(crate) struct Approval {
_priv: (),
}

pub(crate) struct Getting<M: ManageConnection + Send> {
inner: Arc<SharedPool<M>>,
}

impl<M: ManageConnection + Send> Getting<M> {
pub(crate) fn get(&self) -> (Option<Conn<M::Connection>>, ApprovalIter) {
let mut locked = self.inner.internals.lock();
if let Some(IdleConn { conn, .. }) = locked.conns.pop_front() {
return (Some(conn), locked.wanted(&self.inner.statics));
}

let approvals = match locked.in_flight > locked.pending_conns {
true => 1,
false => 0,
};

(None, locked.approvals(&self.inner.statics, approvals))
}
}

impl<M: ManageConnection + Send> Getting<M> {
fn new(inner: Arc<SharedPool<M>>) -> Self {
{
let mut locked = inner.internals.lock();
locked.in_flight += 1;
}
Getting { inner }
}
}

impl<M: ManageConnection + Send> Drop for Getting<M> {
fn drop(&mut self) {
let mut locked = self.inner.internals.lock();
locked.in_flight -= 1;
}
}

#[derive(Debug)]
pub(crate) struct Conn<C>
where
Expand Down
34 changes: 34 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1103,3 +1103,37 @@ async fn test_add_checks_broken_connections() {
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::Broken(_))));
}

#[tokio::test]
async fn test_reuse_on_drop() {
let pool = Pool::builder()
.min_idle(0)
.max_size(100)
.queue_strategy(QueueStrategy::Lifo)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

// The first get should
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection and grab it from the pool
let conn_0 = pool.get().await.expect("should connect");
// Dropping the connection queues up a notify
drop(conn_0);

// The second get should
// 1) see the first connection in the pool and grab it
let _conn_1 = pool.get().await.expect("should connect");

// The third get will
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection,
// 4) see nothing in the pool,
// 5) _not_ spawn a single replenishing approval,
// 6) get notified of the new connection and grab it from the pool
let _conn_2 = pool.get().await.expect("should connect");

assert_eq!(pool.state().connections, 2);
}

0 comments on commit bce0b0a

Please sign in to comment.