From cb919274512739b5c2a018a16e52599d2ef0ba4d Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Thu, 18 Jul 2024 16:51:32 -0600 Subject: [PATCH] Provide connections in LRU/MRU --- edb/server/conn_pool/src/block.rs | 84 +++++++------------------- edb/server/conn_pool/src/conn.rs | 97 +++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 62 deletions(-) diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs index 374675e019f..e32f83ce28a 100644 --- a/edb/server/conn_pool/src/block.rs +++ b/edb/server/conn_pool/src/block.rs @@ -10,7 +10,7 @@ use crate::{ }; use futures::future::Either; use std::{ - cell::{Cell, RefCell}, + cell::RefCell, collections::HashMap, future::{poll_fn, ready, Future}, rc::Rc, @@ -101,9 +101,8 @@ impl std::borrow::Borrow for Name { /// where additional metadata for this block can live. pub struct Block { pub db_name: Name, - conns: RefCell>>, + conns: Conns, state: Rc, - youngest: Cell, /// Associated data for this block useful for statistics, quotas or other /// information. This is provided by the algorithm in this crate. data: D, @@ -120,10 +119,9 @@ impl Block { .into(); Self { db_name, - conns: Vec::new().into(), + conns: Conns::default(), state, data: Default::default(), - youngest: Cell::new(Instant::now()), } } @@ -144,14 +142,12 @@ impl Block { if cfg!(debug_assertions) { assert_eq!( self.len(), - self.conns.borrow().len(), + self.conns.len(), "Block {} failed consistency check. Total connection count was wrong.", self.db_name ); let conn_metrics = MetricsAccum::default(); - for conn in &*self.conns.borrow() { - conn_metrics.insert(conn.variant()) - } + self.conns.walk(|conn| conn_metrics.insert(conn.variant())); conn_metrics.set_value(MetricVariant::Waiting, self.state.waiters.lock.get()); assert_eq!( self.metrics().summary().value, @@ -166,44 +162,13 @@ impl Block { self.state.metrics.clone() } - fn try_acquire_used(&self) -> Option> { - for conn in &*self.conns.borrow() { - if conn.try_lock(&self.state.metrics) { - return Some(conn.clone()); - } - } - None - } - - fn try_get_used(&self) -> Option> { - for conn in &*self.conns.borrow() { - if conn.variant() == MetricVariant::Idle { - return Some(conn.clone()); - } - } - None - } - - fn try_take_used(&self) -> Option> { - let mut lock = self.conns.borrow_mut(); - let pos = lock - .iter() - .position(|conn| conn.variant() == MetricVariant::Idle); - if let Some(index) = pos { - let conn = lock.remove(index); - return Some(conn); - } - None - } - /// Creates a connection from this block. #[cfg(test)] fn create(self: Rc, connector: &C) -> impl Future>> { let conn = { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); - self.youngest.set(Instant::now()); - self.conns.borrow_mut().push(conn.clone()); + self.conns.insert(conn.clone()); conn }; async move { @@ -219,7 +184,7 @@ impl Block { self: Rc, connector: &C, ) -> impl Future>> { - if let Some(conn) = self.try_acquire_used() { + if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { return Either::Left(ready(Ok(self.conn(conn)))); } Either::Right(self.create(connector)) @@ -229,7 +194,7 @@ impl Block { fn queue(self: Rc) -> impl Future>> { // If someone else is waiting, we have to queue, even if there's a connection if self.state.waiters.len() == 0 { - if let Some(conn) = self.try_acquire_used() { + if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { trace!("Got a connection"); return Either::Left(ready(Ok(self.conn(conn)))); } @@ -248,7 +213,7 @@ impl Block { Either::Right(async move { consistency_check!(self); loop { - if let Some(conn) = self.try_acquire_used() { + if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) { trace!("Got a connection"); drop(guard); return Ok(self.conn(conn)); @@ -264,8 +229,7 @@ impl Block { let conn = { consistency_check!(self); let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics); - self.youngest.set(Instant::now()); - self.conns.borrow_mut().push(conn.clone()); + self.conns.insert(conn.clone()); conn }; async move { @@ -284,16 +248,17 @@ impl Block { fn task_close_one(self: Rc, connector: &C) -> impl Future> { let conn = { consistency_check!(self); - let conn = self.try_get_used().expect("Could not acquire a connection"); + let conn = self + .conns + .try_get_idle_lru() + .expect("Could not acquire a connection"); conn.close(connector, &self.state.metrics); conn }; async move { consistency_check!(self); poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?; - // TODO: this can be replaced by moving the final item of the list into the - // empty spot to avoid reshuffling - self.conns.borrow_mut().retain(|other| other != &conn); + self.conns.remove(&conn); conn.untrack(&self.state.metrics); Ok(()) } @@ -314,10 +279,10 @@ impl Block { consistency_check!(to); let conn = from - .try_take_used() + .conns + .try_take_idle_lru() .expect("Could not acquire a connection"); - to.youngest.set(Instant::now()); - to.conns.borrow_mut().push(conn.clone()); + to.conns.insert(conn.clone()); conn.transfer( connector, &from.state.metrics, @@ -347,12 +312,9 @@ impl Block { consistency_check!(from); consistency_check!(to); - // TODO: this can be replaced by moving the final item of the list into the - // empty spot to avoid reshuffling let conn = conn.into_inner(); - from.conns.borrow_mut().retain(|other| other != &conn); - to.youngest.set(Instant::now()); - to.conns.borrow_mut().push(conn.clone()); + from.conns.remove(&conn); + to.conns.insert(conn.clone()); conn.transfer( connector, &from.state.metrics, @@ -405,9 +367,7 @@ impl Block { async move { consistency_check!(self); poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?; - // TODO: this can be replaced by moving the final item of the list into the - // empty spot to avoid reshuffling - self.conns.borrow_mut().retain(|other| other != &conn); + self.conns.remove(&conn); conn.untrack(&self.state.metrics); Ok(()) } @@ -477,7 +437,7 @@ impl PoolAlgorithmDataBlock for Block { } #[inline(always)] fn youngest_ms(&self) -> usize { - self.youngest.get().elapsed().as_millis() as _ + self.conns.youngest().as_millis() as _ } } diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index b2459581ec5..a45e7bcf53a 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -12,6 +12,7 @@ use std::{ pin::Pin, rc::Rc, task::{ready, Poll}, + time::Duration, }; use tracing::error; @@ -346,3 +347,99 @@ impl Drop for ConnHandle { }); } } + +/// Maintains a list of connections. Tries to provide idle connections for use +/// in a MRU mode, and for release in a LRU mode where possible. +#[derive(Debug)] +pub struct Conns { + conns: RefCell>>, + youngest: Cell, +} + +impl Default for Conns { + fn default() -> Self { + Self { + conns: Default::default(), + youngest: Cell::new(Instant::now()), + } + } +} + +impl Conns { + pub fn len(&self) -> usize { + self.conns.borrow().len() + } + + pub fn youngest(&self) -> Duration { + self.youngest.get().elapsed() + } + + pub fn walk(&self, mut f: impl FnMut(&Conn)) { + for conn in self.conns.borrow().iter() { + f(conn) + } + } + + /// Insert a new connection, in the MRU spot. + pub fn insert(&self, conn: Conn) { + self.conns.borrow_mut().push(conn); + self.youngest.set(Instant::now()); + } + + /// Remove a specific connection from the list. This may break MRU ordering + /// for performance reasons. + pub fn remove(&self, conn: &Conn) { + let lock = self.conns.borrow_mut(); + let index = lock + .iter() + .position(|other| conn == other) + .expect("Connection unexpectedly could not be found"); + { + let mut lock = lock; + lock.swap_remove(index) + }; + } + + /// Acquires the most-recently-used idle connection, moving it to the end of + /// the internal vector. + pub fn try_acquire_idle_mru(&self, metrics: &MetricsAccum) -> Option> { + let mut lock = self.conns.borrow_mut(); + let pos = lock + .iter() + .rev() + .position(|conn| conn.variant() == MetricVariant::Idle)?; + let last_item = lock.len() - 1; + let pos = last_item - pos; + lock.swap(last_item, pos); + let conn = lock[last_item].clone(); + if !conn.try_lock(&metrics) { + panic!("Connection unexpectedly could not be locked") + } + Some(conn) + } + + /// Gets the least-recently-used idle connection, does not re-order the + /// underlying list. + pub fn try_get_idle_lru(&self) -> Option> { + for conn in self.conns.borrow().iter() { + if conn.variant() == MetricVariant::Idle { + return Some(conn.clone()); + } + } + None + } + + /// Takes the least-recently-used idle connection, does not re-order the + /// underlying list. + pub fn try_take_idle_lru(&self) -> Option> { + let lock = self.conns.borrow_mut(); + let pos = lock + .iter() + .position(|conn| conn.variant() == MetricVariant::Idle)?; + let conn = { + let mut lock = lock; + lock.swap_remove(pos) + }; + Some(conn) + } +}