Skip to content

Commit

Permalink
Rust connpool: Error handling (#7585)
Browse files Browse the repository at this point in the history
This implements error handling for the Rust connection pool.

For disconnection errors: all disconnection errors are routed through the task.

For (re-)connection errors: a failed connection holds a spot in a block until someone queues for it. That acquisition request then receives an error rather than a connection. There is no retry logic at the moment; the intent is that retries would eventually live in the pool itself.
  • Loading branch information
mmastrac authored Jul 26, 2024
1 parent 2175067 commit 78fb6ff
Show file tree
Hide file tree
Showing 8 changed files with 704 additions and 264 deletions.
12 changes: 12 additions & 0 deletions edb/server/conn_pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ more connections are waiting on this block, we return the connection to the
block to be re-used immediately. If no connections are waiting but the block is
hungry, we return it. If the block is satisfied or overfull and we have hungry
blocks waiting, we transfer it to a hungry block that has waiters.

## Error Handling

The pool will attempt to provide a connection where possible, but connection
operations may not always be reliable. The error from a connection failure is
routed through the acquire operation when the pool detects that there is no
other potential source of a connection for that acquire. Potential sources
include a currently-connecting connection, a reconnecting connection, a
connection actively held by someone else, and a connection sitting idle.

The pool does not currently retry failed connection attempts; retry logic, if
desired, should be implemented inside the connect operation itself.
51 changes: 37 additions & 14 deletions edb/server/conn_pool/src/algo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ constants! {
/// The weight we apply to waiting connections.
const DEMAND_WEIGHT_WAITING: usize = 3;
/// The weight we apply to active connections.
const DEMAND_WEIGHT_ACTIVE: usize = 277;
const DEMAND_WEIGHT_ACTIVE: usize = 4;
/// The minimum non-zero demand. This makes the demand calculations less noisy
/// when we are competing at lower levels of demand, allowing for more
/// reproducable results.
Expand All @@ -162,30 +162,30 @@ constants! {
/// The boost we apply to our own apparent hunger when releasing a connection.
/// This prevents excessive swapping when hunger is similar across various
/// backends.
const SELF_HUNGER_BOOST_FOR_RELEASE: usize = 160;
const SELF_HUNGER_BOOST_FOR_RELEASE: usize = 45;
/// The weight we apply to the difference between the target and required
/// connections when determining overfullness.
const HUNGER_DIFF_WEIGHT: usize = 20;
const HUNGER_DIFF_WEIGHT: usize = 3;
/// The weight we apply to waiters when determining hunger.
const HUNGER_WAITER_WEIGHT: usize = 0;
const HUNGER_WAITER_ACTIVE_WEIGHT: usize = 0;
const HUNGER_WAITER_WEIGHT: usize = 15;
const HUNGER_WAITER_ACTIVE_WEIGHT: usize = 2;
const HUNGER_ACTIVE_WEIGHT_DIVIDEND: usize = 9650;
/// The weight we apply to the oldest waiter's age in milliseconds (as a divisor).
#[range(1..=2000)]
const HUNGER_AGE_DIVISOR_WEIGHT: usize = 1360;
const HUNGER_AGE_DIVISOR_WEIGHT: usize = 707;

/// The weight we apply to the difference between the target and required
/// connections when determining overfullness.
const OVERFULL_DIFF_WEIGHT: usize = 20;
const OVERFULL_DIFF_WEIGHT: usize = 151;
/// The weight we apply to idle connections when determining overfullness.
const OVERFULL_IDLE_WEIGHT: usize = 100;
const OVERFULL_IDLE_WEIGHT: usize = 220;
/// This is divided by the youngest connection metric to penalize switching from
/// a backend which has changed recently.
const OVERFULL_CHANGE_WEIGHT_DIVIDEND: usize = 4690;
const OVERFULL_CHANGE_WEIGHT_DIVIDEND: usize = 57;
/// The weight we apply to waiters when determining overfullness.
const OVERFULL_WAITER_WEIGHT: usize = 4460;
const OVERFULL_WAITER_ACTIVE_WEIGHT: usize = 1300;
const OVERFULL_ACTIVE_WEIGHT_DIVIDEND: usize = 6620;
const OVERFULL_WAITER_WEIGHT: usize = 912;
const OVERFULL_WAITER_ACTIVE_WEIGHT: usize = 49;
const OVERFULL_ACTIVE_WEIGHT_DIVIDEND: usize = 951;
}

/// Determines the rebalance plan based on the current pool state.
Expand All @@ -199,6 +199,13 @@ pub enum RebalanceOp {
Close(Name),
}

/// A single operation in a shutdown plan (produced by
/// `PoolConstraints::plan_shutdown` from the current pool state). Each op
/// tears down one connection so the pool can drain to empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownOp {
    /// Close one closable (idle or failed) connection in the named block.
    Close(Name),
}

/// Determines the acquire plan based on the current pool state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireOp {
Expand Down Expand Up @@ -288,7 +295,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
/// and there are waiting elements; otherwise, returns `None`.
fn hunger_score(&self, will_release: bool) -> Option<isize> {
let waiting = self.count(MetricVariant::Waiting);
let connecting = self.count(MetricVariant::Connecting);
let connecting =
self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting);
let waiters = waiting.saturating_sub(connecting);
let current = self.total() - if will_release { 1 } else { 0 };
let target = self.target();
Expand Down Expand Up @@ -331,7 +339,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
let idle = self.count(MetricVariant::Idle) + if will_release { 1 } else { 0 };
let current = self.total();
let target = self.target();
let connecting = self.count(MetricVariant::Connecting);
let connecting =
self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting);
let waiting = self.count(MetricVariant::Waiting);
let waiters = waiting.saturating_sub(connecting);
let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get());
Expand Down Expand Up @@ -538,6 +547,20 @@ impl PoolConstraints {
);
}

/// Plan a shutdown: emit one `ShutdownOp::Close` for every connection that
/// can be torn down immediately — those currently idle and those in the
/// failed state.
pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec<ShutdownOp> {
    let mut ops = Vec::new();
    it.with_all(|name, block| {
        // Idle and failed connections are both closable right away.
        let closable =
            block.count(MetricVariant::Idle) + block.count(MetricVariant::Failed);
        ops.extend((0..closable).map(|_| ShutdownOp::Close(name.clone())));
    });
    ops
}

/// Plan a rebalance to better match the target quotas of the blocks in the
/// pool.
pub fn plan_rebalance(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
Expand Down
Loading

0 comments on commit 78fb6ff

Please sign in to comment.