From 639971a2281cd20b640005f159f5f49999a78a36 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 22 Jul 2024 17:10:30 -0600 Subject: [PATCH] Finalize --- edb/server/conn_pool/README.md | 18 +++ edb/server/conn_pool/src/block.rs | 214 ++++++++++++++++++---------- edb/server/conn_pool/src/conn.rs | 90 +++++++++--- edb/server/conn_pool/src/metrics.rs | 14 +- edb/server/conn_pool/src/pool.rs | 1 + 5 files changed, 241 insertions(+), 96 deletions(-) diff --git a/edb/server/conn_pool/README.md b/edb/server/conn_pool/README.md index 37784975495a..7a4d095a69d6 100644 --- a/edb/server/conn_pool/README.md +++ b/edb/server/conn_pool/README.md @@ -110,3 +110,21 @@ more connections are waiting on this block, we return the connection to the block to be re-used immediately. If no connections are waiting but the block is hungry, we return it. If the block is satisfied or overfull and we have hungry blocks waiting, we transfer it to a hungry block that has waiters. + +## Error Handling + +The pool will attempt to provide a connection where possible, but connection +operations may not always be reliable. The error for a connection failure will +be routed through the acquire operation if the pool detects there are no other +potential sources for a connection for the acquire. Sources for a connection may +be a currently-connecting connection, a reconnecting connection, a connection +that is actively held by someone else or a connection that is sitting idle. + +If an acquire operation is queued for a connection, and a block's last +connection becomes invalid due to an error, and the operation is next in line, +that operation will fail, returning the error as the failure reason. The +remainder of the connections that are queued will continue to await a connection +and, if the error is persistent, will eventually return errors as well. + +The pool does not currently retry, and retry logic should be included in the +connect operation. diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs index 95db5e37bc96..59ef0be2a646 100644 --- a/edb/server/conn_pool/src/block.rs +++ b/edb/server/conn_pool/src/block.rs @@ -102,7 +102,6 @@ impl std::borrow::Borrow for Name { pub struct Block { pub db_name: Name, conns: Conns, - pending_errors: RefCell>>, state: Rc, /// Associated data for this block useful for statistics, quotas or other /// information. This is provided by the algorithm in this crate. @@ -123,7 +122,6 @@ impl Block { conns: Conns::default(), state, data: Default::default(), - pending_errors: Default::default(), } } @@ -154,19 +152,26 @@ impl Block { "Should not have any closed connections" ); - let pending_errors = self.pending_errors.borrow().len(); + // We should never have more pending errors than the number of + // waiters - the number of "live" connections that can eventually + // provide a connection (or potentially another error) + let pending_errors = self.conns.pending_error_len(); let waiting = self.state.metrics.count(MetricVariant::Waiting); + let live = self.state.metrics.sum_all(MetricVariant::live()); + if self.conns.is_empty() && waiting == 0 { + assert_eq!( + pending_errors, 0, + "Pending errors must be empty if there are no connections and no errors" + ) + } assert!( - pending_errors == 0 || !self.state.metrics.has(MetricVariant::live()), - "Either pending_errors must be empty or conns must be empty! {} {:?}", - pending_errors, - self.state.metrics.counts() - ); - assert!( - pending_errors <= waiting, - "Pending errors must be less than or equal to number of waiters: {} > {}", + // Note the inequality is changed to avoid usize underflow + pending_errors == 0 || (pending_errors + live <= waiting), + "Pending errors must be less than or equal to number of waiters - live: {} > {} - {} ({:?})", pending_errors, - waiting + waiting, + live, + self.state.metrics ); assert_eq!( self.len(), @@ -200,7 +205,6 @@ impl Block { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); self.conns.insert(conn.clone()); - self.pending_errors.borrow_mut().clear(); conn }; async move { @@ -223,8 +227,8 @@ impl Block { self: Rc, connector: &C, ) -> impl Future, C::Error>> { - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - return Either::Left(ready(Ok(self.conn(conn)))); + if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) { + return Either::Left(ready(res.map(|c| self.conn(c)))); } Either::Right(self.create(connector)) } @@ -233,13 +237,9 @@ impl Block { fn queue(self: Rc) -> impl Future, C::Error>> { // If someone else is waiting, we have to queue, even if there's a connection if self.state.waiters.is_empty() { - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - trace!("Got a connection"); - return Either::Left(ready(Ok(self.conn(conn)))); - } - if let Some(err) = self.pending_errors.borrow_mut().pop() { - trace!("Got an error"); - return Either::Left(ready(Err(err))); + consistency_check!(self); + if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) { + return Either::Left(ready(res.map(|c| self.conn(c)))); } } // Update the metrics now before we actually queue @@ -254,19 +254,18 @@ impl Block { .remove_time(MetricVariant::Waiting, now.elapsed()); }); Either::Right(async move { - consistency_check!(self); loop { - if self.conns.is_empty() { - if let Some(err) = self.pending_errors.borrow_mut().pop() { - trace!("Got an error"); - drop(guard); - return Err(err); - } - } - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - trace!("Got a connection"); + consistency_check!(self); + if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) { + // Important note: we _must_ pop an error here because every + // error must have a corresponding waiter, and if there is + // one, the waiter that we're satisfying was supporting the + // existance of an error. This does not happen in the "fast + // case" if we can acquire a connection without waiting. + self.conns.pop_error(); + drop(guard); - return Ok(self.conn(conn)); + return res.map(|c| self.conn(c)); } trace!("Queueing for a connection"); self.state.waiters.queue().await; @@ -283,7 +282,6 @@ impl Block { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); self.conns.insert(conn.clone()); - self.pending_errors.borrow_mut().clear(); conn }; self.poll_to_idle(conn) @@ -329,7 +327,6 @@ impl Block { .try_take_idle_lru() .expect("Could not acquire a connection"); to.conns.insert(conn.clone()); - to.pending_errors.borrow_mut().clear(); conn.transfer( connector, &from.state.metrics, @@ -356,7 +353,6 @@ impl Block { let conn = conn.into_inner(); from.conns.remove(&conn); to.conns.insert(conn.clone()); - to.pending_errors.borrow_mut().clear(); conn.transfer( connector, &from.state.metrics, @@ -410,49 +406,40 @@ impl Block { } async fn poll_to_idle(self: Rc, conn: Conn) -> ConnResult<(), C::Error> { - consistency_check!(self); - let mut res = - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await; - if let Err(err) = res { - // If we have an error, we need to decide where the error goes... - // Note that we may try to route a connection to a waiter, but that - // waiter may end up grabbing a returned connection. - - let mut route_to_waiter = false; - - if !self.state.waiters.is_empty() { - // If waiters may be serviced by active or idle connections, - // we'll return the error here. If there are too few - // connections, one will eventually get created and we'll still - // make forward progress. - if !self.state.metrics.has(MetricVariant::live()) { - let waiters = self.state.metrics.count(MetricVariant::Waiting); - let failed = self.pending_errors.borrow().len(); - - // If there are less errors than waiters, we can route the - // error to a waiter. - if waiters > failed { - route_to_waiter = true; + let res = poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await; + { + let waiters = self.state.waiters.len(); + let live = self.state.metrics.sum_all(MetricVariant::live()); + + if let Err(err) = &res { + self.conns.remove(&conn); + conn.untrack(&self.state.metrics); + + // If we have an error, we need to decide whether to keep it or not. + // The formula we use is `pending_errors < waiters - live`. We will + // only wake waiters if there are no live connections left. + + let mut pending_errors = self.conns.pending_error_len(); + + if !self.state.waiters.is_empty() { + if pending_errors + live < waiters { + self.conns.push_error(&err); + pending_errors += 1 } } - } - - self.conns.remove(&conn); - conn.untrack(&self.state.metrics); - if route_to_waiter { - res = Err(ConnError::TaskFailed); - self.pending_errors.borrow_mut().push(err); - self.state.waiters.trigger(); + if live == 0 { + for _ in 0..waiters.min(pending_errors) { + self.state.waiters.trigger(); + } + } } else { - res = Err(err); + // No error, wake a waiter (if we have one). Trim the + self.conns.trim_errors_to(waiters.saturating_sub(live)); + self.state.waiters.trigger(); } - } else { - // No error, wake a waiter (if we have one) - self.pending_errors.borrow_mut().clear(); - self.state.waiters.trigger(); + res } - res } } @@ -932,6 +919,25 @@ mod tests { Ok(()) } + /// This test shows that we + #[test(tokio::test)] + async fn test_block_fails_queue_multiple_2() -> Result<()> { + let connector = BasicConnector::no_delay(); + let block = Rc::new(Block::::new(Name::from("db"), None)); + let queue1 = block.clone().queue(); + let queue2 = block.clone().queue(); + let local = LocalSet::new(); + + connector.fail_next_connect(); + local.spawn_local(block.clone().task_create(&connector)); + connector.fail_next_connect(); + local.spawn_local(block.clone().task_create(&connector)); + local.await; + queue1.await.expect_err("Expected this to fail"); + queue2.await.expect_err("Expected this to fail"); + Ok(()) + } + #[test(tokio::test)] async fn test_steal() -> Result<()> { let connector = BasicConnector::no_delay(); @@ -1008,6 +1014,35 @@ mod tests { Ok(()) } + #[test(tokio::test)] + async fn test_move_fail() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + blocks.create(&connector, "db").await?; + blocks.create(&connector, "db").await?; + blocks.create(&connector, "db").await?; + assert_block!(blocks "db" has 3 Idle); + let conn1 = blocks.queue("db").await?; + let conn2 = blocks.queue("db").await?; + blocks.metrics("db").reset_max(); + blocks.metrics("db2").reset_max(); + connector.fail_next_connect(); + let queue = blocks.queue("db2"); + blocks + .task_move_to(&connector, conn1, "db2") + .await + .expect_err("Expected this to fail"); + blocks.task_move_to(&connector, conn2, "db2").await?; + queue.await?; + assert_eq!(2, blocks.block_count()); + assert_block!(blocks "db" has 1 Idle); + assert_block!(blocks "db2" has 1 Idle); + assert_block!(blocks "db" has max 2 Active, 1 Idle); + assert_block!(blocks "db2" has max 1 Active, 1 Connecting, 1 Idle, 1 Failed, 1 Waiting); + Ok(()) + } + #[test(tokio::test)] async fn test_close() -> Result<()> { let connector = BasicConnector::no_delay(); @@ -1045,6 +1080,39 @@ mod tests { Ok(()) } + #[test(tokio::test)] + async fn test_reopen_error() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + let local = LocalSet::new(); + + // Success + let conn = blocks.create(&connector, "db").await?; + // This one gets the error + let acq1 = blocks.queue("db"); + // This one doesn't + let acq2 = local.spawn_local(blocks.queue("db")); + + assert_block!(blocks "db" has 1 Active, 2 Waiting); + + connector.fail_next_connect(); + blocks + .task_reopen(&connector, conn) + .await + .expect_err("Expected a failure"); + acq1.await.expect_err("Expected a failure"); + + local.spawn_local(blocks.task_create_one(&connector, "db")); + local.await; + _ = acq2.await?; + + assert_block!(blocks "db" has 1 Idle); + assert_block!(blocks "db" has all time + 3 Connecting, 1 Disconnecting, 2 Idle, 2 Active, 1 Failed, 1 Closed, 2 Waiting); + Ok(()) + } + #[test(tokio::test)] async fn test_reopen_fails() -> Result<()> { let connector = BasicConnector::no_delay(); diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index 9bafb73ef549..8fd99af8a0e3 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -14,7 +14,7 @@ use std::{ task::{ready, Poll}, time::Duration, }; -use tracing::error; +use tracing::{error, trace}; pub struct ConnState { pub db_name: Name, @@ -22,8 +22,8 @@ pub struct ConnState { pub metrics: Rc, } -#[derive(Debug, thiserror::Error)] -pub enum ConnError { +#[derive(Debug, Clone, thiserror::Error)] +pub enum ConnError { #[error("Shutdown")] Shutdown, #[error("Underlying")] @@ -39,7 +39,9 @@ pub type ConnResult = Result>; pub trait Connector: std::fmt::Debug + 'static { /// The type of connection associated with this [`Connector`]. type Conn; - type Error: Into>; + /// The type of error returned from this [`Connector`]. The error must be + /// `Clone`able as it may be returned through multiple channels. + type Error: Into> + Clone; /// Perform a connect operation to the given database. fn connect( @@ -371,36 +373,67 @@ impl Drop for ConnHandle { pub struct Conns { conns: RefCell>>, youngest: Cell, + pending_errors: RefCell>>, } impl Default for Conns { fn default() -> Self { Self { - conns: Default::default(), + conns: vec![].into(), youngest: Cell::new(Instant::now()), + pending_errors: vec![].into(), } } } impl Conns { + #[inline] pub fn len(&self) -> usize { self.conns.borrow().len() } + #[inline] + pub fn is_empty(&self) -> bool { + self.conns.borrow().is_empty() + } + + #[inline] + pub fn pending_error_len(&self) -> usize { + self.pending_errors.borrow().len() + } + + #[inline] + pub fn trim_errors_to(&self, live: usize) { + self.pending_errors.borrow_mut().truncate(live) + } + + #[inline] pub fn youngest(&self) -> Duration { self.youngest.get().elapsed() } + #[inline] pub fn walk(&self, mut f: impl FnMut(&Conn)) { for conn in self.conns.borrow().iter() { f(conn) } } + #[inline] + pub fn pop_error(&self) -> Option> { + self.pending_errors.borrow_mut().pop() + } + + #[inline] + pub fn push_error(&self, error: &ConnError) { + self.pending_errors.borrow_mut().push(error.clone()) + } + /// Insert a new connection, in the MRU spot. pub fn insert(&self, conn: Conn) { self.conns.borrow_mut().push(conn); self.youngest.set(Instant::now()); + self.pending_errors.borrow_mut().clear(); } /// Remove a specific connection from the list. This may break MRU ordering @@ -418,21 +451,38 @@ impl Conns { } /// Acquires the most-recently-used idle connection, moving it to the end of - /// the internal vector. - pub fn try_acquire_idle_mru(&self, metrics: &MetricsAccum) -> Option> { + /// the internal vector. If no connections are available, polls a pending error. + pub fn try_acquire_idle_mru( + &self, + metrics: &MetricsAccum, + ) -> Option, C::Error>> { let mut lock = self.conns.borrow_mut(); - let pos = lock - .iter() - .rev() - .position(|conn| conn.variant() == MetricVariant::Idle)?; - let last_item = lock.len() - 1; - let pos = last_item - pos; - lock.swap(last_item, pos); - let conn = lock[last_item].clone(); - if !conn.try_lock(metrics) { - panic!("Connection unexpectedly could not be locked") + if lock.is_empty() { + if let Some(err) = self.pending_errors.borrow_mut().pop() { + trace!("No connections, got an error"); + Some(Err(err)) + } else { + None + } + } else { + if let Some(pos) = lock + .iter() + .rev() + .position(|conn| conn.variant() == MetricVariant::Idle) + { + let last_item = lock.len() - 1; + let pos = last_item - pos; + lock.swap(last_item, pos); + let conn = lock[last_item].clone(); + if !conn.try_lock(metrics) { + panic!("Connection unexpectedly could not be locked") + } + trace!("Got a connection"); + Some(Ok(conn)) + } else { + None + } } - Some(conn) } /// Gets the least-recently-used idle connection, does not re-order the @@ -459,8 +509,4 @@ impl Conns { }; Some(conn) } - - pub fn is_empty(&self) -> bool { - self.conns.borrow().is_empty() - } } diff --git a/edb/server/conn_pool/src/metrics.rs b/edb/server/conn_pool/src/metrics.rs index 3c1e43f9674d..ec64914a5bd3 100644 --- a/edb/server/conn_pool/src/metrics.rs +++ b/edb/server/conn_pool/src/metrics.rs @@ -20,6 +20,7 @@ pub enum MetricVariant { } impl MetricVariant { + /// Variants that can provide a connection. pub const fn live() -> &'static [MetricVariant] { &[ MetricVariant::Connecting, @@ -293,9 +294,20 @@ impl MetricsAccum { self.raw.borrow().counts[variant] } + /// Sums the values of all the given variants. + #[inline(always)] + pub fn sum_all(&self, variants: &[MetricVariant]) -> usize { + let mut sum = 0; + let lock = self.raw.borrow(); + for variant in variants { + sum += lock.counts[*variant]; + } + sum + } + /// Returns true if there is a non-zero count for any of the variants. #[inline(always)] - pub fn has(&self, variants: &[MetricVariant]) -> bool { + pub fn has_any(&self, variants: &[MetricVariant]) -> bool { let lock = self.raw.borrow(); for variant in variants { if lock.counts[*variant] > 0 { diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index 2ba58297ce57..95101e79db52 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -507,6 +507,7 @@ mod tests { #[test(tokio::test(flavor = "current_thread", start_paused = true))] #[rstest] #[case::small(1)] + #[case::medium(10)] #[case::large(20)] async fn test_pool_failures(#[case] databases: usize) -> Result<()> { let spec = Spec {