Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust connpool: provide connections in LRU/MRU #7583

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 22 additions & 62 deletions edb/server/conn_pool/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use futures::future::Either;
use std::{
cell::{Cell, RefCell},
cell::RefCell,
collections::HashMap,
future::{poll_fn, ready, Future},
rc::Rc,
Expand Down Expand Up @@ -101,9 +101,8 @@ impl std::borrow::Borrow<str> for Name {
/// where additional metadata for this block can live.
pub struct Block<C: Connector, D: Default = ()> {
pub db_name: Name,
conns: RefCell<Vec<Conn<C>>>,
conns: Conns<C>,
state: Rc<ConnState>,
youngest: Cell<Instant>,
/// Associated data for this block useful for statistics, quotas or other
/// information. This is provided by the algorithm in this crate.
data: D,
Expand All @@ -120,10 +119,9 @@ impl<C: Connector, D: Default> Block<C, D> {
.into();
Self {
db_name,
conns: Vec::new().into(),
conns: Conns::default(),
state,
data: Default::default(),
youngest: Cell::new(Instant::now()),
}
}

Expand All @@ -144,14 +142,12 @@ impl<C: Connector, D: Default> Block<C, D> {
if cfg!(debug_assertions) {
assert_eq!(
self.len(),
self.conns.borrow().len(),
self.conns.len(),
"Block {} failed consistency check. Total connection count was wrong.",
self.db_name
);
let conn_metrics = MetricsAccum::default();
for conn in &*self.conns.borrow() {
conn_metrics.insert(conn.variant())
}
self.conns.walk(|conn| conn_metrics.insert(conn.variant()));
conn_metrics.set_value(MetricVariant::Waiting, self.state.waiters.lock.get());
assert_eq!(
self.metrics().summary().value,
Expand All @@ -166,44 +162,13 @@ impl<C: Connector, D: Default> Block<C, D> {
self.state.metrics.clone()
}

fn try_acquire_used(&self) -> Option<Conn<C>> {
for conn in &*self.conns.borrow() {
if conn.try_lock(&self.state.metrics) {
return Some(conn.clone());
}
}
None
}

fn try_get_used(&self) -> Option<Conn<C>> {
for conn in &*self.conns.borrow() {
if conn.variant() == MetricVariant::Idle {
return Some(conn.clone());
}
}
None
}

fn try_take_used(&self) -> Option<Conn<C>> {
let mut lock = self.conns.borrow_mut();
let pos = lock
.iter()
.position(|conn| conn.variant() == MetricVariant::Idle);
if let Some(index) = pos {
let conn = lock.remove(index);
return Some(conn);
}
None
}

/// Creates a connection from this block.
#[cfg(test)]
fn create(self: Rc<Self>, connector: &C) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
let conn = {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.youngest.set(Instant::now());
self.conns.borrow_mut().push(conn.clone());
self.conns.insert(conn.clone());
conn
};
async move {
Expand All @@ -219,7 +184,7 @@ impl<C: Connector, D: Default> Block<C, D> {
self: Rc<Self>,
connector: &C,
) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
return Either::Left(ready(Ok(self.conn(conn))));
}
Either::Right(self.create(connector))
Expand All @@ -229,7 +194,7 @@ impl<C: Connector, D: Default> Block<C, D> {
fn queue(self: Rc<Self>) -> impl Future<Output = ConnResult<ConnHandle<C>>> {
// If someone else is waiting, we have to queue, even if there's a connection
if self.state.waiters.len() == 0 {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
return Either::Left(ready(Ok(self.conn(conn))));
}
Expand All @@ -248,7 +213,7 @@ impl<C: Connector, D: Default> Block<C, D> {
Either::Right(async move {
consistency_check!(self);
loop {
if let Some(conn) = self.try_acquire_used() {
if let Some(conn) = self.conns.try_acquire_idle_mru(&self.state.metrics) {
trace!("Got a connection");
drop(guard);
return Ok(self.conn(conn));
Expand All @@ -264,8 +229,7 @@ impl<C: Connector, D: Default> Block<C, D> {
let conn = {
consistency_check!(self);
let conn = Conn::new(connector.connect(&self.db_name), &self.state.metrics);
self.youngest.set(Instant::now());
self.conns.borrow_mut().push(conn.clone());
self.conns.insert(conn.clone());
conn
};
async move {
Expand All @@ -284,16 +248,17 @@ impl<C: Connector, D: Default> Block<C, D> {
fn task_close_one(self: Rc<Self>, connector: &C) -> impl Future<Output = ConnResult<()>> {
let conn = {
consistency_check!(self);
let conn = self.try_get_used().expect("Could not acquire a connection");
let conn = self
.conns
.try_get_idle_lru()
.expect("Could not acquire a connection");
conn.close(connector, &self.state.metrics);
conn
};
async move {
consistency_check!(self);
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?;
// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
self.conns.borrow_mut().retain(|other| other != &conn);
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
Ok(())
}
Expand All @@ -314,10 +279,10 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(to);

let conn = from
.try_take_used()
.conns
.try_take_idle_lru()
.expect("Could not acquire a connection");
to.youngest.set(Instant::now());
to.conns.borrow_mut().push(conn.clone());
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
Expand Down Expand Up @@ -347,12 +312,9 @@ impl<C: Connector, D: Default> Block<C, D> {
consistency_check!(from);
consistency_check!(to);

// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
let conn = conn.into_inner();
from.conns.borrow_mut().retain(|other| other != &conn);
to.youngest.set(Instant::now());
to.conns.borrow_mut().push(conn.clone());
from.conns.remove(&conn);
to.conns.insert(conn.clone());
conn.transfer(
connector,
&from.state.metrics,
Expand Down Expand Up @@ -405,9 +367,7 @@ impl<C: Connector, D: Default> Block<C, D> {
async move {
consistency_check!(self);
poll_fn(|cx| conn.poll_ready(cx, &self.state.metrics, MetricVariant::Closed)).await?;
// TODO: this can be replaced by moving the final item of the list into the
// empty spot to avoid reshuffling
self.conns.borrow_mut().retain(|other| other != &conn);
self.conns.remove(&conn);
conn.untrack(&self.state.metrics);
Ok(())
}
Expand Down Expand Up @@ -477,7 +437,7 @@ impl<C: Connector> PoolAlgorithmDataBlock for Block<C, PoolAlgoTargetData> {
}
#[inline(always)]
fn youngest_ms(&self) -> usize {
self.youngest.get().elapsed().as_millis() as _
self.conns.youngest().as_millis() as _
}
}

Expand Down
97 changes: 97 additions & 0 deletions edb/server/conn_pool/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
pin::Pin,
rc::Rc,
task::{ready, Poll},
time::Duration,
};
use tracing::error;

Expand Down Expand Up @@ -346,3 +347,99 @@ impl<C: Connector> Drop for ConnHandle<C> {
});
}
}

/// Maintains a list of connections. Tries to provide idle connections for use
/// in a MRU mode, and for release in a LRU mode where possible.
#[derive(Debug)]
pub struct Conns<C: Connector> {
conns: RefCell<Vec<Conn<C>>>,
youngest: Cell<Instant>,
}

impl<C: Connector> Default for Conns<C> {
fn default() -> Self {
Self {
conns: Default::default(),
youngest: Cell::new(Instant::now()),
}
}
}

impl<C: Connector> Conns<C> {
pub fn len(&self) -> usize {
self.conns.borrow().len()
}

pub fn youngest(&self) -> Duration {
self.youngest.get().elapsed()
}

pub fn walk(&self, mut f: impl FnMut(&Conn<C>)) {
for conn in self.conns.borrow().iter() {
f(conn)
}
}

/// Insert a new connection, in the MRU spot.
pub fn insert(&self, conn: Conn<C>) {
self.conns.borrow_mut().push(conn);
self.youngest.set(Instant::now());
}

/// Remove a specific connection from the list. This may break MRU ordering
/// for performance reasons.
pub fn remove(&self, conn: &Conn<C>) {
let lock = self.conns.borrow_mut();
let index = lock
.iter()
.position(|other| conn == other)
.expect("Connection unexpectedly could not be found");
{
let mut lock = lock;
lock.swap_remove(index)
};
}

/// Acquires the most-recently-used idle connection, moving it to the end of
/// the internal vector.
pub fn try_acquire_idle_mru(&self, metrics: &MetricsAccum) -> Option<Conn<C>> {
let mut lock = self.conns.borrow_mut();
let pos = lock
.iter()
.rev()
.position(|conn| conn.variant() == MetricVariant::Idle)?;
let last_item = lock.len() - 1;
let pos = last_item - pos;
lock.swap(last_item, pos);
let conn = lock[last_item].clone();
if !conn.try_lock(&metrics) {
panic!("Connection unexpectedly could not be locked")
}
Some(conn)
}

/// Gets the least-recently-used idle connection, does not re-order the
/// underlying list.
pub fn try_get_idle_lru(&self) -> Option<Conn<C>> {
for conn in self.conns.borrow().iter() {
if conn.variant() == MetricVariant::Idle {
return Some(conn.clone());
}
}
None
}

/// Takes the least-recently-used idle connection, does not re-order the
/// underlying list.
pub fn try_take_idle_lru(&self) -> Option<Conn<C>> {
let lock = self.conns.borrow_mut();
let pos = lock
.iter()
.position(|conn| conn.variant() == MetricVariant::Idle)?;
let conn = {
let mut lock = lock;
lock.swap_remove(pos)
};
Some(conn)
}
}
Loading