Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Jul 23, 2024
1 parent 396abc4 commit 0141132
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 42 deletions.
17 changes: 13 additions & 4 deletions edb/server/conn_pool/src/algo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ pub enum RebalanceOp {
Close(Name),
}

/// Determines the shutdown plan based on the current pool state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ShutdownOp {
/// Garbage collect a block.
Close(Name),
}

/// Determines the acquire plan based on the current pool state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireOp {
Expand Down Expand Up @@ -288,7 +295,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
/// and there are waiting elements; otherwise, returns `None`.
fn hunger_score(&self, will_release: bool) -> Option<isize> {
let waiting = self.count(MetricVariant::Waiting);
let connecting = self.count(MetricVariant::Connecting);
let connecting =
self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting);
let waiters = waiting.saturating_sub(connecting);
let current = self.total() - if will_release { 1 } else { 0 };
let target = self.target();
Expand Down Expand Up @@ -331,7 +339,8 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics {
let idle = self.count(MetricVariant::Idle) + if will_release { 1 } else { 0 };
let current = self.total();
let target = self.target();
let connecting = self.count(MetricVariant::Connecting);
let connecting =
self.count(MetricVariant::Connecting) + self.count(MetricVariant::Reconnecting);
let waiting = self.count(MetricVariant::Waiting);
let waiters = waiting.saturating_sub(connecting);
let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get());
Expand Down Expand Up @@ -539,14 +548,14 @@ impl PoolConstraints {
}

/// Plan a shutdown.
pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec<RebalanceOp> {
pub fn plan_shutdown(&self, it: &impl VisitPoolAlgoData) -> Vec<ShutdownOp> {
let mut ops = vec![];
it.with_all(|name, block| {
let idle = block.count(MetricVariant::Idle);
let failed = block.count(MetricVariant::Failed);

for _ in 0..(idle + failed) {
ops.push(RebalanceOp::Close(name.clone()));
ops.push(ShutdownOp::Close(name.clone()));
}
});
ops
Expand Down
46 changes: 22 additions & 24 deletions edb/server/conn_pool/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,31 +190,29 @@ impl<C: Connector> Conn<C> {
);

let res = match &mut *lock {
ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => match ready!(f.poll_unpin(cx)) {
Ok(c) => {
let elapsed = t.elapsed();
debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle);
let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into();
metrics.transition(from, to, elapsed);
if to == MetricVariant::Active {
*lock = ConnInner::Active(Instant::now(), c);
} else {
*lock = ConnInner::Idle(Instant::now(), c);
ConnInner::Connecting(t, f) | ConnInner::Reconnecting(t, f) => {
match ready!(f.poll_unpin(cx)) {
Ok(c) => {
let elapsed = t.elapsed();
debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle);
let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into();
metrics.transition(from, to, elapsed);
if to == MetricVariant::Active {
*lock = ConnInner::Active(Instant::now(), c);
} else {
*lock = ConnInner::Idle(Instant::now(), c);
}
Ok(())
}
Err(err) => {
let elapsed = t.elapsed();
let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into();
metrics.transition(from, MetricVariant::Failed, elapsed);
*lock = ConnInner::Failed(err);
Err(ConnError::TaskFailed)
}
Ok(())
}
Err(err) => {
let elapsed = t.elapsed();
let from = (&std::mem::replace(&mut *lock, ConnInner::Transition)).into();
metrics.transition(
from,
MetricVariant::Failed,
elapsed,
);
*lock = ConnInner::Failed(err);
Err(ConnError::TaskFailed)
}
},
}
ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) {
Ok(_) => {
debug_assert_eq!(to, MetricVariant::Closed);
Expand Down Expand Up @@ -267,7 +265,7 @@ impl<C: Connector> Conn<C> {
ConnInner::Closed | ConnInner::Failed(..) => {
metrics.remove(self.variant());
}
_ => unreachable!()
_ => unreachable!(),
}
}
}
Expand Down
16 changes: 2 additions & 14 deletions edb/server/conn_pool/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
algo::{
AcquireOp, PoolAlgoTargetData, PoolAlgorithmDataBlock, PoolAlgorithmDataMetrics,
PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, VisitPoolAlgoData,
PoolConstraints, RebalanceOp, ReleaseOp, ReleaseType, ShutdownOp, VisitPoolAlgoData,
},
block::{Blocks, Name},
conn::{ConnError, ConnHandle, ConnResult, Connector},
Expand Down Expand Up @@ -158,19 +158,7 @@ impl<C: Connector> Pool<C> {
for op in self.config.constraints.plan_shutdown(&self.blocks) {
trace!("Shutdown: {op:?}");
match op {
RebalanceOp::Transfer { from, to } => {
tokio::task::spawn_local(self.blocks.task_steal(
&self.connector,
&to,
&from,
));
}
RebalanceOp::Create(name) => {
tokio::task::spawn_local(
self.blocks.task_create_one(&self.connector, &name),
);
}
RebalanceOp::Close(name) => {
ShutdownOp::Close(name) => {
tokio::task::spawn_local(
self.blocks.task_close_one(&self.connector, &name),
);
Expand Down

0 comments on commit 0141132

Please sign in to comment.