Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Jul 18, 2024
1 parent a7b82a4 commit 2a0dbfa
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 266 deletions.
49 changes: 42 additions & 7 deletions edb/server/conn_pool/src/algo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ pub enum AcquireOp {
Steal(Name),
/// Wait for a connection.
Wait,
/// A connection cannot be established due to shutdown.
FailInShutdown,
}

/// Determines the release plan based on the current pool state.
Expand All @@ -231,8 +233,6 @@ pub enum ReleaseType {
Normal,
/// A release of a poisoned connection.
Poison,
/// A release of a drained connection.
Drain,
}

/// Generic trait to decouple the algorithm from the underlying pool blocks.
Expand All @@ -248,6 +248,8 @@ pub trait VisitPoolAlgoData: PoolAlgorithmDataPool {
fn with_all(&self, f: impl FnMut(&Name, &Self::Block));
/// Retreives a single block, returning `None` if the block doesn't exist.
fn with<T>(&self, db: &str, f: impl Fn(&Self::Block) -> T) -> Option<T>;
fn in_shutdown(&self) -> bool;
fn is_draining(&self, db: &str) -> bool;

#[inline]
fn target(&self, db: &str) -> usize {
Expand Down Expand Up @@ -538,6 +540,17 @@ impl PoolConstraints {
/// Plan a rebalance to better match the target quotas of the blocks in the
/// pool.
pub fn plan_rebalance(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
// In shutdown, we just want to close all idle connections where possible.
if it.in_shutdown() {
let mut ops = vec![];
it.with_all(|name, block| {
for _ in 0..block.count(MetricVariant::Idle) {
ops.push(RebalanceOp::Close(name.clone()))
}
});
return ops;
}

let mut current_pool_size = it.total();
let max_pool_size = self.max;

Expand All @@ -549,6 +562,14 @@ impl PoolConstraints {

for i in 0..MAX_REBALANCE_OPS.get() {
it.with_all(|name, block| {
// TODO: We should swap the order of iteration here so we
// can drain the whole block
if it.is_draining(name) && block.count(MetricVariant::Idle) > i {
changes.push(RebalanceOp::Close(name.clone()));
made_changes = true;
return;
}

// If there's room in the block, and room in the pool, and
// the block is bumping up against its current headroom, we'll grab
// another one.
Expand Down Expand Up @@ -591,7 +612,18 @@ impl PoolConstraints {
let mut s1 = "".to_owned();
let mut s2 = "".to_owned();

let mut tasks = vec![];

it.with_all(|name, block| {
// TODO: These could potentially be transferred instead, but
// drain/shutdown are pretty unlikely.
if it.is_draining(name) {
for _ in 0..block.count(MetricVariant::Idle) {
tasks.push(RebalanceOp::Close(name.clone()))
}
return;
}

if let Some(value) = block.hunger_score(false) {
if tracing::enabled!(tracing::Level::TRACE) {
s1 += &format!("{name}={value} ");
Expand All @@ -616,8 +648,6 @@ impl PoolConstraints {
overloaded.sort();
hungriest.sort();

let mut tasks = vec![];

for _ in 0..MAX_REBALANCE_OPS.get() {
let Some((_, to)) = hungriest.pop() else {
// TODO: close more than one?
Expand All @@ -643,6 +673,10 @@ impl PoolConstraints {

/// Plan a connection acquisition.
pub fn plan_acquire(&self, db: &str, it: &impl VisitPoolAlgoData) -> AcquireOp {
if it.in_shutdown() {
return AcquireOp::FailInShutdown;
}

// If the block is new, we need to perform an initial adjustment to
// ensure this block gets some capacity.
if it.ensure_block(db, DEMAND_MINIMUM.get() * DEMAND_HISTORY_LENGTH) {
Expand Down Expand Up @@ -691,12 +725,13 @@ impl PoolConstraints {
release_type: ReleaseType,
it: &impl VisitPoolAlgoData,
) -> ReleaseOp {
if it.is_draining(db) {
return ReleaseOp::Discard;
}

if release_type == ReleaseType::Poison {
return ReleaseOp::Reopen;
}
if release_type == ReleaseType::Drain {
return ReleaseOp::Discard;
}

let current_pool_size = it.total();
let max_pool_size = self.max;
Expand Down
83 changes: 83 additions & 0 deletions edb/server/conn_pool/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,15 @@ impl<C: Connector, D: Default> Block<C, D> {
pub struct Blocks<C: Connector, D: Default = ()> {
map: RefCell<HashMap<Name, Rc<Block<C, D>>>>,
metrics: Rc<MetricsAccum>,
drain: Drain
}

impl<C: Connector, D: Default> Default for Blocks<C, D> {
fn default() -> Self {
Self {
map: RefCell::new(HashMap::default()),
metrics: Rc::new(MetricsAccum::default()),
drain: Default::default()
}
}
}
Expand Down Expand Up @@ -530,6 +532,14 @@ impl<C: Connector> VisitPoolAlgoData for Blocks<C, PoolAlgoTargetData> {
}
});
}

fn in_shutdown(&self) -> bool {
self.drain.shutdown.get()
}

fn is_draining(&self, db: &str) -> bool {
self.drain.is_draining(db)
}
}

impl<C: Connector, D: Default> Blocks<C, D> {
Expand Down Expand Up @@ -713,6 +723,79 @@ impl<C: Connector, D: Default> Blocks<C, D> {
}
}

impl <C: Connector, D: Default> AsRef<Drain> for Blocks<C, D> {
fn as_ref(&self) -> &Drain {
&self.drain
}
}

/// Holds the current drainage and shutdown state for the `Pool`.
#[derive(Default, Debug)]
pub struct Drain {
drain_all: Cell<usize>,
drain: RefCell<HashMap<Name, usize>>,
shutdown: Cell<bool>,
}

impl Drain {
pub fn shutdown(&self) {
self.shutdown.set(true)
}

/// Lock all connections for draining.
pub fn lock_all<T: AsRef<Drain>>(this: T) -> DrainLock<T> {
let drain = this.as_ref();
drain.drain_all.set(drain.drain_all.get() + 1);
DrainLock {
db: None,
has_drain: this,
}
}

// Lock a specific connection for draining.
pub fn lock<T: AsRef<Drain>>(this: T, db: Name) -> DrainLock<T> {
{
let mut drain = this.as_ref().drain.borrow_mut();
drain.entry(db.clone()).and_modify(|v| *v += 1).or_default();
}
DrainLock {
db: Some(db),
has_drain: this,
}
}

/// Is this connection draining?
fn is_draining(&self, db: &str) -> bool {
self.drain_all.get() > 0 || self.drain.borrow().contains_key(db) || self.shutdown.get()
}
}

/// Provides a RAII lock for a db- or whole-pool drain operation.
pub struct DrainLock<T: AsRef<Drain>> {
db: Option<Name>,
has_drain: T,
}

impl<T: AsRef<Drain>> Drop for DrainLock<T> {
fn drop(&mut self) {
if let Some(name) = self.db.take() {
let mut drain = self.has_drain.as_ref().drain.borrow_mut();
if let Some(count) = drain.get_mut(&name) {
if *count > 1 {
*count -= 1;
} else {
drain.remove(&name);
}
} else {
unreachable!()
}
} else {
let this = self.has_drain.as_ref();
this.drain_all.set(this.drain_all.get() - 1);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
106 changes: 30 additions & 76 deletions edb/server/conn_pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
AcquireOp, PoolAlgoTargetData, PoolAlgorithmDataBlock, PoolAlgorithmDataMetrics,
PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, VisitPoolAlgoData,
},
block::{Blocks, Name},
block::{Blocks, Drain, Name},
conn::{ConnError, ConnHandle, ConnResult, Connector},
metrics::{MetricVariant, PoolMetrics},
};
Expand Down Expand Up @@ -180,9 +180,6 @@ impl<C: Connector> Pool<C> {
/// controls the lock for the connection and may be dropped to release it
/// back into the pool.
pub async fn acquire(self: &Rc<Self>, db: &str) -> ConnResult<PoolHandle<C>> {
if self.drain.shutdown.get() {
return Err(ConnError::Shutdown);
}
self.dirty.set(true);
let plan = self.config.constraints.plan_acquire(db, &self.blocks);
trace!("Acquire {db}: {plan:?}");
Expand All @@ -193,7 +190,10 @@ impl<C: Connector> Pool<C> {
AcquireOp::Steal(from) => {
tokio::task::spawn_local(self.blocks.task_steal(&self.connector, db, &from));
}
AcquireOp::Wait => {}
AcquireOp::Wait => {},
AcquireOp::FailInShutdown => {
return Err(ConnError::Shutdown);
}
};
let conn = self.blocks.queue(db).await?;

Expand All @@ -204,9 +204,7 @@ impl<C: Connector> Pool<C> {
fn release(self: Rc<Self>, conn: ConnHandle<C>, poison: bool) {
let db = &conn.state.db_name;
self.dirty.set(true);
let release_type = if self.drain.is_draining(db) {
ReleaseType::Drain
} else if poison {
let release_type = if poison {
ReleaseType::Poison
} else {
ReleaseType::Normal
Expand Down Expand Up @@ -263,6 +261,29 @@ impl<C: Connector> Pool<C> {
drop(lock);
}

/// Drain all idle connections to the given database. All connections will be
/// poisoned on return and this method will return when the given database
/// is idle. Multiple calls to this method with the same database are valid,
/// and the drain operation will be kept alive as long as one future has not
/// been dropped.
///
/// It is valid, though unadvisable, to request a connection during this
/// period. The connection will be poisoned on return as well.
///
/// Dropping this future cancels the drain operation.
pub async fn drain_idle(self: Rc<Self>, db: &str) {
// If the block doesn't exist, we can return
let Some(name) = self.blocks.name(db) else {
return;
};

let lock = Drain::lock(self.clone(), name);
while self.blocks.metrics(db).get(MetricVariant::Idle) > 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
drop(lock);
}

/// Drain all connections in the pool, returning when the pool is completely
/// empty. Multiple calls to this method with the same database are valid,
/// and the drain operation will be kept alive as long as one future has not
Expand Down Expand Up @@ -313,74 +334,7 @@ impl<C: Connector> Pool<C> {

impl<C: Connector> AsRef<Drain> for Rc<Pool<C>> {
fn as_ref(&self) -> &Drain {
&self.drain
}
}

/// Holds the current drainage and shutdown state for the `Pool`.
#[derive(Default, Debug)]
struct Drain {
drain_all: Cell<usize>,
drain: RefCell<HashMap<Name, usize>>,
shutdown: Cell<bool>,
}

impl Drain {
pub fn shutdown(&self) {
self.shutdown.set(true)
}

/// Lock all connections for draining.
pub fn lock_all<T: AsRef<Drain>>(this: T) -> DrainLock<T> {
let drain = this.as_ref();
drain.drain_all.set(drain.drain_all.get() + 1);
DrainLock {
db: None,
has_drain: this,
}
}

// Lock a specific connection for draining.
pub fn lock<T: AsRef<Drain>>(this: T, db: Name) -> DrainLock<T> {
{
let mut drain = this.as_ref().drain.borrow_mut();
drain.entry(db.clone()).and_modify(|v| *v += 1).or_default();
}
DrainLock {
db: Some(db),
has_drain: this,
}
}

/// Is this connection draining?
fn is_draining(&self, db: &str) -> bool {
self.drain_all.get() > 0 || self.drain.borrow().contains_key(db) || self.shutdown.get()
}
}

/// Provides a RAII lock for a db- or whole-pool drain operation.
struct DrainLock<T: AsRef<Drain>> {
db: Option<Name>,
has_drain: T,
}

impl<T: AsRef<Drain>> Drop for DrainLock<T> {
fn drop(&mut self) {
if let Some(name) = self.db.take() {
let mut drain = self.has_drain.as_ref().drain.borrow_mut();
if let Some(count) = drain.get_mut(&name) {
if *count > 1 {
*count -= 1;
} else {
drain.remove(&name);
}
} else {
unreachable!()
}
} else {
let this = self.has_drain.as_ref();
this.drain_all.set(this.drain_all.get() - 1);
}
self.blocks.as_ref()
}
}

Expand Down
Loading

0 comments on commit 2a0dbfa

Please sign in to comment.