diff --git a/edb/server/conn_pool/README.md b/edb/server/conn_pool/README.md index 37784975495..941f478bed4 100644 --- a/edb/server/conn_pool/README.md +++ b/edb/server/conn_pool/README.md @@ -110,3 +110,15 @@ more connections are waiting on this block, we return the connection to the block to be re-used immediately. If no connections are waiting but the block is hungry, we return it. If the block is satisfied or overfull and we have hungry blocks waiting, we transfer it to a hungry block that has waiters. + +## Error Handling + +The pool will attempt to provide a connection where possible, but connection +operations may not always be reliable. The error for a connection failure will +be routed through the acquire operation if the pool detects there are no other +potential sources for a connection for the acquire. Sources for a connection may +be a currently-connecting connection, a reconnecting connection, a connection +that is actively held by someone else or a connection that is sitting idle. + +The pool does not currently retry, and retry logic should be included in the +connect operation. diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs index a06a5d31022..ba99311dc59 100644 --- a/edb/server/conn_pool/src/algo.rs +++ b/edb/server/conn_pool/src/algo.rs @@ -538,6 +538,20 @@ impl PoolConstraints { ); } + /// Plan a shutdown. + pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec { + let mut ops = vec![]; + it.with_all(|name, block| { + let idle = block.count(MetricVariant::Idle); + let failed = block.count(MetricVariant::Failed); + + for _ in 0..(idle + failed) { + ops.push(RebalanceOp::Close(name.clone())); + } + }); + ops + } + /// Plan a rebalance to better match the target quotas of the blocks in the /// pool. pub fn plan_rebalance(&self, it: &impl VisitPoolAlgoData) -> Vec { diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs index e32f83ce28a..6aac43354d6 100644 --- a/edb/server/conn_pool/src/block.rs +++ b/edb/server/conn_pool/src/block.rs @@ -140,6 +140,13 @@ impl Block { #[track_caller] pub fn check_consistency(&self) { if cfg!(debug_assertions) { + // These should never be non-zero during the consistency check + assert_eq!( + self.state.metrics.count(MetricVariant::Closed), + 0, + "Should not have any closed connections" + ); + assert_eq!( self.len(), self.conns.len(), @@ -164,7 +171,10 @@ impl Block { /// Creates a connection from this block. #[cfg(test)] - fn create(self: Rc, connector: &C) -> impl Future>> { + fn create( + self: Rc, + connector: &C, + ) -> impl Future, C::Error>> { let conn = { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); @@ -173,8 +183,14 @@ impl Block { }; async move { consistency_check!(self); - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Active)).await?; - Ok(self.conn(conn)) + let res = + poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Active)).await; + if res.is_ok() { + Ok(self.conn(conn)) + } else { + let res = self.conns.remove(conn, &self.state.metrics); + Err(res.err().unwrap()) + } } } @@ -183,22 +199,21 @@ impl Block { fn create_if_needed( self: Rc, connector: &C, - ) -> impl Future>> { - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - return Either::Left(ready(Ok(self.conn(conn)))); + ) -> impl Future, C::Error>> { + use futures::future::Either; + use std::future::ready; + if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) { + return Either::Left(ready(res.map(|c| self.conn(c)))); } Either::Right(self.create(connector)) } /// Awaits a connection from this block. - fn queue(self: Rc) -> impl Future>> { - // If someone else is waiting, we have to queue, even if there's a connection - if self.state.waiters.len() == 0 { - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - trace!("Got a connection"); - return Either::Left(ready(Ok(self.conn(conn)))); - } - } + fn queue(self: Rc) -> impl Future, C::Error>> { + // Note that we cannot skip the waiting queue because this makes it + // difficult to test and manage metrics -- if we skipped the queue then + // there would never be an apparent waiter. + // Update the metrics now before we actually queue self.state.waiters.lock(); self.state.metrics.insert(MetricVariant::Waiting); @@ -210,34 +225,31 @@ impl Block { .metrics .remove_time(MetricVariant::Waiting, now.elapsed()); }); - Either::Right(async move { - consistency_check!(self); + async move { loop { - if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { - trace!("Got a connection"); + consistency_check!(self); + if let Some(res) = self.conns.try_acquire_idle_mru(&self.state.metrics) { drop(guard); - return Ok(self.conn(conn)); + return res.map(|c| self.conn(c)); } trace!("Queueing for a connection"); self.state.waiters.queue().await; } - }) + } } /// Creates a connection from this block. - fn task_create(self: Rc, connector: &C) -> impl Future> { + fn task_create( + self: Rc, + connector: &C, + ) -> impl Future> { let conn = { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); self.conns.insert(conn.clone()); conn }; - async move { - consistency_check!(self); - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await?; - self.state.waiters.trigger(); - Ok(()) - } + self.poll_to_idle(conn) } /// Close one of idle connections in this block @@ -245,23 +257,26 @@ impl Block { /// ## Panics /// /// If there are no idle connections, this function will panic. - fn task_close_one(self: Rc, connector: &C) -> impl Future> { + fn task_close_one( + self: Rc, + connector: &C, + ) -> impl Future> { let conn = { consistency_check!(self); let conn = self .conns - .try_get_idle_lru() + .try_get_idle(&self.state.metrics, false) .expect("Could not acquire a connection"); + let conn = match conn { + Ok(conn) => conn, + Err(err) => { + return Either::Left(ready(Err(err))); + } + }; conn.close(connector, &self.state.metrics); conn }; - async move { - consistency_check!(self); - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?; - self.conns.remove(&conn); - conn.untrack(&self.state.metrics); - Ok(()) - } + Either::Right(self.poll_to_close(conn)) } /// Steals a connection from one block to another. @@ -273,31 +288,20 @@ impl Block { from: Rc, to: Rc, connector: &C, - ) -> impl Future> { + ) -> impl Future> { let conn = { consistency_check!(from); consistency_check!(to); let conn = from .conns - .try_take_idle_lru() + .try_take_idle_lru(&from.state.metrics) .expect("Could not acquire a connection"); + let conn = Conn::new(connector.reconnect(conn, &to.db_name), &to.state.metrics); to.conns.insert(conn.clone()); - conn.transfer( - connector, - &from.state.metrics, - &to.state.metrics, - &to.db_name, - ); conn }; - async move { - consistency_check!(from); - consistency_check!(to); - poll_fn(|cx| conn.poll_ready(cx, &to.state.metrics, MetricVariant::Idle)).await?; - to.state.waiters.trigger(); - Ok(()) - } + to.poll_to_idle(conn) } /// Moves a connection to a different block than it was acquired from @@ -307,29 +311,22 @@ impl Block { to: Rc, conn: ConnHandle, connector: &C, - ) -> impl Future> { + ) -> impl Future> { let conn = { consistency_check!(from); consistency_check!(to); let conn = conn.into_inner(); - from.conns.remove(&conn); + let conn = from + .conns + .remove(conn, &from.state.metrics) + .unwrap() + .unwrap(); + let conn = Conn::new(connector.reconnect(conn, &to.db_name), &to.state.metrics); to.conns.insert(conn.clone()); - conn.transfer( - connector, - &from.state.metrics, - &to.state.metrics, - &to.db_name, - ); conn }; - async move { - consistency_check!(from); - consistency_check!(to); - poll_fn(|cx| conn.poll_ready(cx, &to.state.metrics, MetricVariant::Idle)).await?; - to.state.waiters.trigger(); - Ok(()) - } + to.poll_to_idle(conn) } /// Marks a connection as requiring re-open. @@ -337,19 +334,14 @@ impl Block { self: Rc, conn: ConnHandle, connector: &C, - ) -> impl Future> { + ) -> impl Future> { let conn = { consistency_check!(self); let conn = conn.into_inner(); conn.reopen(connector, &self.state.metrics, &self.db_name); conn }; - async move { - consistency_check!(self); - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await?; - self.state.waiters.trigger(); - Ok(()) - } + self.poll_to_idle(conn) } /// Marks a connection as requiring a discard. @@ -357,20 +349,35 @@ impl Block { self: Rc, conn: ConnHandle, connector: &C, - ) -> impl Future> { + ) -> impl Future> { let conn = { consistency_check!(self); let conn = conn.into_inner(); conn.discard(connector, &self.state.metrics); conn }; - async move { - consistency_check!(self); - poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?; - self.conns.remove(&conn); - conn.untrack(&self.state.metrics); - Ok(()) - } + self.poll_to_close(conn) + } + + async fn poll_to_close(self: Rc, conn: Conn) -> ConnResult<(), C::Error> { + consistency_check!(self); + // If the close task fails, we route the error to the task's result + // as there will never be a waiter who will pick it up + let res = + poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await; + let res2 = self.conns.remove(conn, &self.state.metrics); + debug_assert_eq!(res.is_ok(), res2.is_ok()); + let conn = res2?; + debug_assert!(conn.is_none()); + Ok(()) + } + + /// Once a connection has completed the `Connecting` phase, we will route + /// the results to a waiter, assuming there is one. + async fn poll_to_idle(self: Rc, conn: Conn) -> ConnResult<(), C::Error> { + let res = poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Idle)).await; + self.state.waiters.trigger(); + res.map_err(|_| ConnError::TaskFailed) } } @@ -532,7 +539,8 @@ impl Blocks { assert_eq!( total, self.metrics.total(), - "Blocks failed consistency check. Total connection count was wrong." + "Blocks failed consistency check. Total connection count was wrong. {:?}", + self.metrics ); } } @@ -595,7 +603,7 @@ impl Blocks { &self, connector: &C, db: &str, - ) -> impl Future>> { + ) -> impl Future, C::Error>> { consistency_check!(self); let block = self.block(db); block.create(connector) @@ -608,28 +616,36 @@ impl Blocks { &self, connector: &C, db: &str, - ) -> impl Future>> { + ) -> impl Future, C::Error>> { consistency_check!(self); let block = self.block(db); block.create_if_needed(connector) } /// Queue for a connection. - pub fn queue(&self, db: &str) -> impl Future>> { + pub fn queue(&self, db: &str) -> impl Future, C::Error>> { consistency_check!(self); let block = self.block(db); block.queue() } /// Creates one connection in a block. - pub fn task_create_one(&self, connector: &C, db: &str) -> impl Future> { + pub fn task_create_one( + &self, + connector: &C, + db: &str, + ) -> impl Future> { consistency_check!(self); let block = self.block(db); block.task_create(connector) } /// Closes one connection in a block. - pub fn task_close_one(&self, connector: &C, db: &str) -> impl Future> { + pub fn task_close_one( + &self, + connector: &C, + db: &str, + ) -> impl Future> { consistency_check!(self); let block = self.block(db); block.task_close_one(connector) @@ -641,7 +657,7 @@ impl Blocks { connector: &C, db: &str, from: &str, - ) -> impl Future> { + ) -> impl Future> { let from_block = self.block(from); let to_block = self.block(db); Block::task_reconnect(from_block, to_block, connector) @@ -654,7 +670,7 @@ impl Blocks { connector: &C, conn: ConnHandle, db: &str, - ) -> impl Future> { + ) -> impl Future> { let from_block = self.block(&conn.state.db_name); let to_block = self.block(db); Block::task_reconnect_conn(from_block, to_block, conn, connector) @@ -665,7 +681,7 @@ impl Blocks { &self, connector: &C, conn: ConnHandle, - ) -> impl Future> { + ) -> impl Future> { let block = self.block(&conn.state.db_name); block.task_discard(conn, connector) } @@ -675,7 +691,7 @@ impl Blocks { &self, connector: &C, conn: ConnHandle, - ) -> impl Future> { + ) -> impl Future> { let block = self.block(&conn.state.db_name); block.task_reopen(conn, connector) } @@ -708,11 +724,25 @@ mod tests { ($block:ident $db:literal is empty) => { assert_eq!($block.metrics($db).summary().value, VariantArray::default(), stringify!(Expected block is empty)); }; - ($block:ident $db:literal has $count:literal $type:ident) => { + ($block:ident $db:literal has $($count:literal $type:ident),+) => { assert_eq!( $block.metrics($db).summary().value, - VariantArray::with(MetricVariant::$type, $count), - stringify!(Expected block has $count $type) + [$(VariantArray::with(MetricVariant::$type, $count)),+].into_iter().sum(), + stringify!(Expected block $db has $($count $type),+) + ); + }; + ($block:ident $db:literal has max $($count:literal $type:ident),+) => { + assert_eq!( + $block.metrics($db).summary().max, + [$(VariantArray::with(MetricVariant::$type, $count)),+].into_iter().sum(), + stringify!(Expected block $db has max $($count $type),+) + ); + }; + ($block:ident $db:literal has all time $($count:literal $type:ident),+) => { + assert_eq!( + $block.metrics($db).summary().all_time, + [$(VariantArray::with(MetricVariant::$type, $count)),+].into_iter().sum(), + stringify!(Expected block $db has all time $($count $type),+) ); }; } @@ -784,6 +814,69 @@ mod tests { Ok(()) } + #[test(tokio::test)] + async fn test_block_fails_connect() -> Result<()> { + let connector = BasicConnector::no_delay(); + let block = Rc::new(Block::::new(Name::from("db"), None)); + connector.fail_next_connect(); + block + .clone() + .create(&connector) + .await + .expect_err("Expected this to fail"); + Ok(()) + } + + #[test(tokio::test)] + async fn test_block_fails_queue() -> Result<()> { + let connector = BasicConnector::no_delay(); + let block = Rc::new(Block::::new(Name::from("db"), None)); + connector.fail_next_connect(); + let queue = block.clone().queue(); + let local = LocalSet::new(); + local.spawn_local(block.clone().task_create(&connector)); + local.await; + queue + .await + .expect_err("Expected this queueing operation to fail"); + Ok(()) + } + + #[test(tokio::test)] + async fn test_block_fails_queue_multiple() -> Result<()> { + let connector = BasicConnector::no_delay(); + let block = Rc::new(Block::::new(Name::from("db"), None)); + connector.fail_next_connect(); + let queue = block.clone().queue(); + let local = LocalSet::new(); + // The first will fail, but the second one will provide a valid + // connection to the waiter. + local.spawn_local(block.clone().task_create(&connector)); + local.spawn_local(block.clone().task_create(&connector)); + local.await; + queue.await?; + Ok(()) + } + + /// This test shows that we + #[test(tokio::test)] + async fn test_block_fails_queue_multiple_2() -> Result<()> { + let connector = BasicConnector::no_delay(); + let block = Rc::new(Block::::new(Name::from("db"), None)); + let queue1 = block.clone().queue(); + let queue2 = block.clone().queue(); + let local = LocalSet::new(); + + connector.fail_next_connect(); + local.spawn_local(block.clone().task_create(&connector)); + connector.fail_next_connect(); + local.spawn_local(block.clone().task_create(&connector)); + local.await; + queue1.await.expect_err("Expected this to fail"); + queue2.await.expect_err("Expected this to fail"); + Ok(()) + } + #[test(tokio::test)] async fn test_steal() -> Result<()> { let connector = BasicConnector::no_delay(); @@ -805,8 +898,34 @@ mod tests { assert_block!(blocks "db" is empty); assert_block!(blocks "db2" has 3 Idle); // Should not activate a connection to steal it - assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active)); - assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active)); + assert_block!(blocks "db" has max 0 Active, 3 Idle); + assert_block!(blocks "db2" has max 0 Active, 1 Connecting, 3 Idle); + Ok(()) + } + + #[test(tokio::test)] + async fn test_steal_fails() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + blocks.create(&connector, "db").await?; + blocks.create(&connector, "db").await?; + blocks.metrics("db").reset_max(); + connector.fail_next_connect(); + blocks + .task_steal(&connector, "db2", "db") + .await + .expect_err("Expected to fail"); + assert_block!(blocks "db" has 1 Idle); + assert_block!(blocks "db2" has 1 Failed); + let queue = blocks.queue("db2"); + assert_block!(blocks "db2" has 1 Waiting, 1 Failed); + connector.fail_next_connect(); + blocks + .task_steal(&connector, "db2", "db") + .await + .expect_err("Expected to fail"); + queue.await.expect_err("Expected this to fail"); Ok(()) } @@ -829,8 +948,37 @@ mod tests { assert_block!(blocks "db" has 2 Idle); assert_block!(blocks "db2" has 1 Idle); // Should not activate a connection to move it - assert_eq!(1, blocks.metrics("db").max(MetricVariant::Active)); - assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active)); + assert_block!(blocks "db" has max 1 Active, 3 Idle, 1 Waiting); + assert_block!(blocks "db2" has max 0 Active, 1 Connecting, 1 Idle); + Ok(()) + } + + #[test(tokio::test)] + async fn test_move_fail() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + blocks.create(&connector, "db").await?; + blocks.create(&connector, "db").await?; + blocks.create(&connector, "db").await?; + assert_block!(blocks "db" has 3 Idle); + let conn1 = blocks.queue("db").await?; + let conn2 = blocks.queue("db").await?; + blocks.metrics("db").reset_max(); + blocks.metrics("db2").reset_max(); + connector.fail_next_connect(); + let queue = blocks.queue("db2"); + blocks + .task_move_to(&connector, conn1, "db2") + .await + .expect_err("Expected this to fail"); + blocks.task_move_to(&connector, conn2, "db2").await?; + queue.await?; + assert_eq!(2, blocks.block_count()); + assert_block!(blocks "db" has 1 Idle); + assert_block!(blocks "db2" has 1 Idle, 1 Failed); + assert_block!(blocks "db" has max 2 Active, 1 Idle); + assert_block!(blocks "db2" has max 1 Active, 1 Connecting, 1 Idle, 1 Failed, 1 Waiting); Ok(()) } @@ -845,12 +993,17 @@ mod tests { assert_eq!(1, blocks.block_count()); assert_block!(blocks "db" has 2 Idle); blocks.task_close_one(&connector, "db").await?; - blocks.task_close_one(&connector, "db").await?; + // This will only fail the task + connector.fail_next_disconnect(); + blocks + .task_close_one(&connector, "db") + .await + .expect_err("Expected to fail"); assert_block!(blocks "db" is empty); // Hasn't GC'd yet assert_eq!(1, blocks.block_count()); // Should not activate a connection to close it - assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active)); + assert_block!(blocks "db" has max 0 Active, 1 Disconnecting, 2 Idle, 1 Failed, 1 Closed); Ok(()) } @@ -862,14 +1015,57 @@ mod tests { let conn = blocks.create(&connector, "db").await?; blocks.task_reopen(&connector, conn).await?; assert_block!(blocks "db" has 1 Idle); - assert_eq!( - blocks.metrics("db").all_time()[MetricVariant::Connecting], - 2 - ); - assert_eq!( - blocks.metrics("db").all_time()[MetricVariant::Disconnecting], - 1 - ); + assert_block!(blocks "db" has all time 2 Connecting, 1 Disconnecting, 1 Idle, 1 Active, 1 Closed); + Ok(()) + } + + #[test(tokio::test)] + async fn test_reopen_fails() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + let local = LocalSet::new(); + + // Success + let conn = blocks.create(&connector, "db").await?; + // This one gets the error + let acq1 = blocks.queue("db"); + // This one doesn't + let acq2 = local.spawn_local(blocks.queue("db")); + + assert_block!(blocks "db" has 1 Active, 2 Waiting); + + connector.fail_next_connect(); + blocks + .task_reopen(&connector, conn) + .await + .expect_err("Expected a failure"); + acq1.await.expect_err("Expected a failure"); + + local.spawn_local(blocks.task_create_one(&connector, "db")); + local.await; + _ = acq2.await?; + + assert_block!(blocks "db" has 1 Idle); + assert_block!(blocks "db" has all time + 3 Connecting, 1 Disconnecting, 2 Idle, 2 Active, 1 Failed, 1 Closed, 2 Waiting); + Ok(()) + } + + #[test(tokio::test)] + async fn test_reopen_fails_2() -> Result<()> { + let connector = BasicConnector::no_delay(); + let blocks = Blocks::<_, ()>::default(); + assert_eq!(0, blocks.block_count()); + let conn = blocks.create(&connector, "db").await?; + assert_block!(blocks "db" has all time 1 Connecting, 1 Active); + connector.fail_next_connect(); + blocks + .task_reopen(&connector, conn) + .await + .expect_err("Expected a failure"); + assert_block!(blocks "db" has 1 Failed); + assert_block!(blocks "db" has all time 2 Connecting, 1 Disconnecting, 1 Failed, 1 Active, 1 Closed); Ok(()) } @@ -881,14 +1077,7 @@ mod tests { let conn = blocks.create(&connector, "db").await?; blocks.task_discard(&connector, conn).await?; assert_block!(blocks "db" is empty); - assert_eq!( - blocks.metrics("db").all_time()[MetricVariant::Connecting], - 1 - ); - assert_eq!( - blocks.metrics("db").all_time()[MetricVariant::Disconnecting], - 1 - ); + assert_block!(blocks "db" has all time 1 Connecting, 1 Disconnecting, 1 Active, 1 Closed); Ok(()) } } diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index d9041b1ee00..c115a1cbd4a 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -14,7 +14,7 @@ use std::{ task::{ready, Poll}, time::Duration, }; -use tracing::error; +use tracing::{error, trace}; pub struct ConnState { pub db_name: Name, @@ -22,32 +22,45 @@ pub struct ConnState { pub metrics: Rc, } -#[derive(Debug, thiserror::Error)] -pub enum ConnError { +#[derive(Debug, Clone, thiserror::Error)] +pub enum ConnError { #[error("Shutdown")] Shutdown, + #[error("Underlying")] + Underlying(E), + #[error("Task failed, but error was re-routed")] + TaskFailed, #[error("{0}")] Other(Cow<'static, str>), } -pub type ConnResult = Result; +pub type ConnResult = Result>; pub trait Connector: std::fmt::Debug + 'static { /// The type of connection associated with this [`Connector`]. type Conn; + /// The type of error returned from this [`Connector`]. The error must be + /// `Clone`able as it may be returned through multiple channels. + type Error: Into> + Clone + std::fmt::Debug; /// Perform a connect operation to the given database. - fn connect(&self, db: &str) -> impl Future> + 'static; + fn connect( + &self, + db: &str, + ) -> impl Future> + 'static; /// Perform a graceful reconnect operation from the existing connection to a new database. fn reconnect( &self, conn: Self::Conn, db: &str, - ) -> impl Future> + 'static; + ) -> impl Future> + 'static; /// Perform a graceful disconnect operation on the given connection. - fn disconnect(&self, conn: Self::Conn) -> impl Future> + 'static; + fn disconnect( + &self, + conn: Self::Conn, + ) -> impl Future> + 'static; } #[derive(Debug)] @@ -73,7 +86,7 @@ impl Clone for Conn { impl Conn { pub fn new( - f: impl Future> + 'static, + f: impl Future> + 'static, metrics: &MetricsAccum, ) -> Self { metrics.insert(MetricVariant::Connecting); @@ -85,6 +98,19 @@ impl Conn { } } + pub fn new_transfer( + f: impl Future> + 'static, + metrics: &MetricsAccum, + ) -> Self { + metrics.insert(MetricVariant::Reconnecting); + Self { + inner: Rc::new(RefCell::new(ConnInner::Reconnecting( + Instant::now(), + f.boxed_local(), + ))), + } + } + #[inline(always)] pub fn with_handle(&self, f: impl Fn(&C::Conn) -> T) -> Option { match &*self.inner.borrow() { @@ -130,20 +156,6 @@ impl Conn { }); } - pub fn transfer(&self, connector: &C, from: &MetricsAccum, to: &MetricsAccum, db: &str) { - self.untrack(from); - self.transition(|inner| match inner { - ConnInner::Idle(_t, conn, ..) | ConnInner::Active(_t, conn, ..) => { - from.inc_all_time(MetricVariant::Disconnecting); - from.inc_all_time(MetricVariant::Closed); - to.insert(MetricVariant::Reconnecting); - let f = connector.reconnect(conn, db).boxed_local(); - ConnInner::Reconnecting(Instant::now(), f) - } - _ => unreachable!(), - }); - } - pub fn reopen(&self, connector: &C, metrics: &MetricsAccum, db: &str) { self.transition(|inner| match inner { ConnInner::Active(t, conn) => { @@ -161,45 +173,48 @@ impl Conn { }); } + /// Polls a connection that is in a pollable state. + /// + /// ## Panics + /// + /// This function panics if the connection is not pollable. pub fn poll_ready( &self, cx: &mut std::task::Context, metrics: &MetricsAccum, to: MetricVariant, - ) -> Poll> { + ) -> Poll> { let mut lock = self.inner.borrow_mut(); debug_assert!( to == MetricVariant::Active || to == MetricVariant::Idle || to == MetricVariant::Closed ); let res = match &mut *lock { - ConnInner::Idle(..) => Ok(()), - ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => { - match ready!(f.poll_unpin(cx)) { - Ok(c) => { - let elapsed = t.elapsed(); - let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); - metrics.transition(from, to, elapsed); - if to == MetricVariant::Active { - *lock = ConnInner::Active(Instant::now(), c); - } else if to == MetricVariant::Idle { - *lock = ConnInner::Idle(Instant::now(), c); - } else { - unreachable!() - } - Ok(()) - } - Err(err) => { - metrics.transition( - MetricVariant::Connecting, - MetricVariant::Failed, - t.elapsed(), - ); - *lock = ConnInner::Failed; - Err(err) + ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => match ready!(f.poll_unpin(cx)) { + Ok(c) => { + let elapsed = t.elapsed(); + debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle); + let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); + metrics.transition(from, to, elapsed); + if to == MetricVariant::Active { + *lock = ConnInner::Active(Instant::now(), c); + } else { + *lock = ConnInner::Idle(Instant::now(), c); } + Ok(()) } - } + Err(err) => { + let elapsed = t.elapsed(); + let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into(); + metrics.transition( + from, + MetricVariant::Failed, + elapsed, + ); + *lock = ConnInner::Failed(err); + Err(ConnError::TaskFailed) + } + }, ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) { Ok(_) => { debug_assert_eq!(to, MetricVariant::Closed); @@ -213,11 +228,10 @@ impl Conn { MetricVariant::Failed, t.elapsed(), ); - *lock = ConnInner::Failed; - Err(err) + *lock = ConnInner::Failed(err); + Err(ConnError::TaskFailed) } }, - ConnInner::Failed => Err(ConnError::Other("Failed".into())), _ => unreachable!(), }; Poll::Ready(res) @@ -242,14 +256,18 @@ impl Conn { (&*self.inner.borrow()).into() } - pub fn untrack(&self, metrics: &MetricsAccum) { + fn untrack(&self, metrics: &MetricsAccum) { match &*self.inner.borrow() { - ConnInner::Active(t, _) - | ConnInner::Idle(t, _) - | ConnInner::Connecting(t, _) - | ConnInner::Disconnecting(t, _) - | ConnInner::Reconnecting(t, _) => metrics.remove_time(self.variant(), t.elapsed()), - other => metrics.remove(other.into()), + ConnInner::Active(t, _) | ConnInner::Idle(t, _) => { + // If we're transferring, we need to emit virtual disconnecting/closed events + metrics.inc_all_time(MetricVariant::Disconnecting); + metrics.inc_all_time(MetricVariant::Closed); + metrics.remove_time(self.variant(), t.elapsed()); + } + ConnInner::Closed | ConnInner::Failed(..) => { + metrics.remove(self.variant()); + } + _ => unreachable!() } } } @@ -263,17 +281,26 @@ impl Conn { /// ``` enum ConnInner { /// Connecting connections hold a spot in the pool as they count towards quotas - Connecting(Instant, Pin>>>), + Connecting( + Instant, + Pin>>>, + ), + /// Reconnecting connections hold a spot in the pool as they count towards quotas + Reconnecting( + Instant, + Pin>>>, + ), /// Disconnecting connections hold a spot in the pool as they count towards quotas - Disconnecting(Instant, Pin>>>), - /// Reconnecting hold a spot in the pool as they count towards quotas - Reconnecting(Instant, Pin>>>), + Disconnecting( + Instant, + Pin>>>, + ), /// The connection is alive, but it is not being held. Idle(Instant, C::Conn), /// The connection is alive, and is being held. Active(Instant, C::Conn), /// The connection is in a failed state. - Failed, + Failed(ConnError), /// The connection is in a closed state. Closed, /// Transitioning between states. Used internally, never escapes an internal @@ -285,11 +312,11 @@ impl From<&ConnInner> for MetricVariant { fn from(val: &ConnInner) -> Self { match val { ConnInner::Connecting(..) => MetricVariant::Connecting, - ConnInner::Disconnecting(..) => MetricVariant::Disconnecting, ConnInner::Reconnecting(..) => MetricVariant::Reconnecting, + ConnInner::Disconnecting(..) => MetricVariant::Disconnecting, ConnInner::Idle(..) => MetricVariant::Idle, ConnInner::Active(..) => MetricVariant::Active, - ConnInner::Failed => MetricVariant::Failed, + ConnInner::Failed(..) => MetricVariant::Failed, ConnInner::Closed => MetricVariant::Closed, ConnInner::Transition => unreachable!(), } @@ -371,21 +398,24 @@ pub struct Conns { impl Default for Conns { fn default() -> Self { Self { - conns: Default::default(), + conns: vec![].into(), youngest: Cell::new(Instant::now()), } } } impl Conns { + #[inline] pub fn len(&self) -> usize { self.conns.borrow().len() } + #[inline] pub fn youngest(&self) -> Duration { self.youngest.get().elapsed() } + #[inline] pub fn walk(&self, mut f: impl FnMut(&Conn)) { for conn in self.conns.borrow().iter() { f(conn) @@ -400,58 +430,112 @@ impl Conns { /// Remove a specific connection from the list. This may break MRU ordering /// for performance reasons. - pub fn remove(&self, conn: &Conn) { - let lock = self.conns.borrow_mut(); + pub fn remove( + &self, + conn: Conn, + metrics: &MetricsAccum, + ) -> ConnResult, C::Error> { + conn.untrack(metrics); + + // Find the connection and remove it + let mut lock = self.conns.borrow_mut(); let index = lock .iter() - .position(|other| conn == other) + .position(|other| &conn == other) .expect("Connection unexpectedly could not be found"); - { - let mut lock = lock; - lock.swap_remove(index) - }; + lock.swap_remove(index); + + // We know that a removed connection cannot have a handle active, so the + // `Rc` unwrap will always succeed. + debug_assert_eq!(Rc::strong_count(&conn.inner), 1); + + match Rc::into_inner(conn.inner).unwrap().into_inner() { + ConnInner::Active(_, conn) | ConnInner::Idle(_, conn) => Ok(Some(conn)), + ConnInner::Closed => Ok(None), + ConnInner::Failed(err) => Err(err), + _ => unreachable!(), + } } /// Acquires the most-recently-used idle connection, moving it to the end of - /// the internal vector. - pub fn try_acquire_idle_mru(&self, metrics: &MetricsAccum) -> Option> { - let mut lock = self.conns.borrow_mut(); - let pos = lock - .iter() - .rev() - .position(|conn| conn.variant() == MetricVariant::Idle)?; - let last_item = lock.len() - 1; - let pos = last_item - pos; - lock.swap(last_item, pos); - let conn = lock[last_item].clone(); - if !conn.try_lock(&metrics) { - panic!("Connection unexpectedly could not be locked") + /// the internal vector. If no connections are available, polls a pending error. + pub fn try_acquire_idle_mru( + &self, + metrics: &MetricsAccum, + ) -> Option, C::Error>> { + let res = self.try_get_idle(metrics, true); + if let Some(Ok(conn)) = &res { + if !conn.try_lock(metrics) { + panic!("Connection unexpectedly could not be locked") + } } - Some(conn) + res } - /// Gets the least-recently-used idle connection, does not re-order the - /// underlying list. - pub fn try_get_idle_lru(&self) -> Option> { - for conn in self.conns.borrow().iter() { - if conn.variant() == MetricVariant::Idle { - return Some(conn.clone()); + /// Gets the most-recently-used idle connection, moving it to the end of + /// the internal vector. If no connections are available, polls a pending error. + pub fn try_get_idle( + &self, + metrics: &MetricsAccum, + mru: bool, + ) -> Option, C::Error>> { + let mut lock = self.conns.borrow_mut(); + if lock.is_empty() { + None + } else { + let last_item = lock.len() - 1; + let range: &mut dyn Iterator = if mru { + &mut (0..lock.len()).rev() + } else { + &mut (0..lock.len()) + }; + for pos in range { + return match lock[pos].variant() { + MetricVariant::Idle => { + trace!("Got a connection"); + lock.swap(last_item, pos); + let conn = lock[last_item].clone(); + Some(Ok(conn)) + } + MetricVariant::Failed => { + trace!("Got an error"); + let conn = lock.swap_remove(pos); + conn.untrack(metrics); + + // We know that a removed connection cannot have a handle active, so the + // `Rc` unwrap will always succeed. + debug_assert_eq!(Rc::strong_count(&conn.inner), 1); + + match Rc::into_inner(conn.inner).unwrap().into_inner() { + ConnInner::Failed(err) => Some(Err(err)), + _ => unreachable!(), + } + } + _ => continue, + }; } + None } - None } /// Takes the least-recently-used idle connection, does not re-order the /// underlying list. - pub fn try_take_idle_lru(&self) -> Option> { - let lock = self.conns.borrow_mut(); - let pos = lock + pub fn try_take_idle_lru(&self, metrics: &MetricsAccum) -> Option { + let mut lock = self.conns.borrow_mut(); + let index = lock .iter() .position(|conn| conn.variant() == MetricVariant::Idle)?; - let conn = { - let mut lock = lock; - lock.swap_remove(pos) - }; - Some(conn) + + let conn = lock.swap_remove(index); + conn.untrack(metrics); + + // We know that a removed connection cannot have a handle active, so the + // `Rc` unwrap will always succeed. + debug_assert_eq!(Rc::strong_count(&conn.inner), 1); + + match Rc::into_inner(conn.inner).unwrap().into_inner() { + ConnInner::Idle(_, conn) => Some(conn), + _ => unreachable!(), + } } } diff --git a/edb/server/conn_pool/src/metrics.rs b/edb/server/conn_pool/src/metrics.rs index 1d0c8bfff5e..7ca2cc3d6ae 100644 --- a/edb/server/conn_pool/src/metrics.rs +++ b/edb/server/conn_pool/src/metrics.rs @@ -152,6 +152,7 @@ impl VariantArray { #[allow(unused)] pub struct ConnMetrics { pub(crate) value: VariantArray, + pub(crate) all_time: VariantArray, pub(crate) max: VariantArray, pub(crate) avg_time: VariantArray, pub(crate) total: usize, @@ -283,6 +284,29 @@ impl MetricsAccum { self.raw.borrow().counts[variant] } + /// Sums the values of all the given variants. + #[inline(always)] + pub fn sum_all(&self, variants: &[MetricVariant]) -> usize { + let mut sum = 0; + let lock = self.raw.borrow(); + for variant in variants { + sum += lock.counts[*variant]; + } + sum + } + + /// Returns true if there is a non-zero count for any of the variants. + #[inline(always)] + pub fn has_any(&self, variants: &[MetricVariant]) -> bool { + let lock = self.raw.borrow(); + for variant in variants { + if lock.counts[*variant] > 0 { + return true; + } + } + false + } + #[inline(always)] pub fn reset_max(&self) { self.raw.borrow_mut().reset_max(); @@ -296,6 +320,7 @@ impl MetricsAccum { } ConnMetrics { value: lock.counts, + all_time: lock.all_time, max: lock.max, avg_time, total: lock.total, @@ -303,9 +328,12 @@ impl MetricsAccum { } } + pub fn counts(&self) -> VariantArray { + self.raw.borrow().counts + } + pub fn all_time(&self) -> VariantArray { - let lock = self.raw.borrow(); - lock.all_time + self.raw.borrow().all_time } #[inline] diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index 4472c0940a0..d9d5be92584 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -154,10 +154,36 @@ impl Pool { return; } + if self.drain.shutdown.get() { + for op in self.config.constraints.plan_shutdown(&self.blocks) { + trace!("Shutdown: {op:?}"); + match op { + RebalanceOp::Transfer { from, to } => { + tokio::task::spawn_local(self.blocks.task_steal( + &self.connector, + &to, + &from, + )); + } + RebalanceOp::Create(name) => { + tokio::task::spawn_local( + self.blocks.task_create_one(&self.connector, &name), + ); + } + RebalanceOp::Close(name) => { + tokio::task::spawn_local( + self.blocks.task_close_one(&self.connector, &name), + ); + } + } + } + return; + } + self.config.constraints.adjust(&self.blocks); let mut s = String::new(); self.blocks.with_all(|name, block| { - s += &format!("{name}={} ", block.target()); + s += &format!("{name}={}/{} ", block.target(), block.len()); }); trace!("Targets: {s}"); for op in self.config.constraints.plan_rebalance(&self.blocks) { @@ -179,7 +205,7 @@ impl Pool { /// Acquire a handle from this connection pool. The returned [`PoolHandle`] /// controls the lock for the connection and may be dropped to release it /// back into the pool. - pub async fn acquire(self: &Rc, db: &str) -> ConnResult> { + pub async fn acquire(self: &Rc, db: &str) -> ConnResult, C::Error> { if self.drain.shutdown.get() { return Err(ConnError::Shutdown); } @@ -297,16 +323,18 @@ impl Pool { } if cfg!(debug_assertions) { let all_time = &pool.metrics().all_time; - assert_eq!( - all_time[MetricVariant::Connecting] + all_time[MetricVariant::Reconnecting], - all_time[MetricVariant::Disconnecting], - "Connecting + Reconnecting != Disconnecting" - ); - assert_eq!( - all_time[MetricVariant::Disconnecting], - all_time[MetricVariant::Closed], - "Disconnecting != Closed" - ); + if all_time[MetricVariant::Failed] == 0 { + assert_eq!( + all_time[MetricVariant::Connecting] + all_time[MetricVariant::Reconnecting], + all_time[MetricVariant::Disconnecting], + "Connecting + Reconnecting != Disconnecting" + ); + assert_eq!( + all_time[MetricVariant::Disconnecting], + all_time[MetricVariant::Closed], + "Disconnecting != Closed" + ); + } } } } @@ -391,6 +419,7 @@ mod tests { use crate::time::Instant; use anyhow::{Ok, Result}; use itertools::Itertools; + use rand::{thread_rng, Rng}; use rstest::rstest; use test_log::test; @@ -501,6 +530,43 @@ mod tests { run(spec).await.map(drop) } + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + #[rstest] + #[case::small(1)] + #[case::medium(10)] + #[case::large(20)] + async fn test_pool_failures(#[case] databases: usize) -> Result<()> { + let spec = Spec { + name: format!("test_pool_fail50_{databases}").into(), + desc: "", + capacity: 10, + conn_cost: Triangle(0.05, 0.0), + conn_failure_percentage: 50, + score: vec![ + Score::new( + 0.8, + [2.0, 0.5, 0.25, 0.0], + LatencyDistribution { + group: 0..databases, + }, + ), + Score::new(0.2, [0.5, 0.2, 0.1, 0.0], ConnectionOverhead {}), + ], + dbs: (0..databases) + .map(|db| DBSpec { + db, + start_at: 0.0, + end_at: 1.0, + qps: 1200, + query_cost: Triangle(0.001, 0.0), + }) + .collect_vec(), + ..Default::default() + }; + + run(spec).await.map(drop) + } + async fn run(spec: Spec) -> Result { let local = LocalSet::new(); let res = local.run_until(run_local(spec)).await?; @@ -514,16 +580,20 @@ mod tests { let config = PoolConfig::suggested_default_for(spec.capacity); let disconnect_cost = spec.disconn_cost; let connect_cost = spec.disconn_cost; - let pool = Pool::new( - config, - BasicConnector::delay(move |disconnect| { - if disconnect { - disconnect_cost.random_duration() - } else { - connect_cost.random_duration() + let conn_failure_percentage = spec.conn_failure_percentage; + let connector = BasicConnector::delay(move |disconnect| { + if conn_failure_percentage > 0 { + if thread_rng().gen_range(0..100) > conn_failure_percentage { + return std::result::Result::Err(()); } - }), - ); + } + std::result::Result::Ok(if disconnect { + disconnect_cost.random_duration() + } else { + connect_cost.random_duration() + }) + }); + let pool = Pool::new(config, connector); let mut tasks = vec![]; let latencies = Latencies::default(); diff --git a/edb/server/conn_pool/src/test.rs b/edb/server/conn_pool/src/test.rs index cbfe46d0159..fc094c38c34 100644 --- a/edb/server/conn_pool/src/test.rs +++ b/edb/server/conn_pool/src/test.rs @@ -1,4 +1,7 @@ //! Test utilities. +use itertools::Itertools; +use rand::random; +use statrs::statistics::{Data, Distribution, OrderStatistics, Statistics}; use std::{ borrow::Cow, cell::{Cell, RefCell}, @@ -9,12 +12,8 @@ use std::{ time::Duration, }; -use itertools::Itertools; -use rand::random; -use statrs::statistics::{Data, Distribution, OrderStatistics, Statistics}; - use crate::{ - conn::{ConnResult, Connector}, + conn::{ConnError, ConnResult, Connector}, metrics::{MetricVariant, PoolMetrics}, PoolConfig, }; @@ -22,27 +21,64 @@ use crate::{ #[derive(derive_more::Debug)] pub struct BasicConnector { #[debug(skip)] - delay: Option Duration>>, + delay: Option Result>>, + fail_next_connect: Cell, + fail_next_disconnect: Cell, } impl BasicConnector { pub fn no_delay() -> Self { - BasicConnector { delay: None } + BasicConnector { + delay: None, + fail_next_connect: Default::default(), + fail_next_disconnect: Default::default(), + } } - pub fn delay(f: impl Fn(bool) -> Duration + 'static) -> Self { + + pub fn delay(f: impl Fn(bool) -> Result + 'static) -> Self { BasicConnector { delay: Some(Rc::new(f)), + fail_next_connect: Default::default(), + fail_next_disconnect: Default::default(), + } + } + + pub fn fail_next_connect(&self) { + self.fail_next_connect.set(true); + } + + pub fn fail_next_disconnect(&self) { + self.fail_next_disconnect.set(true); + } + + fn duration(&self, disconnect: bool) -> ConnResult, String> { + if disconnect && self.fail_next_disconnect.replace(false) { + return Err(ConnError::Underlying("failed".to_string())); + } + if !disconnect && self.fail_next_connect.replace(false) { + return Err(ConnError::Underlying("failed".to_string())); + } + if let Some(f) = &self.delay { + Ok(Some(f(disconnect).map_err(|_| { + ConnError::Underlying("failed".to_string()) + })?)) + } else { + Ok(None) } } } impl Connector for BasicConnector { type Conn = (); - fn connect(&self, _db: &str) -> impl Future> + 'static { - let delay = self.delay.clone(); + type Error = String; + fn connect( + &self, + _db: &str, + ) -> impl Future> + 'static { + let connect = self.duration(false); async move { - if let Some(f) = delay { - tokio::time::sleep(f(false)).await; + if let Some(f) = connect? { + tokio::time::sleep(f).await; } Ok(()) } @@ -51,21 +87,27 @@ impl Connector for BasicConnector { &self, conn: Self::Conn, _db: &str, - ) -> impl Future> + 'static { - let delay = self.delay.clone(); + ) -> impl Future> + 'static { + let connect = self.duration(false); + let disconnect = self.duration(true); async move { - if let Some(f) = delay { - tokio::time::sleep(f(true)).await; - tokio::time::sleep(f(false)).await; + if let Some(f) = disconnect? { + tokio::time::sleep(f).await; + } + if let Some(f) = connect? { + tokio::time::sleep(f).await; } Ok(conn) } } - fn disconnect(&self, _conn: Self::Conn) -> impl Future> + 'static { - let delay = self.delay.clone(); + fn disconnect( + &self, + _conn: Self::Conn, + ) -> impl Future> + 'static { + let disconnect = self.duration(true); async move { - if let Some(f) = delay { - tokio::time::sleep(f(true)).await; + if let Some(f) = disconnect? { + tokio::time::sleep(f).await; } Ok(()) } @@ -172,6 +214,8 @@ pub struct Spec { pub duration: f64, pub capacity: usize, pub conn_cost: Triangle, + #[default = 0] + pub conn_failure_percentage: u8, pub dbs: Vec, #[default(Triangle(0.006, 0.0015))] pub disconn_cost: Triangle, diff --git a/edb/server/conn_pool/src/waitqueue.rs b/edb/server/conn_pool/src/waitqueue.rs index 5f00792c130..ef8510de3de 100644 --- a/edb/server/conn_pool/src/waitqueue.rs +++ b/edb/server/conn_pool/src/waitqueue.rs @@ -82,8 +82,14 @@ impl WaitQueue { .await; } + #[inline] pub fn len(&self) -> usize { - self.waiters.borrow().len() + self.lock.get() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 } pub(crate) fn lock(&self) {