Commit 3a844b3: Tests passing

mmastrac committed Jul 4, 2024
1 parent a4113fe commit 3a844b3
Showing 5 changed files with 150 additions and 51 deletions.
112 changes: 85 additions & 27 deletions edb/server/conn_pool/src/algo.rs
@@ -20,6 +20,8 @@ const DEMAND_WEIGHT_WAITING: usize = 1;
const DEMAND_WEIGHT_ACTIVE: usize = 1;
/// The historical length of data we'll maintain for demand.
const DEMAND_HISTORY_LENGTH: usize = 4;
/// The minimum non-zero demand.
const DEMAND_MINIMUM: usize = 16;
/// The maximum minimum-connection count we'll allocate to any one block if there
/// is more capacity than backends.
const MAXIMUM_SHARED_TARGET: usize = 1;
@@ -46,6 +48,8 @@ pub enum RebalanceOp {
Transfer { to: Name, from: Name },
/// Create a block
Create(Name),
/// Garbage collect a block.
Close(Name),
}

#[derive(Debug, Clone, PartialEq, Eq)]
@@ -105,11 +109,12 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
/// Returns an `Option<NonZeroUsize>` containing the hunger score if the current state is below the target
/// and there are waiting elements; otherwise, returns `None`.
fn hunger_score(&self, will_release: bool) -> Option<NonZeroUsize> {
let waiters = self
.count(MetricVariant::Waiting)
.saturating_sub(self.count(MetricVariant::Connecting));
let waiting = self.count(MetricVariant::Waiting);
let connecting = self.count(MetricVariant::Connecting);
let waiters = waiting.saturating_sub(connecting);
let current = self.total() - if will_release { 1 } else { 0 };
let target = self.target();

// If we have more connections than our target, we are not hungry. We
// may still be hungry if current == target.
if current > target {
Expand Down Expand Up @@ -195,46 +200,64 @@ pub struct PoolConstraints {

impl PoolConstraints {
/// Adjust the quota targets for each block within the pool
pub fn adjust<'a, 'b, T, U>(&self, it: &'a U)
pub fn adjust<'a, 'b, T, U>(&self, it: &'a U, db: Option<&str>)
where
U: VisitPoolAlgoData<T>,
T: 'b,
T: PoolAlgorithmDataBlock,
{
// Once we've adjusted the constraints, reset the max settings
defer!(it.reset_max());
defer!(if db.is_none() {
it.reset_max()
});

// First, compute the overall request load and number of backend targets
let mut total_demand = 0;
let mut total_target = 0;
let mut s = "".to_owned();

it.with_all(|_name, data| {
it.with_all(|name, data| {
// We calculate demand based on the estimated connection active time
// multiplied by the active + waiting counts. This gives us an
// estimated database time statistic we can use for relative
// weighting.
let demand = data.avg_ms(MetricVariant::Active).max(MIN_ACTIVE_TIME)
* (data.max(MetricVariant::Waiting) * DEMAND_WEIGHT_WAITING
+ data.max(MetricVariant::Active) * DEMAND_WEIGHT_ACTIVE);
data.insert_demand(demand as _);
let demand = data.demand();
let active = data.max(MetricVariant::Active);
let active_ms = data.avg_ms(MetricVariant::Active).max(MIN_ACTIVE_TIME);
let waiting = data.max(MetricVariant::Waiting);
let idle = active == 0 && waiting == 0;
let demand = if Some(name.as_ref()) == db {
DEMAND_MINIMUM
} else if idle {
0
} else {
                // Clamp to DEMAND_MINIMUM so any block with activity registers a non-zero demand
let demand = (active_ms
* (waiting * DEMAND_WEIGHT_WAITING + active * DEMAND_WEIGHT_ACTIVE))
.max(DEMAND_MINIMUM);
demand
};
if db.is_none() || Some(name.as_ref()) == db {
data.insert_demand(demand as _);
}
let demand_avg = data.demand();

if tracing::enabled!(tracing::Level::TRACE) {
s += &format!(
"{name}={demand_avg}/{demand} (a={},w={},t={}ms) ",
active, waiting, active_ms
);
}

total_demand += demand;
total_target += 1;
total_demand += demand_avg;
if demand_avg > 0 {
total_target += 1;
} else {
data.set_target(0);
}
});

if tracing::enabled!(tracing::Level::TRACE) {
let mut s = "".to_owned();
it.with_all(|name, data| {
let demand = data.demand();
s += &format!(
"{name}={demand} (a={},w={},t={}ms) ",
data.max(MetricVariant::Active),
data.max(MetricVariant::Waiting),
data.avg_ms(MetricVariant::Active)
);
});
trace!("Demand: {}", s);
trace!("Demand: {total_target} {}", s);
}

// Empty pool, no math
@@ -251,6 +274,9 @@ impl PoolConstraints {

it.with_all(|_name, data| {
let demand = data.demand();
if demand == 0 {
return;
}

// Give everyone what they requested, plus a share of the spare
// capacity. If there is leftover spare capacity, that capacity
Expand Down Expand Up @@ -279,12 +305,16 @@ impl PoolConstraints {
let mut allocate = vec![];
let mut made_changes = false;

for _ in 0..MAX_REBALANCE_CREATE {
for i in 0..MAX_REBALANCE_CREATE {
it.with_all(|name, block| {
if block.target() > block.total() && current_pool_size < max_pool_size {
allocate.push(RebalanceOp::Create(name.clone()));
current_pool_size += 1;
made_changes = true;
} else if block.total() > block.target() && block.count(MetricVariant::Idle) > i
{
allocate.push(RebalanceOp::Close(name.clone()));
made_changes = true;
}
});
if !made_changes {
@@ -300,14 +330,27 @@ impl PoolConstraints {
let mut overloaded = vec![];
let mut hungriest = vec![];

let mut s1 = "".to_owned();
let mut s2 = "".to_owned();

it.with_all(|name, block| {
if let Some(value) = block.hunger_score(false) {
if tracing::enabled!(tracing::Level::TRACE) {
s1 += &format!("{name}={value} ");
}
hungriest.push((value, name.clone()))
} else if let Some(value) = block.overfull_score(false) {
if tracing::enabled!(tracing::Level::TRACE) {
s2 += &format!("{name}={value} ");
}
overloaded.push((value, name.clone()))
}
});

if tracing::enabled!(tracing::Level::TRACE) {
trace!("Hunger: {s1}");
trace!("Overfullness: {s2}");
}
overloaded.sort();
hungriest.sort();

@@ -316,6 +359,11 @@ impl PoolConstraints {
// TODO: rebalance more than one?
loop {
let Some((_, to)) = hungriest.pop() else {
// TODO: close more than one?
while let Some((_, from)) = overloaded.pop() {
tasks.push(RebalanceOp::Close(from.clone()));
break;
}
break;
};

@@ -337,7 +385,7 @@ impl PoolConstraints {
{
// If the block is new, we need to perform an initial adjustment.
if it.ensure_block(db) {
self.adjust(it);
self.adjust(it, Some(db));
}

let target_block_size = it.target(db);
Expand Down Expand Up @@ -398,6 +446,7 @@ impl PoolConstraints {
trace!("Block {db} is overfull ({overfull}), trying to release");
let mut max = 0;
let mut which = None;
let mut s = "".to_owned();
it.with_all(|name, block| {
let is_self = &**name == db;
if let Some(hunger) = block.hunger_score(is_self) {
@@ -406,6 +455,10 @@ impl PoolConstraints {
if is_self {
hunger += SELF_HUNGER_BOOST_FOR_RELEASE;
}

if tracing::enabled!(tracing::Level::TRACE) {
s += &format!("{name}={hunger} ");
}
// If this current block has equal hunger to the hungriest, it takes priority
if hunger > max {
which = if is_self { None } else { Some(name.clone()) };
@@ -414,6 +467,10 @@ impl PoolConstraints {
}
});

if tracing::enabled!(tracing::Level::TRACE) {
trace!("Hunger: {s}");
}

match which {
Some(name) => {
trace!("Releasing to {name:?} with score {max}");
@@ -488,14 +545,15 @@ mod tests {
.await
}

#[test(tokio::test(flavor = "current_thread"))]
#[test(tokio::test(flavor = "current_thread", start_paused = true))]
async fn test_pool_starved() -> Result<()> {
LocalSet::new()
.run_until(async {
let connector = BasicConnector::no_delay();
let config = PoolConfig::suggested_default_for(10);
let blocks = Blocks::default();
let futures = FuturesUnordered::new();
                    // Room for these first five blocks within the 10-connection pool
for i in 0..5 {
let db = format!("{i}");
assert_eq!(
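
The demand and quota math in `adjust` above is easier to follow outside the diff. The standalone sketch below is not part of the commit: the struct, the sample numbers, and the plain proportional split are illustrative assumptions, and only the constant names are taken from algo.rs (the real `adjust` also hands out spare capacity on top of what each block requested).

// Standalone sketch, not part of this commit. Only the constant names mirror
// algo.rs; the struct, the numbers, and the proportional split are illustrative.
const DEMAND_WEIGHT_WAITING: usize = 1;
const DEMAND_WEIGHT_ACTIVE: usize = 1;
const DEMAND_MINIMUM: usize = 16;

struct BlockSample {
    name: &'static str,
    active: usize,    // peak Active count since the last reset_max()
    waiting: usize,   // peak Waiting count since the last reset_max()
    active_ms: usize, // average Active time, already clamped to MIN_ACTIVE_TIME
}

// Idle blocks contribute zero demand (and end up with a zero target); busy blocks
// are weighted by estimated database time, floored at DEMAND_MINIMUM.
fn demand(b: &BlockSample) -> usize {
    if b.active == 0 && b.waiting == 0 {
        0
    } else {
        (b.active_ms * (b.waiting * DEMAND_WEIGHT_WAITING + b.active * DEMAND_WEIGHT_ACTIVE))
            .max(DEMAND_MINIMUM)
    }
}

fn main() {
    let blocks = [
        BlockSample { name: "hot", active: 4, waiting: 2, active_ms: 10 },
        BlockSample { name: "warm", active: 1, waiting: 0, active_ms: 10 },
        BlockSample { name: "idle", active: 0, waiting: 0, active_ms: 10 },
    ];
    let max_pool_size = 10;
    let total: usize = blocks.iter().map(demand).sum();
    for b in &blocks {
        // Simplified: a share of the pool proportional to demand. The real adjust
        // gives each block what it asked for plus a share of any spare capacity.
        let target = if total == 0 { 0 } else { demand(b) * max_pool_size / total };
        println!("{}: demand={} target={}", b.name, demand(b), target);
    }
}

With these numbers the busy block ends up with a target of 7, the lightly used block with 2, and the idle block with 0, mirroring the new `set_target(0)` path for blocks with zero demand.
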
31 changes: 26 additions & 5 deletions edb/server/conn_pool/src/block.rs
@@ -117,7 +117,8 @@ impl<C: Connector, D: Default> Block<C, D> {
assert_eq!(
self.conn_count(),
self.conns.borrow().len(),
"Blocks failed consistency check. Total connection count was wrong."
"Block {} failed consistency check. Total connection count was wrong.",
self.db_name
);
let conn_metrics = MetricsAccum::default();
for conn in &*self.conns.borrow() {
@@ -142,11 +143,20 @@
None
}

fn try_get_used(&self) -> Option<Conn<C>> {
for conn in &*self.conns.borrow() {
if conn.variant() == MetricVariant::Idle {
return Some(conn.clone());
}
}
None
}

fn try_take_used(&self) -> Option<Conn<C>> {
let mut lock = self.conns.borrow_mut();
let pos = lock
.iter()
.position(|conn| conn.try_lock(&self.state.metrics));
.position(|conn| conn.variant() == MetricVariant::Idle);
if let Some(index) = pos {
let conn = lock.remove(index);
return Some(conn);
@@ -233,9 +243,7 @@ impl<C: Connector, D: Default> Block<C, D> {
fn task_close_one(self: Rc<Self>, connector: &C) -> impl Future<Output = ConnResult<()>> {
let conn = {
consistency_check!(self);
let conn = self
.try_acquire_used()
.expect("Could not acquire a connection");
let conn = self.try_get_used().expect("Could not acquire a connection");
conn.close(connector, &self.state.metrics);
conn
};
@@ -726,6 +734,8 @@ mod tests {
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.metrics("db").reset_max();
blocks.metrics("db2").reset_max();
assert_eq!(1, blocks.block_count());
assert_block!(blocks "db" has 3 Idle);
assert_block!(blocks "db2" is empty);
@@ -736,6 +746,9 @@
assert_eq!(2, blocks.block_count());
assert_block!(blocks "db" is empty);
assert_block!(blocks "db2" has 3 Idle);
// Should not activate a connection to steal it
assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active));
assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active));
Ok(())
}

@@ -747,6 +760,8 @@
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.metrics("db").reset_max();
blocks.metrics("db2").reset_max();
assert_eq!(1, blocks.block_count());
assert_block!(blocks "db" has 3 Idle);
assert_block!(blocks "db2" is empty);
@@ -755,6 +770,9 @@
assert_eq!(2, blocks.block_count());
assert_block!(blocks "db" has 2 Idle);
assert_block!(blocks "db2" has 1 Idle);
// Should not activate a connection to move it
assert_eq!(1, blocks.metrics("db").max(MetricVariant::Active));
assert_eq!(0, blocks.metrics("db2").max(MetricVariant::Active));
Ok(())
}

@@ -765,13 +783,16 @@
assert_eq!(0, blocks.block_count());
blocks.create(&connector, "db").await?;
blocks.create(&connector, "db").await?;
blocks.metrics("db").reset_max();
assert_eq!(1, blocks.block_count());
assert_block!(blocks "db" has 2 Idle);
blocks.task_close_one(&connector, "db").await?;
blocks.task_close_one(&connector, "db").await?;
assert_block!(blocks "db" is empty);
// Hasn't GC'd yet
assert_eq!(1, blocks.block_count());
// Should not activate a connection to close it
assert_eq!(0, blocks.metrics("db").max(MetricVariant::Active));
Ok(())
}
}
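
The new `max(MetricVariant::Active)` checks in the tests above pin down the behavioral change in this file: a connection is no longer flipped to Active just so it can be stolen, moved, or closed. A minimal sketch of that invariant, using hypothetical stand-ins for the crate's `Conn` and metric types:

// Illustrative only: hypothetical stand-ins for the crate's Conn and metric types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Variant {
    Idle,
    Active,
}

struct Conn {
    variant: Variant,
}

// Find an idle connection without changing its state (roughly the close path).
fn try_get_used(conns: &[Conn]) -> Option<&Conn> {
    conns.iter().find(|c| c.variant == Variant::Idle)
}

// Remove an idle connection so it can be handed to another block (the transfer path).
fn try_take_used(conns: &mut Vec<Conn>) -> Option<Conn> {
    let pos = conns.iter().position(|c| c.variant == Variant::Idle)?;
    Some(conns.remove(pos))
}

fn main() {
    let mut conns = vec![
        Conn { variant: Variant::Active },
        Conn { variant: Variant::Idle },
    ];
    // Neither helper flips the idle connection to Active, so the Active
    // high-water mark that the tests assert on is left untouched.
    assert!(try_get_used(&conns).is_some());
    let taken = try_take_used(&mut conns);
    assert_eq!(taken.map(|c| c.variant), Some(Variant::Idle));
}

In the real block.rs, `try_get_used` hands back a clone of the idle connection for the close path, while `try_take_used` removes it for a transfer; neither transitions it through Active.
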
12 changes: 9 additions & 3 deletions edb/server/conn_pool/src/conn.rs
@@ -100,9 +100,9 @@ impl<C: Connector> Conn<C> {

pub fn close(&self, connector: &C, metrics: &MetricsAccum) {
self.transition(|inner| match inner {
ConnInner::Active(t, conn, ..) => {
ConnInner::Idle(t, conn, ..) => {
metrics.transition(
MetricVariant::Active,
MetricVariant::Idle,
MetricVariant::Disconnecting,
t.elapsed(),
);
@@ -116,7 +116,13 @@ impl<C: Connector> Conn<C> {
pub fn transfer(&self, connector: &C, from: &MetricsAccum, to: &MetricsAccum, db: &str) {
self.untrack(from);
self.transition(|inner| match inner {
ConnInner::Active(_, conn, ..) => {
ConnInner::Idle(t, conn, ..) => {
from.inc_all_time(MetricVariant::Disconnecting);
to.insert(MetricVariant::Connecting);
let f = connector.reconnect(conn, db).boxed_local();
ConnInner::Connecting(Instant::now(), f)
}
ConnInner::Active(t, conn, ..) => {
from.inc_all_time(MetricVariant::Disconnecting);
to.insert(MetricVariant::Connecting);
let f = connector.reconnect(conn, db).boxed_local();
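
The conn.rs hunks above shift `close` (and the new `transfer` arm) to start from `ConnInner::Idle`, so the metrics record an Idle to Disconnecting transition instead of Active to Disconnecting. A small sketch of that bookkeeping, using made-up stand-ins rather than the crate's real `ConnInner` and `MetricsAccum`:

// Illustrative stand-ins only; the real ConnInner and MetricsAccum also carry
// timing information, pending futures, and more states than shown here.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Variant {
    Idle,
    Disconnecting,
}

enum State {
    Idle,
    Active,
    Disconnecting,
}

#[derive(Default)]
struct Metrics {
    transitions: Vec<(Variant, Variant)>,
}

impl Metrics {
    fn transition(&mut self, from: Variant, to: Variant) {
        self.transitions.push((from, to));
    }
}

// After this commit the close path matches on Idle rather than Active, so only
// connections that are not doing work get torn down, and the metrics reflect it.
fn close(state: State, metrics: &mut Metrics) -> State {
    match state {
        State::Idle => {
            metrics.transition(Variant::Idle, Variant::Disconnecting);
            State::Disconnecting
        }
        other => other, // anything else is left untouched in this sketch
    }
}

fn main() {
    let mut metrics = Metrics::default();
    assert!(matches!(close(State::Idle, &mut metrics), State::Disconnecting));
    // An Active connection is not eligible for this path and passes through unchanged.
    assert!(matches!(close(State::Active, &mut metrics), State::Active));
    assert_eq!(
        metrics.transitions,
        vec![(Variant::Idle, Variant::Disconnecting)]
    );
}
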
(Diffs for the remaining two changed files are not shown.)