Skip to content

Commit

Permalink
Use r2d2 pooler
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Jan 14, 2025
1 parent 94ab6c5 commit a661209
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 30 deletions.
34 changes: 21 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion storage_controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ diesel = { version = "2.2.6", features = [
"serde_json",
"chrono",
] }
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] }
diesel-async = { version = "0.5.2", features = ["postgres", "r2d2", "async-connection-wrapper"] }
diesel_migrations = { version = "2.2.0" }
scoped-futures = "0.1.4"

Expand Down
35 changes: 19 additions & 16 deletions storage_controller/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use self::split_state::SplitState;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::RunQueryDsl;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use itertools::Itertools;
Expand Down Expand Up @@ -64,7 +63,11 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
connection_pool: Pool<AsyncPgConnection>,
connection_pool: Arc<
diesel::r2d2::Pool<
diesel::r2d2::ConnectionManager<AsyncConnectionWrapper<AsyncPgConnection>>,
>,
>,
}

/// Legacy format, for use in JSON compat objects in test environment
Expand All @@ -80,7 +83,7 @@ pub(crate) enum DatabaseError {
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError),
ConnectionPool(#[from] diesel::r2d2::PoolError),
#[error("Logical error: {0}")]
Logical(String),
#[error("Migration error: {0}")]
Expand Down Expand Up @@ -156,21 +159,25 @@ impl Persistence {
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);

pub async fn new(database_url: String) -> Self {
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
let manager =
diesel::r2d2::ConnectionManager::<AsyncConnectionWrapper<AsyncPgConnection>>::new(
database_url,
);

// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = Pool::builder()
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
// Always keep at least one connection ready to go
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.await
.expect("Could not build connection pool");

let connection_pool = Arc::new(connection_pool);

Self { connection_pool }
}

Expand Down Expand Up @@ -204,17 +211,12 @@ impl Persistence {
use diesel_migrations::{HarnessWithOutput, MigrationHarness};

// Can't use self.with_conn here as we do spawn_blocking which requires static.
let conn = self
.connection_pool
.dedicated_connection()
.await
.map_err(|e| DatabaseError::Migration(e.to_string()))?;
let mut async_wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
AsyncConnectionWrapper::from(conn);
let connection_pool = self.connection_pool.clone();
tokio::task::spawn_blocking(move || {
let mut conn = connection_pool.get()?;
let mut retry_count = 0;
loop {
let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper)
let result = HarnessWithOutput::write_to_stdout(&mut conn)
.run_pending_migrations(MIGRATIONS)
.map(|_| ())
.map_err(|e| DatabaseError::Migration(e.to_string()));
Expand Down Expand Up @@ -293,7 +295,8 @@ impl Persistence {
+ 'a,
R: Send + 'b,
{
let mut conn = self.connection_pool.get().await?;
let connection_pool = self.connection_pool.clone();
let mut conn = tokio::task::spawn_blocking(move || connection_pool.get()).await.unwrap()?;
let mut retry_count = 0;
loop {
match conn
Expand Down

0 comments on commit a661209

Please sign in to comment.