diff --git a/Cargo.lock b/Cargo.lock index 9e3339411fdd..c4863a10495f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,18 +942,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bb8" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" -dependencies = [ - "async-trait", - "futures-util", - "parking_lot 0.12.1", - "tokio", -] - [[package]] name = "bcder" version = "0.7.4" @@ -1797,6 +1785,7 @@ dependencies = [ "chrono", "diesel_derives", "itoa", + "r2d2", "serde_json", ] @@ -1807,7 +1796,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" dependencies = [ "async-trait", - "bb8", "diesel", "futures-util", "scoped-futures", @@ -4971,6 +4959,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.7.3" @@ -5779,6 +5778,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scoped-futures" version = "0.1.4" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 9860bd5d0e44..d4c427658427 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -47,7 +47,7 @@ diesel = { version = "2.2.6", features = [ "serde_json", "chrono", ] } -diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] } +diesel-async = { version = "0.5.2", features = ["postgres", "r2d2", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } scoped-futures = "0.1.4" diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 693142be2fce..f78058397e24 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,14 +1,13 @@ pub(crate) mod split_state; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use self::split_state::SplitState; use diesel::prelude::*; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::pooled_connection::bb8::Pool; -use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::RunQueryDsl; use diesel_async::{AsyncConnection, AsyncPgConnection}; use itertools::Itertools; @@ -64,7 +63,11 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// updated, and reads of nodes are always from memory, not the database. We only require that /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - connection_pool: Pool, + connection_pool: Arc< + diesel::r2d2::Pool< + diesel::r2d2::ConnectionManager>, + >, + >, } /// Legacy format, for use in JSON compat objects in test environment @@ -80,7 +83,7 @@ pub(crate) enum DatabaseError { #[error(transparent)] Connection(#[from] diesel::result::ConnectionError), #[error(transparent)] - ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError), + ConnectionPool(#[from] diesel::r2d2::PoolError), #[error("Logical error: {0}")] Logical(String), #[error("Migration error: {0}")] @@ -156,11 +159,14 @@ impl Persistence { const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60); pub async fn new(database_url: String) -> Self { - let manager = AsyncDieselConnectionManager::::new(database_url); + let manager = + diesel::r2d2::ConnectionManager::>::new( + database_url, + ); // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time // to execute queries (database queries are not generally on latency-sensitive paths). - let connection_pool = Pool::builder() + let connection_pool = diesel::r2d2::Pool::builder() .max_size(Self::MAX_CONNECTIONS) .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME)) .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT)) @@ -168,9 +174,10 @@ impl Persistence { .min_idle(Some(1)) .test_on_check_out(true) .build(manager) - .await .expect("Could not build connection pool"); + let connection_pool = Arc::new(connection_pool); + Self { connection_pool } } @@ -204,17 +211,12 @@ impl Persistence { use diesel_migrations::{HarnessWithOutput, MigrationHarness}; // Can't use self.with_conn here as we do spawn_blocking which requires static. - let conn = self - .connection_pool - .dedicated_connection() - .await - .map_err(|e| DatabaseError::Migration(e.to_string()))?; - let mut async_wrapper: AsyncConnectionWrapper = - AsyncConnectionWrapper::from(conn); + let connection_pool = self.connection_pool.clone(); tokio::task::spawn_blocking(move || { + let mut conn = connection_pool.get()?; let mut retry_count = 0; loop { - let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper) + let result = HarnessWithOutput::write_to_stdout(&mut conn) .run_pending_migrations(MIGRATIONS) .map(|_| ()) .map_err(|e| DatabaseError::Migration(e.to_string())); @@ -293,7 +295,8 @@ impl Persistence { + 'a, R: Send + 'b, { - let mut conn = self.connection_pool.get().await?; + let connection_pool = self.connection_pool.clone(); + let mut conn = tokio::task::spawn_blocking(move || connection_pool.get()).await.unwrap()?; let mut retry_count = 0; loop { match conn