From 94ab6c5c33a5d9f6774036200796d5f575b04fe0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 4 Jan 2025 00:07:14 +0100 Subject: [PATCH 1/2] Switch to diesel-async and tokio-postgres --- Cargo.lock | 159 ++++-- storage_controller/Cargo.toml | 5 +- storage_controller/src/main.rs | 2 +- storage_controller/src/persistence.rs | 760 +++++++++++++++----------- 4 files changed, 548 insertions(+), 378 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e29f4fc08aa..9e3339411fdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,6 +942,18 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bb8" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" +dependencies = [ + "async-trait", + "futures-util", + "parking_lot 0.12.1", + "tokio", +] + [[package]] name = "bcder" version = "0.7.4" @@ -1301,7 +1313,7 @@ dependencies = [ "tar", "thiserror", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-stream", "tokio-util", "tower 0.5.2", @@ -1409,7 +1421,7 @@ dependencies = [ "storage_broker", "thiserror", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-util", "toml", "toml_edit", @@ -1785,11 +1797,24 @@ dependencies = [ "chrono", "diesel_derives", "itoa", - "pq-sys", - "r2d2", "serde_json", ] +[[package]] +name = "diesel-async" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" +dependencies = [ + "async-trait", + "bb8", + "diesel", + "futures-util", + "scoped-futures", + "tokio", + "tokio-postgres 0.7.12", +] + [[package]] name = "diesel_derives" version = "2.2.1" @@ -4038,8 +4063,8 @@ dependencies = [ "pageserver_compaction", "pin-project-lite", "postgres", - "postgres-protocol", - "postgres-types", + "postgres-protocol 0.6.4", + "postgres-types 0.2.4", "postgres_backend", "postgres_connection", "postgres_ffi", @@ -4070,7 +4095,7 @@ dependencies = [ "tokio", "tokio-epoll-uring", "tokio-io-timeout", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-stream", "tokio-tar", "tokio-util", @@ -4128,7 +4153,7 @@ dependencies = [ "serde", "thiserror", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-stream", "tokio-util", "utils", @@ -4434,7 +4459,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", ] [[package]] @@ -4455,6 +4480,24 @@ dependencies = [ "stringprep", ] +[[package]] +name = "postgres-protocol" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand 0.8.5", + "sha2", + "stringprep", +] + [[package]] name = "postgres-protocol2" version = "0.1.0" @@ -4478,7 +4521,18 @@ source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f dependencies = [ "bytes", "fallible-iterator", - "postgres-protocol", + "postgres-protocol 0.6.4", +] + +[[package]] +name = "postgres-types" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f" +dependencies = [ + "bytes", + "fallible-iterator", + 
"postgres-protocol 0.6.7", ] [[package]] @@ -4503,7 +4557,7 @@ dependencies = [ "serde", "thiserror", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-postgres-rustls", "tokio-rustls 0.26.0", "tokio-util", @@ -4518,7 +4572,7 @@ dependencies = [ "itertools 0.10.5", "once_cell", "postgres", - "tokio-postgres", + "tokio-postgres 0.7.7", "url", ] @@ -4605,15 +4659,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "pq-sys" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793" -dependencies = [ - "vcpkg", -] - [[package]] name = "pq_proto" version = "0.1.0" @@ -4621,7 +4666,7 @@ dependencies = [ "byteorder", "bytes", "itertools 0.10.5", - "postgres-protocol", + "postgres-protocol 0.6.4", "rand 0.8.5", "serde", "thiserror", @@ -4869,7 +4914,7 @@ dependencies = [ "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-postgres2", "tokio-rustls 0.26.0", "tokio-tungstenite 0.21.0", @@ -4926,17 +4971,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot 0.12.1", - "scheduled-thread-pool", -] - [[package]] name = "rand" version = "0.7.3" @@ -5667,7 +5701,7 @@ dependencies = [ "pageserver_api", "parking_lot 0.12.1", "postgres", - "postgres-protocol", + "postgres-protocol 0.6.4", "postgres_backend", "postgres_ffi", "pprof", @@ -5690,7 +5724,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "tokio-io-timeout", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-stream", "tokio-tar", "tokio-util", @@ -5746,12 +5780,12 @@ dependencies = [ ] [[package]] -name = "scheduled-thread-pool" -version = "0.2.7" +name = "scoped-futures" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +checksum = "1b24aae2d0636530f359e9d5ef0c04669d11c5e756699b27a6a6d845d8329091" dependencies = [ - "parking_lot 0.12.1", + "pin-project-lite", ] [[package]] @@ -6286,6 +6320,7 @@ dependencies = [ "clap", "control_plane", "diesel", + "diesel-async", "diesel_migrations", "fail", "futures", @@ -6300,10 +6335,10 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", - "r2d2", "rand 0.8.5", "reqwest", "routerify", + "scoped-futures", "scopeguard", "serde", "serde_json", @@ -6356,7 +6391,7 @@ dependencies = [ "serde_json", "storage_controller_client", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-postgres-rustls", "tokio-stream", "tokio-util", @@ -6815,13 +6850,39 @@ dependencies = [ "percent-encoding", "phf", "pin-project-lite", - "postgres-protocol", - "postgres-types", + "postgres-protocol 0.6.4", + "postgres-types 0.2.4", "socket2", "tokio", "tokio-util", ] +[[package]] +name = "tokio-postgres" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot 0.12.1", + "percent-encoding", + "phf", + "pin-project-lite", + 
"postgres-protocol 0.6.7", + "postgres-types 0.2.8", + "rand 0.8.5", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-postgres-rustls" version = "0.12.0" @@ -6831,7 +6892,7 @@ dependencies = [ "ring", "rustls 0.23.18", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-rustls 0.26.0", "x509-certificate", ] @@ -7493,12 +7554,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" @@ -7518,7 +7573,7 @@ dependencies = [ "serde_json", "sysinfo", "tokio", - "tokio-postgres", + "tokio-postgres 0.7.7", "tokio-util", "tracing", "tracing-subscriber", diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index caaa22d0a5d7..9860bd5d0e44 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -45,12 +45,11 @@ strum_macros.workspace = true diesel = { version = "2.2.6", features = [ "serde_json", - "postgres", - "r2d2", "chrono", ] } +diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } -r2d2 = { version = "0.8.10" } +scoped-futures = "0.1.4" utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 801409d61292..659c088d5143 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -308,7 +308,7 @@ async fn async_main() -> anyhow::Result<()> { // Validate that we can connect to the database Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?; - let persistence = Arc::new(Persistence::new(secrets.database_url)); + let persistence = Arc::new(Persistence::new(secrets.database_url).await); let service = Service::spawn(config, persistence.clone()).await?; diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index eb0bfc879e22..693142be2fce 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -5,9 +5,12 @@ use std::time::Duration; use std::time::Instant; use self::split_state::SplitState; -use diesel::pg::PgConnection; use diesel::prelude::*; -use diesel::Connection; +use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::pooled_connection::bb8::Pool; +use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use diesel_async::RunQueryDsl; +use diesel_async::{AsyncConnection, AsyncPgConnection}; use itertools::Itertools; use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; @@ -20,6 +23,7 @@ use pageserver_api::shard::ShardConfigError; use pageserver_api::shard::ShardIdentity; use pageserver_api::shard::ShardStripeSize; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; +use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; use utils::generation::Generation; use utils::id::{NodeId, TenantId}; @@ -60,7 +64,7 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// updated, and reads of nodes are always from memory, not the database. 
We only require that
 /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
 pub struct Persistence {
-    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
+    connection_pool: Pool<AsyncPgConnection>,
 }
 
 /// Legacy format, for use in JSON compat objects in test environment
@@ -76,7 +80,7 @@ pub(crate) enum DatabaseError {
     #[error(transparent)]
     Connection(#[from] diesel::result::ConnectionError),
     #[error(transparent)]
-    ConnectionPool(#[from] r2d2::Error),
+    ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError),
     #[error("Logical error: {0}")]
     Logical(String),
     #[error("Migration error: {0}")]
@@ -124,6 +128,7 @@ pub(crate) enum AbortShardSplitStatus {
 pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
 
 /// Some methods can operate on either a whole tenant or a single shard
+#[derive(Clone)]
 pub(crate) enum TenantFilter {
     Tenant(TenantId),
     Shard(TenantShardId),
@@ -136,6 +141,11 @@ pub(crate) struct ShardGenerationState {
     pub(crate) generation_pageserver: Option<NodeId>,
 }
 
+// A generous allowance for how many times we may retry serializable transactions
+// before giving up. This is not expected to be hit: it is a defensive measure in case we
+// somehow engineer a situation where duelling transactions might otherwise live-lock.
+const MAX_RETRIES: usize = 128;
+
 impl Persistence {
     // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
     // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
@@ -145,12 +155,12 @@ impl Persistence {
     const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
     const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
 
-    pub fn new(database_url: String) -> Self {
-        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
+    pub async fn new(database_url: String) -> Self {
+        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
 
         // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
         // to execute queries (database queries are not generally on latency-sensitive paths).
-        let connection_pool = diesel::r2d2::Pool::builder()
+        let connection_pool = Pool::builder()
             .max_size(Self::MAX_CONNECTIONS)
             .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
             .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
             .min_idle(Some(1))
             .test_on_check_out(true)
             .build(manager)
+            .await
             .expect("Could not build connection pool");
 
         Self { connection_pool }
     }
@@ -171,7 +182,7 @@ impl Persistence {
     ) -> Result<(), diesel::ConnectionError> {
         let started_at = Instant::now();
         loop {
-            match PgConnection::establish(database_url) {
+            match AsyncPgConnection::establish(database_url).await {
                 Ok(_) => {
                     tracing::info!("Connected to database.");
                     return Ok(());
                 }
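
For orientation, the pool wiring this patch switches to looks roughly as follows on its own; a minimal sketch, assuming diesel-async 0.5 with the `bb8` feature (the sizing values here are illustrative, not the controller's real configuration):

    use diesel_async::pooled_connection::bb8::Pool;
    use diesel_async::pooled_connection::AsyncDieselConnectionManager;
    use diesel_async::AsyncPgConnection;

    // Unlike r2d2's blocking build(), bb8's build() is async: it may open
    // min_idle connections before returning, so it must run on the runtime.
    // This is why Persistence::new becomes `pub async fn` in this patch.
    async fn build_pool(database_url: &str) -> Pool<AsyncPgConnection> {
        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
        Pool::builder()
            .max_size(99)
            .min_idle(Some(1))
            .build(manager)
            .await
            .expect("Could not build connection pool")
    }
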
@@ -192,20 +203,66 @@ impl Persistence {
     pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
         use diesel_migrations::{HarnessWithOutput, MigrationHarness};
 
-        self.with_conn(move |conn| -> DatabaseResult<()> {
-            HarnessWithOutput::write_to_stdout(conn)
-                .run_pending_migrations(MIGRATIONS)
-                .map(|_| ())
-                .map_err(|e| DatabaseError::Migration(e.to_string()))
+        // Can't use self.with_conn here as we do spawn_blocking which requires 'static.
+        let conn = self
+            .connection_pool
+            .dedicated_connection()
+            .await
+            .map_err(|e| DatabaseError::Migration(e.to_string()))?;
+        let mut async_wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
+            AsyncConnectionWrapper::from(conn);
+        tokio::task::spawn_blocking(move || {
+            let mut retry_count = 0;
+            loop {
+                let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper)
+                    .run_pending_migrations(MIGRATIONS)
+                    .map(|_| ())
+                    .map_err(|e| DatabaseError::Migration(e.to_string()));
+                match result {
+                    Ok(r) => break Ok(r),
+                    Err(
+                        err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
+                            diesel::result::DatabaseErrorKind::SerializationFailure,
+                            _,
+                        )),
+                    ) => {
+                        retry_count += 1;
+                        if retry_count > MAX_RETRIES {
+                            tracing::error!(
+                                "Exceeded max retries on SerializationFailure errors: {err:?}"
+                            );
+                            break Err(err);
+                        } else {
+                            // Retry on serialization errors: these are expected, because even though our
+                            // transactions don't fight for the same rows, they will occasionally collide
+                            // on index pages (e.g. increment_generation for unrelated shards can collide)
+                            tracing::debug!(
+                                "Retrying transaction on serialization failure {err:?}"
+                            );
+                            continue;
+                        }
+                    }
+                    Err(e) => break Err(e),
+                }
+            }
         })
         .await
+        .map_err(|e| DatabaseError::Migration(e.to_string()))??;
+        Ok(())
     }
 
     /// Wraps `with_conn` in order to collect latency and error metrics
-    async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
+    async fn with_measured_conn<'a, 'b, F, R>(
+        &self,
+        op: DatabaseOperation,
+        func: F,
+    ) -> DatabaseResult<R>
     where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
-        R: Send + 'static,
+        F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>>
+            + Send
+            + std::marker::Sync
+            + 'a,
+        R: Send + 'b,
     {
         let latency = &METRICS_REGISTRY
             .metrics_group
@@ -228,76 +285,74 @@ impl Persistence {
     }
 
     /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
-    async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
+    async fn with_conn<'a, 'b, F, R>(&self, func: F) -> DatabaseResult<R>
     where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
-        R: Send + 'static,
+        F: for<'r> Fn(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'b, 'r, DatabaseResult<R>>
            + Send
            + std::marker::Sync
            + 'a,
        R: Send + 'b,
     {
-        // A generous allowance for how many times we may retry serializable transactions
-        // before giving up. This is not expected to be hit: it is a defensive measure in case we
-        // somehow engineer a situation where duelling transactions might otherwise live-lock.
-        const MAX_RETRIES: usize = 128;
-
-        let mut conn = self.connection_pool.get()?;
-        tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
-            let mut retry_count = 0;
-            loop {
-                match conn.build_transaction().serializable().run(|c| func(c)) {
-                    Ok(r) => break Ok(r),
-                    Err(
-                        err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
-                            diesel::result::DatabaseErrorKind::SerializationFailure,
-                            _,
-                        )),
-                    ) => {
-                        retry_count += 1;
-                        if retry_count > MAX_RETRIES {
-                            tracing::error!(
-                                "Exceeded max retries on SerializationFailure errors: {err:?}"
-                            );
-                            break Err(err);
-                        } else {
-                            // Retry on serialization errors: these are expected, because even though our
-                            // transactions don't fight for the same rows, they will occasionally collide
-                            // on index pages (e.g. increment_generation for unrelated shards can collide)
-                            tracing::debug!(
-                                "Retrying transaction on serialization failure {err:?}"
-                            );
-                            continue;
-                        }
-                    }
-                    Err(e) => break Err(e),
-                }
-            }
-        })
-        .await
-        .expect("Task panic")
+        let mut conn = self.connection_pool.get().await?;
+        let mut retry_count = 0;
+        loop {
+            match conn
+                .build_transaction()
+                .serializable()
+                .run(|c| func(c))
+                .await
+            {
+                Ok(r) => break Ok(r),
+                Err(
+                    err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
+                        diesel::result::DatabaseErrorKind::SerializationFailure,
+                        _,
+                    )),
+                ) => {
+                    retry_count += 1;
+                    if retry_count > MAX_RETRIES {
+                        tracing::error!(
+                            "Exceeded max retries on SerializationFailure errors: {err:?}"
+                        );
+                        break Err(err);
+                    } else {
+                        // Retry on serialization errors: these are expected, because even though our
+                        // transactions don't fight for the same rows, they will occasionally collide
+                        // on index pages (e.g. increment_generation for unrelated shards can collide)
+                        tracing::debug!("Retrying transaction on serialization failure {err:?}");
+                        continue;
+                    }
+                }
+                Err(e) => break Err(e),
+            }
+        }
     }
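
The only error class this loop retries is an aborted SERIALIZABLE transaction (Postgres SQLSTATE 40001); the classification in isolation, as a free-standing sketch (helper name ours, not part of the patch):

    use diesel::result::{DatabaseErrorKind, Error};

    // True only for serialization failures, which are safe to re-run;
    // every other database error propagates to the caller unchanged.
    fn is_serialization_failure(err: &Error) -> bool {
        matches!(
            err,
            Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)
        )
    }
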
+ }) + }) .await?; tracing::info!("list_nodes: loaded {} nodes", nodes.len()); @@ -313,11 +368,14 @@ impl Persistence { use crate::schema::nodes::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| { - let updated = diesel::update(nodes) - .filter(node_id.eq(input_node_id.0 as i64)) - .set((scheduling_policy.eq(String::from(input_scheduling)),)) - .execute(conn)?; - Ok(updated) + Box::pin(async move { + let updated = diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .set((scheduling_policy.eq(String::from(input_scheduling)),)) + .execute(conn) + .await?; + Ok(updated) + }) }) .await?; @@ -339,17 +397,16 @@ impl Persistence { &self, ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::ListTenantShards, - move |conn| -> DatabaseResult<_> { + self.with_measured_conn(DatabaseOperation::ListTenantShards, move |conn| { + Box::pin(async move { let query = tenant_shards.filter( placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), ); - let result = query.load::(conn)?; + let result = query.load::(conn).await?; Ok(result) - }, - ) + }) + }) .await } @@ -359,15 +416,14 @@ impl Persistence { filter_tenant_id: TenantId, ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::LoadTenant, - move |conn| -> DatabaseResult<_> { + self.with_measured_conn(DatabaseOperation::LoadTenant, move |conn| { + Box::pin(async move { let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string())); - let result = query.load::(conn)?; + let result = query.load::(conn).await?; Ok(result) - }, - ) + }) + }) .await } @@ -393,19 +449,22 @@ impl Persistence { }) .collect::>(); - self.with_measured_conn( - DatabaseOperation::InsertTenantShards, - move |conn| -> DatabaseResult<()> { + let shards = &shards; + let metadata_health_records = &metadata_health_records; + self.with_measured_conn(DatabaseOperation::InsertTenantShards, move |conn| { + Box::pin(async move { diesel::insert_into(tenant_shards::table) - .values(&shards) - .execute(conn)?; + .values(shards) + .execute(conn) + .await?; diesel::insert_into(metadata_health::table) - .values(&metadata_health_records) - .execute(conn)?; + .values(metadata_health_records) + .execute(conn) + .await?; Ok(()) - }, - ) + }) + }) .await } @@ -413,31 +472,31 @@ impl Persistence { /// the tenant from memory on this server. pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> { use crate::schema::tenant_shards::dsl::*; - self.with_measured_conn( - DatabaseOperation::DeleteTenant, - move |conn| -> DatabaseResult<()> { + self.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| { + Box::pin(async move { // `metadata_health` status (if exists) is also deleted based on the cascade behavior. 
@@ -413,31 +472,31 @@ impl Persistence {
     /// the tenant from memory on this server.
     pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
         use crate::schema::tenant_shards::dsl::*;
-        self.with_measured_conn(
-            DatabaseOperation::DeleteTenant,
-            move |conn| -> DatabaseResult<()> {
+        self.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| {
+            Box::pin(async move {
                 // `metadata_health` status (if exists) is also deleted based on the cascade behavior.
                 diesel::delete(tenant_shards)
                     .filter(tenant_id.eq(del_tenant_id.to_string()))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
                 Ok(())
-            },
-        )
+            })
+        })
         .await
     }
 
     pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
         use crate::schema::nodes::dsl::*;
-        self.with_measured_conn(
-            DatabaseOperation::DeleteNode,
-            move |conn| -> DatabaseResult<()> {
+        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
+            Box::pin(async move {
                 diesel::delete(nodes)
                     .filter(node_id.eq(del_node_id.0 as i64))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
 
                 Ok(())
-            },
-        )
+            })
+        })
         .await
     }
 
@@ -454,34 +513,41 @@ impl Persistence {
         use crate::schema::tenant_shards::dsl::*;
         let updated = self
             .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
-                let rows_updated = diesel::update(tenant_shards)
-                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
-                    .set(generation.eq(generation + 1))
-                    .execute(conn)?;
-
-                tracing::info!("Incremented {} tenants' generations", rows_updated);
-
-                // TODO: UPDATE+SELECT in one query
-
-                let updated = tenant_shards
-                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
-                    .select(TenantShardPersistence::as_select())
-                    .load(conn)?;
-
-                // If the node went through a drain and restart phase before re-attaching,
-                // then reset its node scheduling policy to active.
-                diesel::update(nodes)
-                    .filter(node_id.eq(input_node_id.0 as i64))
-                    .filter(
-                        scheduling_policy
-                            .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
-                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
-                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
-                    )
-                    .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
-                    .execute(conn)?;
+                Box::pin(async move {
+                    let rows_updated = diesel::update(tenant_shards)
+                        .filter(generation_pageserver.eq(input_node_id.0 as i64))
+                        .set(generation.eq(generation + 1))
+                        .execute(conn)
+                        .await?;
+
+                    tracing::info!("Incremented {} tenants' generations", rows_updated);
+
+                    // TODO: UPDATE+SELECT in one query
+
+                    let updated = tenant_shards
+                        .filter(generation_pageserver.eq(input_node_id.0 as i64))
+                        .select(TenantShardPersistence::as_select())
+                        .load(conn)
+                        .await?;
+
+                    // If the node went through a drain and restart phase before re-attaching,
+                    // then reset its node scheduling policy to active.
+                    diesel::update(nodes)
+                        .filter(node_id.eq(input_node_id.0 as i64))
+                        .filter(
+                            scheduling_policy
+                                .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
+                                .or(scheduling_policy
+                                    .eq(String::from(NodeSchedulingPolicy::Draining)))
+                                .or(scheduling_policy
+                                    .eq(String::from(NodeSchedulingPolicy::Filling))),
+                        )
+                        .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
+                        .execute(conn)
+                        .await?;
 
-                Ok(updated)
+                    Ok(updated)
+                })
             })
             .await?;
 
@@ -518,19 +584,22 @@ impl Persistence {
         use crate::schema::tenant_shards::dsl::*;
         let updated = self
             .with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
-                let updated = diesel::update(tenant_shards)
-                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
-                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
-                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
-                    .set((
-                        generation.eq(generation + 1),
-                        generation_pageserver.eq(node_id.0 as i64),
-                    ))
-                    // TODO: only returning() the generation column
-                    .returning(TenantShardPersistence::as_returning())
-                    .get_result(conn)?;
+                Box::pin(async move {
+                    let updated = diesel::update(tenant_shards)
+                        .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+                        .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+                        .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+                        .set((
+                            generation.eq(generation + 1),
+                            generation_pageserver.eq(node_id.0 as i64),
+                        ))
+                        // TODO: only returning() the generation column
+                        .returning(TenantShardPersistence::as_returning())
+                        .get_result(conn)
+                        .await?;
 
-                Ok(updated)
+                    Ok(updated)
+                })
             })
             .await?;
 
@@ -562,12 +631,15 @@ impl Persistence {
         use crate::schema::tenant_shards::dsl::*;
         let rows = self
            .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
-                let result = tenant_shards
-                    .filter(tenant_id.eq(filter_tenant_id.to_string()))
-                    .select(TenantShardPersistence::as_select())
-                    .order(shard_number)
-                    .load(conn)?;
-                Ok(result)
+                Box::pin(async move {
+                    let result = tenant_shards
+                        .filter(tenant_id.eq(filter_tenant_id.to_string()))
+                        .select(TenantShardPersistence::as_select())
+                        .order(shard_number)
+                        .load(conn)
+                        .await?;
+                    Ok(result)
+                })
            })
            .await?;
 
@@ -615,15 +687,18 @@ impl Persistence {
                 break;
             }
 
+            let in_clause = &in_clause;
             let chunk_rows = self
                 .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
-                    // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
-                    // the inputs are strongly typed and cannot carry any user-supplied raw string content.
-                    let result : Vec<TenantShardPersistence> = diesel::sql_query(
-                        format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
-                    ).load(conn)?;
-
-                    Ok(result)
+                    Box::pin(async move {
+                        // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
+                        // the inputs are strongly typed and cannot carry any user-supplied raw string content.
+                        let result : Vec<TenantShardPersistence> = diesel::sql_query(
+                            format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
+                        ).load(conn).await?;
+
+                        Ok(result)
+                    })
                 })
                 .await?;
             rows.extend(chunk_rows.into_iter())
         }
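
For reference, the clause interpolated into that raw query has this shape; a sketch with illustrative values (the real inputs are typed tenant/shard IDs, never user-supplied strings):

    // Build the "(tenant_id, shard_number, shard_count) in (...)" tuple list.
    fn in_clause(shards: &[(String, i32, i32)]) -> String {
        shards
            .iter()
            .map(|(tenant, number, count)| format!("('{tenant}', {number}, {count})"))
            .collect::<Vec<_>>()
            .join(",")
    }

    fn main() {
        let clause = in_clause(&[("1f6d".into(), 0, 1), ("2a9c".into(), 1, 2)]);
        assert_eq!(clause, "('1f6d', 0, 1),('2a9c', 1, 2)");
        println!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({clause});");
    }
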
@@ -657,51 +732,58 @@ impl Persistence {
     ) -> DatabaseResult<()> {
         use crate::schema::tenant_shards::dsl::*;
 
+        let tenant = &tenant;
+        let input_placement_policy = &input_placement_policy;
+        let input_config = &input_config;
+        let input_generation = &input_generation;
+        let input_scheduling_policy = &input_scheduling_policy;
         self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
-            let query = match tenant {
-                TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
-                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
-                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
-                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
-                    .into_boxed(),
-                TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
-                    .filter(tenant_id.eq(input_tenant_id.to_string()))
-                    .into_boxed(),
-            };
+            Box::pin(async move {
+                let query = match tenant {
+                    TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
+                        .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+                        .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+                        .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+                        .into_boxed(),
+                    TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
+                        .filter(tenant_id.eq(input_tenant_id.to_string()))
+                        .into_boxed(),
+                };
 
-            // Clear generation_pageserver if we are moving into a state where we won't have
-            // any attached pageservers.
-            let input_generation_pageserver = match input_placement_policy {
-                None | Some(PlacementPolicy::Attached(_)) => None,
-                Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
-            };
+                // Clear generation_pageserver if we are moving into a state where we won't have
+                // any attached pageservers.
+                let input_generation_pageserver = match input_placement_policy {
+                    None | Some(PlacementPolicy::Attached(_)) => None,
+                    Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
+                };
 
-            #[derive(AsChangeset)]
-            #[diesel(table_name = crate::schema::tenant_shards)]
-            struct ShardUpdate {
-                generation: Option<i32>,
-                placement_policy: Option<String>,
-                config: Option<String>,
-                scheduling_policy: Option<String>,
-                generation_pageserver: Option<Option<i64>>,
-            }
+                #[derive(AsChangeset)]
+                #[diesel(table_name = crate::schema::tenant_shards)]
+                struct ShardUpdate {
+                    generation: Option<i32>,
+                    placement_policy: Option<String>,
+                    config: Option<String>,
+                    scheduling_policy: Option<String>,
+                    generation_pageserver: Option<Option<i64>>,
+                }
 
-            let update = ShardUpdate {
-                generation: input_generation.map(|g| g.into().unwrap() as i32),
-                placement_policy: input_placement_policy
-                    .as_ref()
-                    .map(|p| serde_json::to_string(&p).unwrap()),
-                config: input_config
-                    .as_ref()
-                    .map(|c| serde_json::to_string(&c).unwrap()),
-                scheduling_policy: input_scheduling_policy
-                    .map(|p| serde_json::to_string(&p).unwrap()),
-                generation_pageserver: input_generation_pageserver,
-            };
+                let update = ShardUpdate {
+                    generation: input_generation.map(|g| g.into().unwrap() as i32),
+                    placement_policy: input_placement_policy
+                        .as_ref()
+                        .map(|p| serde_json::to_string(&p).unwrap()),
+                    config: input_config
+                        .as_ref()
+                        .map(|c| serde_json::to_string(&c).unwrap()),
+                    scheduling_policy: input_scheduling_policy
+                        .map(|p| serde_json::to_string(&p).unwrap()),
+                    generation_pageserver: input_generation_pageserver,
+                };
 
-            query.set(update).execute(conn)?;
+                query.set(update).execute(conn).await?;
 
-            Ok(())
+                Ok(())
+            })
         })
         .await?;
 
@@ -715,23 +797,27 @@ impl Persistence {
     ) -> DatabaseResult<Vec<(TenantShardId, Option<AvailabilityZone>)>> {
         use crate::schema::tenant_shards::dsl::*;
 
+        let preferred_azs = preferred_azs.as_slice();
        self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
-            let mut shards_updated = Vec::default();
-
-            for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
-                let updated = diesel::update(tenant_shards)
-                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
-                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
-                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
-                    .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone())))
-                    .execute(conn)?;
-
-                if updated == 1 {
-                    shards_updated.push((*tenant_shard_id, preferred_az.clone()));
+            Box::pin(async move {
+                let mut shards_updated = Vec::default();
+
+                for (tenant_shard_id, preferred_az) in preferred_azs.iter() {
+                    let updated = diesel::update(tenant_shards)
+                        .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+                        .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+                        .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+                        .set(preferred_az_id.eq(preferred_az.as_ref().map(|az| az.0.clone())))
+                        .execute(conn)
+                        .await?;
+
+                    if updated == 1 {
+                        shards_updated.push((*tenant_shard_id, preferred_az.clone()));
+                    }
                 }
-            }
 
-            Ok(shards_updated)
+                Ok(shards_updated)
+            })
         })
         .await
     }
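
The ShardUpdate changeset above leans on Diesel's two-level Option convention: an outer None skips the column entirely, while Some(None) on a doubly-wrapped field writes SQL NULL. A reduced sketch against a hypothetical table:

    use diesel::prelude::*;

    diesel::table! {
        widgets (id) {
            id -> Int8,
            label -> Nullable<Text>,
            weight -> Nullable<Int4>,
        }
    }

    #[derive(AsChangeset)]
    #[diesel(table_name = widgets)]
    struct WidgetUpdate {
        label: Option<String>,       // None => leave column untouched
        weight: Option<Option<i32>>, // Some(None) => SET weight = NULL
    }
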

     pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
         use crate::schema::tenant_shards::dsl::*;
         self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
-            let updated = diesel::update(tenant_shards)
-                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
-                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
-                .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
-                .set((
-                    generation_pageserver.eq(Option::<i64>::None),
-                    placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
-                ))
-                .execute(conn)?;
-
-            Ok(updated)
+            Box::pin(async move {
+                let updated = diesel::update(tenant_shards)
+                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+                    .set((
+                        generation_pageserver.eq(Option::<i64>::None),
+                        placement_policy
+                            .eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
+                    ))
+                    .execute(conn)
+                    .await?;
+
+                Ok(updated)
+            })
         })
         .await?;
 
@@ -768,14 +858,16 @@ impl Persistence {
         parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
     ) -> DatabaseResult<()> {
         use crate::schema::tenant_shards::dsl::*;
-        self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
+        let parent_to_children = parent_to_children.as_slice();
+        self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| {
+            Box::pin(async move {
             // Mark parent shards as splitting
 
             let updated = diesel::update(tenant_shards)
                 .filter(tenant_id.eq(split_tenant_id.to_string()))
                 .filter(shard_count.eq(old_shard_count.literal() as i32))
                 .set((splitting.eq(1),))
-                .execute(conn)?;
+                .execute(conn).await?;
             if u8::try_from(updated)
                 .map_err(|_| DatabaseError::Logical(
                     format!("Overflow existing shard count {} while splitting", updated))
                 )? != old_shard_count.count() {
                 // Perhaps a deletion or another split raced with this attempt to split, mark the whole operation aborted
                 return Err(DatabaseError::Logical(format!(
                     "Unexpected existing shard count {updated} when preparing tenant for split",
                 )));
             }
 
             // FIXME: spurious clone to sidestep closure move rules
-            let parent_to_children = parent_to_children.clone();
+            let parent_to_children = parent_to_children.to_vec();
 
             // Insert child shards
             for (parent_shard_id, children) in parent_to_children {
                 let parent = tenant_shards
                     .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
                     .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
                     .filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
-                    .load::<TenantShardPersistence>(conn)?;
+                    .load::<TenantShardPersistence>(conn).await?;
                 let parent = if parent.len() != 1 {
                     return Err(DatabaseError::Logical(format!(
                         "Parent shard {parent_shard_id} not found"
                     )));
                 } else {
                     parent.into_iter().next().unwrap()
                 };
                 for mut shard in children {
                     // Sanity check: report! instead of panic
                     debug_assert!(shard.splitting == SplitState::Splitting);
                     diesel::insert_into(tenant_shards)
                         .values(shard)
-                        .execute(conn)?;
+                        .execute(conn).await?;
                 }
             }
 
             Ok(())
         })
+        })
         .await
     }
 
@@ -828,25 +921,26 @@ impl Persistence {
         old_shard_count: ShardCount,
     ) -> DatabaseResult<()> {
         use crate::schema::tenant_shards::dsl::*;
-        self.with_measured_conn(
-            DatabaseOperation::CompleteShardSplit,
-            move |conn| -> DatabaseResult<()> {
+        self.with_measured_conn(DatabaseOperation::CompleteShardSplit, move |conn| {
+            Box::pin(async move {
                 // Drop parent shards
                 diesel::delete(tenant_shards)
                     .filter(tenant_id.eq(split_tenant_id.to_string()))
                     .filter(shard_count.eq(old_shard_count.literal() as i32))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
 
                 // Clear sharding flag
                 let updated = diesel::update(tenant_shards)
                     .filter(tenant_id.eq(split_tenant_id.to_string()))
                     .set((splitting.eq(0),))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
                 debug_assert!(updated > 0);
 
                 Ok(())
-            },
-        )
+            })
+        })
         .await
     }
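
Between begin_shard_split and the complete/abort paths, the persisted `splitting` flag is what crash recovery keys on; a toy version of the column mapping (the real one lives in split_state.rs, names here are ours): begin_shard_split sets it to 1 on parents and inserts children already marked Splitting, while complete/abort reset it to 0.

    #[derive(Debug, PartialEq)]
    enum SplitState {
        Idle,
        Splitting,
    }

    impl SplitState {
        // The `splitting` column is stored as an integer in tenant_shards.
        fn from_column(value: i32) -> SplitState {
            if value == 0 { SplitState::Idle } else { SplitState::Splitting }
        }
    }
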

@@ -858,15 +952,15 @@ impl Persistence {
         new_shard_count: ShardCount,
     ) -> DatabaseResult<AbortShardSplitStatus> {
         use crate::schema::tenant_shards::dsl::*;
-        self.with_measured_conn(
-            DatabaseOperation::AbortShardSplit,
-            move |conn| -> DatabaseResult<AbortShardSplitStatus> {
+        self.with_measured_conn(DatabaseOperation::AbortShardSplit, move |conn| {
+            Box::pin(async move {
                 // Clear the splitting state on parent shards
                 let updated = diesel::update(tenant_shards)
                     .filter(tenant_id.eq(split_tenant_id.to_string()))
                     .filter(shard_count.ne(new_shard_count.literal() as i32))
                     .set((splitting.eq(0),))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
 
                 // Parent shards are already gone: we cannot abort.
                 if updated == 0 {
                     return Ok(AbortShardSplitStatus::Complete);
                 }
 
                 // Sanity check: if parent shards were present, their cardinality should
                 // be less than the number of child shards.
                 if updated >= new_shard_count.count() as usize {
                     return Err(DatabaseError::Logical(format!(
                         "Unexpected parent shard count {updated} while aborting split to \
                             count {new_shard_count:?}",
                     )));
                 }
 
                 // Erase child shards
                 diesel::delete(tenant_shards)
                     .filter(tenant_id.eq(split_tenant_id.to_string()))
                     .filter(shard_count.eq(new_shard_count.literal() as i32))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
 
                 Ok(AbortShardSplitStatus::Aborted)
-            },
-        )
+            })
+        })
         .await
     }
 
@@ -906,25 +1001,28 @@ impl Persistence {
     ) -> DatabaseResult<()> {
         use crate::schema::metadata_health::dsl::*;
 
-        self.with_measured_conn(
-            DatabaseOperation::UpdateMetadataHealth,
-            move |conn| -> DatabaseResult<_> {
+        let healthy_records = healthy_records.as_slice();
+        let unhealthy_records = unhealthy_records.as_slice();
+        self.with_measured_conn(DatabaseOperation::UpdateMetadataHealth, move |conn| {
+            Box::pin(async move {
                 diesel::insert_into(metadata_health)
-                    .values(&healthy_records)
+                    .values(healthy_records)
                     .on_conflict((tenant_id, shard_number, shard_count))
                     .do_update()
                     .set((healthy.eq(true), last_scrubbed_at.eq(now)))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
 
                 diesel::insert_into(metadata_health)
-                    .values(&unhealthy_records)
+                    .values(unhealthy_records)
                     .on_conflict((tenant_id, shard_number, shard_count))
                     .do_update()
                     .set((healthy.eq(false), last_scrubbed_at.eq(now)))
-                    .execute(conn)?;
+                    .execute(conn)
+                    .await?;
                 Ok(())
-            },
-        )
+            })
+        })
         .await
     }
 
     /// Lists all the metadata health records.
     pub(crate) async fn list_metadata_health_records(
         &self,
     ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
-        self.with_measured_conn(
-            DatabaseOperation::ListMetadataHealth,
-            move |conn| -> DatabaseResult<_> {
-                Ok(
-                    crate::schema::metadata_health::table
-                        .load::<MetadataHealthPersistence>(conn)?,
-                )
-            },
-        )
+        self.with_measured_conn(DatabaseOperation::ListMetadataHealth, move |conn| {
+            Box::pin(async {
+                Ok(crate::schema::metadata_health::table
+                    .load::<MetadataHealthPersistence>(conn)
+                    .await?)
+            })
+        })
         .await
     }
 
@@ -953,10 +1049,15 @@ impl Persistence {
         use crate::schema::metadata_health::dsl::*;
         self.with_measured_conn(
             DatabaseOperation::ListMetadataHealthUnhealthy,
-            move |conn| -> DatabaseResult<_> {
-                Ok(crate::schema::metadata_health::table
-                    .filter(healthy.eq(false))
-                    .load::<MetadataHealthPersistence>(conn)?)
+            move |conn| {
+                Box::pin(async {
+                    DatabaseResult::Ok(
+                        crate::schema::metadata_health::table
+                            .filter(healthy.eq(false))
+                            .load::<MetadataHealthPersistence>(conn)
+                            .await?,
+                    )
+                })
             },
         )
         .await
@@ -970,15 +1071,14 @@ impl Persistence {
     ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
         use crate::schema::metadata_health::dsl::*;
 
-        self.with_measured_conn(
-            DatabaseOperation::ListMetadataHealthOutdated,
-            move |conn| -> DatabaseResult<_> {
+        self.with_measured_conn(DatabaseOperation::ListMetadataHealthOutdated, move |conn| {
+            Box::pin(async move {
                 let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
-                let res = query.load::<MetadataHealthPersistence>(conn)?;
+                let res = query.load::<MetadataHealthPersistence>(conn).await?;
 
                 Ok(res)
-            },
-        )
+            })
+        })
         .await
     }
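
The update_metadata_health writes earlier in this hunk use Postgres's upsert idiom: insert, and on conflict flip the flag plus scrub timestamp. The same pattern reduced to a hypothetical single-flag table, as a sketch assuming diesel-async's RunQueryDsl:

    use diesel::prelude::*;
    use diesel_async::{AsyncPgConnection, RunQueryDsl};

    diesel::table! {
        health (id) {
            id -> Int8,
            healthy -> Bool,
        }
    }

    // Insert the row, or update the existing one when the key conflicts.
    async fn mark(conn: &mut AsyncPgConnection, row_id: i64, ok: bool) -> QueryResult<usize> {
        use health::dsl::*;
        diesel::insert_into(health)
            .values((id.eq(row_id), healthy.eq(ok)))
            .on_conflict(id)
            .do_update()
            .set(healthy.eq(ok))
            .execute(conn)
            .await
    }
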
+ }) + }) .await?; if leader.len() > 1 { @@ -1014,26 +1115,33 @@ impl Persistence { use crate::schema::controllers::dsl::*; let updated = self - .with_measured_conn( - DatabaseOperation::UpdateLeader, - move |conn| -> DatabaseResult { + .with_measured_conn(DatabaseOperation::UpdateLeader, move |conn| { + let prev = prev.clone(); + let new = new.clone(); + Box::pin(async move { let updated = match &prev { - Some(prev) => diesel::update(controllers) - .filter(address.eq(prev.address.clone())) - .filter(started_at.eq(prev.started_at)) - .set(( - address.eq(new.address.clone()), - started_at.eq(new.started_at), - )) - .execute(conn)?, - None => diesel::insert_into(controllers) - .values(new.clone()) - .execute(conn)?, + Some(prev) => { + diesel::update(controllers) + .filter(address.eq(prev.address.clone())) + .filter(started_at.eq(prev.started_at)) + .set(( + address.eq(new.address.clone()), + started_at.eq(new.started_at), + )) + .execute(conn) + .await? + } + None => { + diesel::insert_into(controllers) + .values(new.clone()) + .execute(conn) + .await? + } }; Ok(updated) - }, - ) + }) + }) .await?; if updated == 0 { @@ -1048,12 +1156,13 @@ impl Persistence { /// At startup, populate the list of nodes which our shards may be placed on pub(crate) async fn list_safekeepers(&self) -> DatabaseResult> { let safekeepers: Vec = self - .with_measured_conn( - DatabaseOperation::ListNodes, - move |conn| -> DatabaseResult<_> { - Ok(crate::schema::safekeepers::table.load::(conn)?) - }, - ) + .with_measured_conn(DatabaseOperation::ListNodes, move |conn| { + Box::pin(async move { + Ok(crate::schema::safekeepers::table + .load::(conn) + .await?) + }) + }) .await?; tracing::info!("list_safekeepers: loaded {} nodes", safekeepers.len()); @@ -1066,11 +1175,14 @@ impl Persistence { id: i64, ) -> Result { use crate::schema::safekeepers::dsl::{id as id_column, safekeepers}; - self.with_conn(move |conn| -> DatabaseResult { - Ok(safekeepers - .filter(id_column.eq(&id)) - .select(SafekeeperPersistence::as_select()) - .get_result(conn)?) + self.with_conn(move |conn| { + Box::pin(async move { + Ok(safekeepers + .filter(id_column.eq(&id)) + .select(SafekeeperPersistence::as_select()) + .get_result(conn) + .await?) 
+ }) }) .await } @@ -1081,26 +1193,30 @@ impl Persistence { ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; - self.with_conn(move |conn| -> DatabaseResult<()> { - let bind = record - .as_insert_or_update() - .map_err(|e| DatabaseError::Logical(format!("{e}")))?; - - let inserted_updated = diesel::insert_into(safekeepers) - .values(&bind) - .on_conflict(id) - .do_update() - .set(&bind) - .execute(conn)?; - - if inserted_updated != 1 { - return Err(DatabaseError::Logical(format!( - "unexpected number of rows ({})", - inserted_updated - ))); - } + self.with_conn(move |conn| { + let record = record.clone(); + Box::pin(async move { + let bind = record + .as_insert_or_update() + .map_err(|e| DatabaseError::Logical(format!("{e}")))?; - Ok(()) + let inserted_updated = diesel::insert_into(safekeepers) + .values(&bind) + .on_conflict(id) + .do_update() + .set(&bind) + .execute(conn) + .await?; + + if inserted_updated != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))); + } + + Ok(()) + }) }) .await } From a661209fb2e652ae41cc2578dbe5413a064bfee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 13 Jan 2025 02:21:20 +0100 Subject: [PATCH 2/2] Use r2d2 pooler --- Cargo.lock | 34 ++++++++++++++++---------- storage_controller/Cargo.toml | 2 +- storage_controller/src/persistence.rs | 35 +++++++++++++++------------ 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e3339411fdd..c4863a10495f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,18 +942,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bb8" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" -dependencies = [ - "async-trait", - "futures-util", - "parking_lot 0.12.1", - "tokio", -] - [[package]] name = "bcder" version = "0.7.4" @@ -1797,6 +1785,7 @@ dependencies = [ "chrono", "diesel_derives", "itoa", + "r2d2", "serde_json", ] @@ -1807,7 +1796,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51a307ac00f7c23f526a04a77761a0519b9f0eb2838ebf5b905a58580095bdcb" dependencies = [ "async-trait", - "bb8", "diesel", "futures-util", "scoped-futures", @@ -4971,6 +4959,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "rand" version = "0.7.3" @@ -5779,6 +5778,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scoped-futures" version = "0.1.4" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 9860bd5d0e44..d4c427658427 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -47,7 +47,7 @@ diesel = { version = "2.2.6", features = [ "serde_json", "chrono", ] } -diesel-async = { version = "0.5.2", features = ["postgres", "bb8", 
"async-connection-wrapper"] } +diesel-async = { version = "0.5.2", features = ["postgres", "r2d2", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } scoped-futures = "0.1.4" diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 693142be2fce..f78058397e24 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,14 +1,13 @@ pub(crate) mod split_state; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use self::split_state::SplitState; use diesel::prelude::*; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; -use diesel_async::pooled_connection::bb8::Pool; -use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::RunQueryDsl; use diesel_async::{AsyncConnection, AsyncPgConnection}; use itertools::Itertools; @@ -64,7 +63,11 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); /// updated, and reads of nodes are always from memory, not the database. We only require that /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. pub struct Persistence { - connection_pool: Pool, + connection_pool: Arc< + diesel::r2d2::Pool< + diesel::r2d2::ConnectionManager>, + >, + >, } /// Legacy format, for use in JSON compat objects in test environment @@ -80,7 +83,7 @@ pub(crate) enum DatabaseError { #[error(transparent)] Connection(#[from] diesel::result::ConnectionError), #[error(transparent)] - ConnectionPool(#[from] diesel_async::pooled_connection::bb8::RunError), + ConnectionPool(#[from] diesel::r2d2::PoolError), #[error("Logical error: {0}")] Logical(String), #[error("Migration error: {0}")] @@ -156,11 +159,14 @@ impl Persistence { const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60); pub async fn new(database_url: String) -> Self { - let manager = AsyncDieselConnectionManager::::new(database_url); + let manager = + diesel::r2d2::ConnectionManager::>::new( + database_url, + ); // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time // to execute queries (database queries are not generally on latency-sensitive paths). - let connection_pool = Pool::builder() + let connection_pool = diesel::r2d2::Pool::builder() .max_size(Self::MAX_CONNECTIONS) .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME)) .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT)) @@ -168,9 +174,10 @@ impl Persistence { .min_idle(Some(1)) .test_on_check_out(true) .build(manager) - .await .expect("Could not build connection pool"); + let connection_pool = Arc::new(connection_pool); + Self { connection_pool } } @@ -204,17 +211,12 @@ impl Persistence { use diesel_migrations::{HarnessWithOutput, MigrationHarness}; // Can't use self.with_conn here as we do spawn_blocking which requires static. 
@@ -204,17 +211,12 @@ impl Persistence {
     pub(crate) async fn migration_run(&self) -> DatabaseResult<()> {
         use diesel_migrations::{HarnessWithOutput, MigrationHarness};
 
         // Can't use self.with_conn here as we do spawn_blocking which requires 'static.
-        let conn = self
-            .connection_pool
-            .dedicated_connection()
-            .await
-            .map_err(|e| DatabaseError::Migration(e.to_string()))?;
-        let mut async_wrapper: AsyncConnectionWrapper<AsyncPgConnection> =
-            AsyncConnectionWrapper::from(conn);
+        let connection_pool = self.connection_pool.clone();
         tokio::task::spawn_blocking(move || {
+            let mut conn = connection_pool.get()?;
             let mut retry_count = 0;
             loop {
-                let result = HarnessWithOutput::write_to_stdout(&mut async_wrapper)
+                let result = HarnessWithOutput::write_to_stdout(&mut conn)
                     .run_pending_migrations(MIGRATIONS)
                     .map(|_| ())
                     .map_err(|e| DatabaseError::Migration(e.to_string()));
@@ -293,7 +295,8 @@ impl Persistence {
             + 'a,
         R: Send + 'b,
     {
-        let mut conn = self.connection_pool.get().await?;
+        let connection_pool = self.connection_pool.clone();
+        let mut conn = tokio::task::spawn_blocking(move || connection_pool.get()).await.unwrap()?;
         let mut retry_count = 0;
         loop {
             match conn