Skip to content

Commit

Permalink
Provide connections in LRU/MRU
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Jul 19, 2024
1 parent 2a065c1 commit cb91927
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 62 deletions.
84 changes: 22 additions & 62 deletions edb/server/conn_pool/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use futures::future::Either;
use std::{
cell::{Cell, RefCell},
cell::RefCell,
collections::HashMap,
future::{poll_fn, ready, Future},
rc::Rc,
Expand Down Expand Up @@ -101,9 +101,8 @@ impl std::borrow::Borrow<str> for Name {
/// where additional metadata for this block can live.
pub struct Block<C: Connector, D: Default = ()> {
pub db_name: Name,
conns: RefCell<Vec<Conn<C>>>,
conns: Conns<C>,
state: Rc<ConnState>,
youngest: Cell<Instant>,
/// Associated data for this block useful for statistics, quotas or other
/// information. This is provided by the algorithm in this crate.
data: D,
Expand All @@ -120,10 +119,9 @@ impl<C: Connector, D: Default> Block<C, D> {
.into();
Self {
db_name,
conns: Vec::new().into(),
conns: Conns::default(),
state,
data: Default::default(),
youngest: Cell::new(Instant::now()),
}
}

Expand All @@ -144,14 +142,12 @@ impl<C: Connector, D: Default> Block<C, D> {
if cfg!(debug_assertions) {
assert_eq!(
self.len(),
self.conns.borrow().len(),
self.conns.len(),
"Block {} failed consistency check. Total connection count was wrong.",
self.db_name
);
let conn_metrics = MetricsAccum::default();
for conn in &*self.conns.borrow() {
conn_metrics.insert(conn.variant())
}
self.conns.walk(|conn| conn_metrics.insert(conn.variant()));
conn_metrics.set_value(MetricVariant::Waiting, self.state.waiters.lock.get());
assert_eq!(
self.metrics().summary().value,
Expand All @@ -166,44 +162,13 @@ impl<C: Connector, D: Default> Block<C, D> {
self.state.metrics.clone()
}

fn try_acquire_used(&self) -> Option<Conn<C>> {
for conn in &*self.conns.borrow() {
if conn.try_lock(&self.state.metrics) {
return Some(conn.clone());
}
}
None
}

fn try_get_used(&self) -> Option<Conn<C>> {
for conn in &*self.conns.borrow() {
if conn.variant() == MetricVariant::Idle {
return Some(conn.clone());
}
}
None
}

fn try_take_used(&self) -> Option<Conn<C>> {
let mut lock = self.conns.borrow_mut();
let pos = lock
.iter()
.position(|conn| conn.variant() == MetricVariant::Idle);
if let Some(index) = pos {
let conn = lock.remove(index);
return Some(conn);
}
None
}

/// Creates a connection from this block.
#[cfg(test)]
fn create(self: Rc<Self>, connector: &C) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
let conn = {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.youngest.set(Instant::now());
self.conns.borrow_mut().push(conn.clone());
self.conns.insert(conn.clone());
conn
};
async move {
Expand All @@ -219,7 +184,7 @@ impl<C: Connector, D: Default> Block<C, D> {
self: Rc<Self>,
connector: &C,
) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(Ok(self.conn(conn))));
}
Either::Right(self.create(connector))
Expand All @@ -229,7 +194,7 @@ impl<C: Connector, D: Default> Block<C, D> {
fn queue(self: Rc<Self>) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
// If someone else is waiting, we have to queue, even if there's a connection
if self.state.waiters.len() == 0 {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
return Either::Left(ready(Ok(self.conn(conn))));
}
Expand All @@ -248,7 +213,7 @@ impl<C: Connector, D: Default> Block<C, D> {
Either::Right(async move {
consistency_check!(self);
loop {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
drop(guard);
return Ok(self.conn(conn));
Expand All @@ -264,8 +229,7 @@ impl<C: Connector, D: Default> Block<C, D> {
let conn = {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.youngest.set(Instant::now());
self.conns.borrow_mut().push(conn.clone());
self.conns.insert(conn.clone());
conn
};
async move {
Expand All @@ -284,16 +248,17 @@ impl<C: Connector, D: Default> Block<C, D> {
fn task_close_one(self: Rc<Self>, connector: &C) -> impl Future<Output = ConnResult<()>> {
let conn = {
consistency_check!(self);
let conn = self.try_get_used().expect("Could not acquire a connection");
let conn = self
.conns
.try_get_idle_lru()
.expect("Could not acquire a connection");
conn.close(connector, &self.state.metrics);
conn
};
async move {
consistency_check!(self);
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?;
// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
self.conns.borrow_mut().retain(|other| other != &conn);
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
Ok(())
}
Expand All @@ -314,10 +279,10 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(to);

let conn = from
.try_take_used()
.conns
.try_take_idle_lru()
.expect("Could not acquire a connection");
to.youngest.set(Instant::now());
to.conns.borrow_mut().push(conn.clone());
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
Expand Down Expand Up @@ -347,12 +312,9 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(from);
consistency_check!(to);

// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
let conn = conn.into_inner();
from.conns.borrow_mut().retain(|other| other != &conn);
to.youngest.set(Instant::now());
to.conns.borrow_mut().push(conn.clone());
from.conns.remove(&conn);
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
Expand Down Expand Up @@ -405,9 +367,7 @@ impl<C: Connector, D: Default> Block<C, D> {
async move {
consistency_check!(self);
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?;
// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
self.conns.borrow_mut().retain(|other| other != &conn);
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
Ok(())
}
Expand Down Expand Up @@ -477,7 +437,7 @@ impl<C: Connector> PoolAlgorithmDataBlock for Block<C, PoolAlgoTargetData> {
}
#[inline(always)]
fn youngest_ms(&self) -> usize {
self.youngest.get().elapsed().as_millis() as _
self.conns.youngest().as_millis() as _
}
}

Expand Down
97 changes: 97 additions & 0 deletions edb/server/conn_pool/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
pin::Pin,
rc::Rc,
task::{ready, Poll},
time::Duration,
};
use tracing::error;

Expand Down Expand Up @@ -346,3 +347,99 @@ impl<C: Connector> Drop for ConnHandle<C> {
});
}
}

/// Maintains a list of connections. Tries to provide idle connections for use
/// in a MRU mode, and for release in a LRU mode where possible.
#[derive(Debug)]
pub struct Conns<C: Connector> {
conns: RefCell<Vec<Conn<C>>>,
youngest: Cell<Instant>,
}

impl<C: Connector> Default for Conns<C> {
fn default() -> Self {
Self {
conns: Default::default(),
youngest: Cell::new(Instant::now()),
}
}
}

impl<C: Connector> Conns<C> {
pub fn len(&self) -> usize {
self.conns.borrow().len()
}

pub fn youngest(&self) -> Duration {
self.youngest.get().elapsed()
}

pub fn walk(&self, mut f: impl FnMut(&Conn<C>)) {
for conn in self.conns.borrow().iter() {
f(conn)
}
}

/// Insert a new connection, in the MRU spot.
pub fn insert(&self, conn: Conn<C>) {
self.conns.borrow_mut().push(conn);
self.youngest.set(Instant::now());
}

/// Remove a specific connection from the list. This may break MRU ordering
/// for performance reasons.
pub fn remove(&self, conn: &Conn<C>) {
let lock = self.conns.borrow_mut();
let index = lock
.iter()
.position(|other| conn == other)
.expect("Connection unexpectedly could not be found");
{
let mut lock = lock;
lock.swap_remove(index)
};
}

/// Acquires the most-recently-used idle connection, moving it to the end of
/// the internal vector.
pub fn try_acquire_idle_mru(&self, metrics: &MetricsAccum) -> Option<Conn<C>> {
let mut lock = self.conns.borrow_mut();
let pos = lock
.iter()
.rev()
.position(|conn| conn.variant() == MetricVariant::Idle)?;
let last_item = lock.len() - 1;
let pos = last_item - pos;
lock.swap(last_item, pos);
let conn = lock[last_item].clone();
if !conn.try_lock(&metrics) {
panic!("Connection unexpectedly could not be locked")
}
Some(conn)
}

/// Gets the least-recently-used idle connection, does not re-order the
/// underlying list.
pub fn try_get_idle_lru(&self) -> Option<Conn<C>> {
for conn in self.conns.borrow().iter() {
if conn.variant() == MetricVariant::Idle {
return Some(conn.clone());
}
}
None
}

/// Takes the least-recently-used idle connection, does not re-order the
/// underlying list.
pub fn try_take_idle_lru(&self) -> Option<Conn<C>> {
let lock = self.conns.borrow_mut();
let pos = lock
.iter()
.position(|conn| conn.variant() == MetricVariant::Idle)?;
let conn = {
let mut lock = lock;
lock.swap_remove(pos)
};
Some(conn)
}
}

0 comments on commit cb91927

Please sign in to comment.