From a4776a371df360776664b53b3e5ee8396cc7cc0f Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:25:57 +0200 Subject: [PATCH 1/9] Replace actions-rs/toolchain with dtolnay/rust-toolchain --- .github/workflows/main.yml | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a415b5d..6849b69 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -23,11 +23,9 @@ jobs: steps: - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@master with: - profile: minimal toolchain: ${{ matrix.rust }} - override: true - uses: actions-rs/cargo@v1 with: command: build @@ -42,11 +40,9 @@ jobs: steps: - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@master with: - profile: minimal toolchain: 1.70.0 - override: true - uses: actions-rs/cargo@v1 with: command: build @@ -60,11 +56,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@stable with: - profile: minimal - toolchain: stable - override: true components: rustfmt, clippy - uses: actions-rs/cargo@v1 with: @@ -88,11 +81,7 @@ jobs: CARGO_TERM_COLOR: always steps: - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - override: true + - uses: dtolnay/rust-toolchain@stable - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage From 38af099bfd8ccad2dc3530ce3391038f03303c62 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:28:46 +0200 Subject: [PATCH 2/9] Remove use of actions-rs/cargo action --- .github/workflows/main.yml | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6849b69..3468c6d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,14 +26,8 @@ jobs: - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.rust }} - - uses: actions-rs/cargo@v1 - with: - command: build - args: --workspace --all-targets - - uses: actions-rs/cargo@v1 - with: - command: test - args: --workspace + - run: cargo check --all-targets + - run: cargo test --all-features msrv: runs-on: ubuntu-latest @@ -43,14 +37,7 @@ jobs: - uses: dtolnay/rust-toolchain@master with: toolchain: 1.70.0 - - uses: actions-rs/cargo@v1 - with: - command: build - args: -p bb8 --all-targets - - uses: actions-rs/cargo@v1 - with: - command: test - args: -p bb8 + - run: cargo test -p bb8 lint: runs-on: ubuntu-latest @@ -59,15 +46,8 @@ jobs: - uses: dtolnay/rust-toolchain@stable with: components: rustfmt, clippy - - uses: actions-rs/cargo@v1 - with: - command: fmt - args: --all -- --check - - uses: actions-rs/cargo@v1 - if: always() - with: - command: clippy - args: --workspace --all-targets -- -D warnings + - run: cargo fmt -- --check + - run: cargo clippy --all-features --all-targets -- -D warnings audit: runs-on: ubuntu-latest From a62d048cd762fbbc3073206fc3357bf5a8e29aa4 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:29:20 +0200 Subject: [PATCH 3/9] Run tests with --nocapture in CI --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3468c6d..12f371e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,7 +27,7 @@ jobs: with: toolchain: ${{ matrix.rust }} - run: cargo check --all-targets - - run: cargo test --all-features + - run: cargo test --all-features -- --nocapture msrv: runs-on: ubuntu-latest From f9f71b216779c8a1a3c870cb666408c491bcf133 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:29:49 +0200 Subject: [PATCH 4/9] Upgrade actions/checkout to v4 --- .github/workflows/main.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 12f371e..25c4d51 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,7 +22,7 @@ jobs: runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.rust }} @@ -33,7 +33,7 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@master with: toolchain: 1.70.0 @@ -42,7 +42,7 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: components: rustfmt, clippy @@ -52,7 +52,7 @@ jobs: audit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: EmbarkStudios/cargo-deny-action@v1 coverage: From b03e30b61e046d20f8208942a696e0f0a7e144bf Mon Sep 17 00:00:00 2001 From: Taylor Neely Date: Tue, 15 Oct 2024 20:35:54 +0000 Subject: [PATCH 5/9] Move (some) connection expiry logic into method --- bb8/src/internals.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 155e21a..8197910 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -169,7 +169,7 @@ where } } if let Some(lifetime) = config.max_lifetime { - if now - conn.conn.birth >= lifetime { + if conn.conn.is_expired(now, lifetime) { closed_max_lifetime += 1; keep &= false; } @@ -252,6 +252,11 @@ impl Conn { birth: Instant::now(), } } + + pub(crate) fn is_expired(&self, now: Instant, max: Duration) -> bool { + // The current age of the connection is longer than the maximum allowed lifetime + now - self.birth >= max + } } impl From> for Conn { @@ -357,3 +362,31 @@ pub(crate) enum StatsKind { ClosedBroken, ClosedInvalid, } + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use crate::internals::Conn; + + #[test] + fn test_conn_is_expired() { + let conn = Conn { + conn: (), + birth: Instant::now(), + }; + + assert!( + !conn.is_expired(conn.birth, Duration::from_nanos(1)), + "at birth, the connection has not expired" + ); + assert!( + !conn.is_expired(Instant::now(), Duration::from_secs(5)), + "from setup to now is shorter than 5s, so the connection has not expired" + ); + assert!( + conn.is_expired(Instant::now(), Duration::from_nanos(1)), + "from setup to now is longer than 1ns, so the connection has expired" + ); + } +} From 77ffbc720a942fe2241434aae644b106dfe89ba5 Mon Sep 17 00:00:00 2001 From: Taylor Neely Date: Tue, 15 Oct 2024 20:36:04 +0000 Subject: [PATCH 6/9] Reap expired connections on drop The reaper only runs against the connections in its idle pool. This is fine for reaping idle connections, but for hotly contested connections beyond their maximum lifetime this can prove problematic. Consider an active connection beyond its lifetime and a reaper that runs every 3 seconds: - [t0] Connection is idle - [t1] Connection is active - [t2] Reaper runs, does not see connection - [t3] Connection is idle This pattern can repeat infinitely with the connection never being reaped. By checking the max lifetime on drop, we can ensure that expired connections are reaped in a reason amount of time (assuming they eventually do get dropped). --- bb8/src/inner.rs | 12 ++++++++++-- bb8/tests/test.rs | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 80e7922..7c6e3b9 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -149,12 +149,20 @@ where "handled in caller" ); + let is_broken = self.inner.manager.has_broken(&mut conn.conn); + let is_expired = match self.inner.statics.max_lifetime { + Some(lt) => conn.is_expired(Instant::now(), lt), + None => false, + }; + let mut locked = self.inner.internals.lock(); - match (state, self.inner.manager.has_broken(&mut conn.conn)) { + match (state, is_broken || is_expired) { (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), - (_, is_broken) => { + _ => { if is_broken { self.inner.statistics.record(StatsKind::ClosedBroken); + } else if is_expired { + self.inner.statistics.record_connections_reaped(0, 1); } let approvals = locked.dropped(1, &self.inner.statics); self.spawn_replenishing_approvals(approvals); diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 0e1225e..724a376 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -12,7 +12,8 @@ use std::{error, fmt}; use async_trait::async_trait; use futures_util::future::{err, lazy, ok, pending, ready, try_join_all, FutureExt}; use futures_util::stream::{FuturesUnordered, TryStreamExt}; -use tokio::{sync::oneshot, time::timeout}; +use tokio::sync::oneshot; +use tokio::time::{sleep, timeout}; #[derive(Debug, PartialEq, Eq)] pub struct Error; @@ -585,6 +586,40 @@ async fn test_max_lifetime() { assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5); } +#[tokio::test] +async fn test_max_lifetime_reap_on_drop() { + static DROPPED: AtomicUsize = AtomicUsize::new(0); + + #[derive(Default)] + struct Connection; + + impl Drop for Connection { + fn drop(&mut self) { + DROPPED.fetch_add(1, Ordering::SeqCst); + } + } + + let manager = OkManager::::new(); + let pool = Pool::builder() + .max_lifetime(Some(Duration::from_secs(1))) + .connection_timeout(Duration::from_secs(1)) + .reaper_rate(Duration::from_secs(999)) + .build(manager) + .await + .unwrap(); + + let conn = pool.get().await; + + // And wait. + sleep(Duration::from_secs(2)).await; + assert_eq!(DROPPED.load(Ordering::SeqCst), 0); + + // Connection is reaped on drop. + drop(conn); + assert_eq!(DROPPED.load(Ordering::SeqCst), 1); + assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 1); +} + #[tokio::test] async fn test_min_idle() { let pool = Pool::builder() From 4e51e6a526fcc1e7baacf346f75690ac3a94746e Mon Sep 17 00:00:00 2001 From: Taylor Neely Date: Fri, 11 Oct 2024 19:01:08 +0000 Subject: [PATCH 7/9] Fix leaky connections It's possible to trigger more approvals than are necessary, in turn grabbing more connections than we need. This happens when we drop a connection. The drop produces a notify, which doesn't get used until the pool is empty. The first `Pool::get()` call on an empty pool will spawn an connect task, immediately complete `notify.notified().await`, then spawn a second connect task. Both will connect and we'll end up with 1 more connection than we need. Rather than address the notify issue directly, this fix introduces some bookkeeping that tracks the number of open `pool.get()` requests we have waiting on connections. If the number of pending connections >= the number of pending gets, we will not spawn any additional connect tasks. --- bb8/src/inner.rs | 3 ++- bb8/src/internals.rs | 54 +++++++++++++++++++++++++++++++++++--------- bb8/tests/test.rs | 34 ++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 7c6e3b9..95a698c 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -91,8 +91,9 @@ where let mut wait_time_start = None; let future = async { + let getting = self.inner.start_get(); loop { - let (conn, approvals) = self.inner.pop(); + let (conn, approvals) = getting.get(); self.spawn_replenishing_approvals(approvals); // Cancellation safety: make sure to wrap the connection in a `PooledConnection` diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 8197910..7fdde3f 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -36,17 +36,6 @@ where } } - pub(crate) fn pop(&self) -> (Option>, ApprovalIter) { - let mut locked = self.internals.lock(); - let conn = locked.conns.pop_front().map(|idle| idle.conn); - let approvals = match &conn { - Some(_) => locked.wanted(&self.statics), - None => locked.approvals(&self.statics, 1), - }; - - (conn, approvals) - } - pub(crate) fn try_put(self: &Arc, conn: M::Connection) -> Result<(), M::Connection> { let mut locked = self.internals.lock(); let mut approvals = locked.approvals(&self.statics, 1); @@ -67,6 +56,10 @@ where iter } + pub(crate) fn start_get(self: &Arc) -> Getting { + Getting::new(self.clone()) + } + pub(crate) fn forward_error(&self, err: M::Error) { self.statics.error_sink.sink(err); } @@ -81,6 +74,7 @@ where conns: VecDeque>, num_conns: u32, pending_conns: u32, + in_flight: u32, } impl PoolInternals @@ -202,6 +196,7 @@ where conns: VecDeque::new(), num_conns: 0, pending_conns: 0, + in_flight: 0, } } } @@ -236,6 +231,43 @@ pub(crate) struct Approval { _priv: (), } +pub(crate) struct Getting { + inner: Arc>, +} + +impl Getting { + pub(crate) fn get(&self) -> (Option>, ApprovalIter) { + let mut locked = self.inner.internals.lock(); + if let Some(IdleConn { conn, .. }) = locked.conns.pop_front() { + return (Some(conn), locked.wanted(&self.inner.statics)); + } + + let approvals = match locked.in_flight > locked.pending_conns { + true => 1, + false => 0, + }; + + (None, locked.approvals(&self.inner.statics, approvals)) + } +} + +impl Getting { + fn new(inner: Arc>) -> Self { + { + let mut locked = inner.internals.lock(); + locked.in_flight += 1; + } + Getting { inner } + } +} + +impl Drop for Getting { + fn drop(&mut self) { + let mut locked = self.inner.internals.lock(); + locked.in_flight -= 1; + } +} + #[derive(Debug)] pub(crate) struct Conn where diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 724a376..8680309 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -1103,3 +1103,37 @@ async fn test_add_checks_broken_connections() { let res = pool.add(conn); assert!(matches!(res, Err(AddError::Broken(_)))); } + +#[tokio::test] +async fn test_reuse_on_drop() { + let pool = Pool::builder() + .min_idle(0) + .max_size(100) + .queue_strategy(QueueStrategy::Lifo) + .build(OkManager::::new()) + .await + .unwrap(); + + // The first get should + // 1) see nothing in the pool, + // 2) spawn a single replenishing approval, + // 3) get notified of the new connection and grab it from the pool + let conn_0 = pool.get().await.expect("should connect"); + // Dropping the connection queues up a notify + drop(conn_0); + + // The second get should + // 1) see the first connection in the pool and grab it + let _conn_1 = pool.get().await.expect("should connect"); + + // The third get will + // 1) see nothing in the pool, + // 2) spawn a single replenishing approval, + // 3) get notified of the new connection, + // 4) see nothing in the pool, + // 5) _not_ spawn a single replenishing approval, + // 6) get notified of the new connection and grab it from the pool + let _conn_2 = pool.get().await.expect("should connect"); + + assert_eq!(pool.state().connections, 2); +} From 9f6629ad0e68d9730bad0b24a97dcbcccbc88850 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:57:01 +0200 Subject: [PATCH 8/9] bb8: linearize code in PoolInner::put_back() --- bb8/src/inner.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 95a698c..08c9ffd 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -157,19 +157,18 @@ where }; let mut locked = self.inner.internals.lock(); - match (state, is_broken || is_expired) { - (ConnectionState::Present, false) => locked.put(conn, None, self.inner.clone()), - _ => { - if is_broken { - self.inner.statistics.record(StatsKind::ClosedBroken); - } else if is_expired { - self.inner.statistics.record_connections_reaped(0, 1); - } - let approvals = locked.dropped(1, &self.inner.statics); - self.spawn_replenishing_approvals(approvals); - self.inner.notify.notify_one(); - } + if let (ConnectionState::Present, false) = (state, is_broken || is_expired) { + locked.put(conn, None, self.inner.clone()); + return; + } else if is_broken { + self.inner.statistics.record(StatsKind::ClosedBroken); + } else if is_expired { + self.inner.statistics.record_connections_reaped(0, 1); } + + let approvals = locked.dropped(1, &self.inner.statics); + self.spawn_replenishing_approvals(approvals); + self.inner.notify.notify_one(); } /// Adds an external connection to the pool if there is capacity for it. From 6a8ad308066e929cf68732359db8bd9e784ddd13 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Thu, 17 Oct 2024 11:12:45 +0200 Subject: [PATCH 9/9] Bump bb8 version to 0.8.6 --- bb8/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bb8/Cargo.toml b/bb8/Cargo.toml index aa20290..d952c99 100644 --- a/bb8/Cargo.toml +++ b/bb8/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bb8" -version = "0.8.5" +version = "0.8.6" edition = "2021" rust-version = "1.70" description = "Full-featured async (tokio-based) connection pool (like r2d2)"