From 0f9d34b6d121db9607fb180eb31b929e54cd2b1d Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 15 Jul 2024 11:22:05 -0600 Subject: [PATCH] Review feedback --- Cargo.lock | 263 ++++++++++++++++++- edb/server/conn_pool/Cargo.toml | 3 + edb/server/conn_pool/src/algo.rs | 417 +++++++++++++++++++++--------- edb/server/conn_pool/src/block.rs | 15 +- edb/server/conn_pool/src/conn.rs | 30 ++- edb/server/conn_pool/src/lib.rs | 3 + edb/server/conn_pool/src/pool.rs | 335 +++++++++++++++++++----- edb/server/conn_pool/src/test.rs | 95 ++++++- tests/test_server_pool.py | 8 +- 9 files changed, 954 insertions(+), 215 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 852ceea87163..28bf46d3b189 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -26,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "anstream" version = "0.6.14" @@ -102,6 +120,17 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi 0.1.19", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -243,7 +272,9 @@ dependencies = [ "consume_on_drop", "derive_more", "futures", - "itertools", + "genetic_algorithm", + "itertools 0.13.0", + "lru", "pretty_assertions", "pyo3", "rand", @@ -283,6 +314,62 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -431,6 +518,19 @@ dependencies = [ "log", ] +[[package]] +name = "env_logger" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "env_logger" version = "0.11.3" @@ -449,6 +549,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "factorial" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6398219c33c5768705156b8a69c99c463a51847e2d3647f5adce32bb6e990b1c" +dependencies = [ + "num-traits", +] + [[package]] name = "futures" version = "0.3.30" @@ -554,6 +663,24 @@ dependencies = [ "version_check", ] +[[package]] +name = "genetic_algorithm" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6310c399e71285ca3a06cedd9df0c5c7a53414316579b75883ca3d8521d594e5" +dependencies = [ + "crossbeam", + "env_logger 0.9.3", + "factorial", + "itertools 0.10.5", + "log", + "num", + "rand", + "rayon", + "streaming-stats", + "thread_local", +] + [[package]] name = "getrandom" version = "0.2.14" @@ -601,6 +728,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -614,12 +745,27 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "indexmap" version = "1.9.3" @@ -652,6 +798,15 @@ version = "1.70.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -701,6 +856,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "matchers" version = "0.1.0" @@ -783,6 +947,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3135b08af27d103b0a51f2ae0f8632117b7b185ccf931445affa8df530576a41" +dependencies = [ + "num-bigint 0.4.4", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.2.6" @@ -835,12 +1013,24 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-rational" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" dependencies = [ + "num-bigint 0.4.4", "num-integer", "num-traits", ] @@ -861,7 +1051,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -1127,6 +1317,26 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1394,6 +1604,15 @@ dependencies = [ "rand", ] +[[package]] +name = "streaming-stats" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0d670ce4e348a2081843569e0f79b21c99c91bb9028b3b3ecb0f050306de547" +dependencies = [ + "num-traits", +] + [[package]] name = "strum" version = "0.26.2" @@ -1450,13 +1669,22 @@ version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1fc403891a21bcfb7c37834ba66a547a8f402146eba7265b5a6d88059c9ff2f" +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "test-log" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" dependencies = [ - "env_logger", + "env_logger 0.11.3", "test-log-macros", "tracing-subscriber", ] @@ -1766,6 +1994,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -1916,3 +2153,23 @@ name = "yansi" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] diff --git a/edb/server/conn_pool/Cargo.toml b/edb/server/conn_pool/Cargo.toml index ea6ba28ecb49..ebf697449eb5 100644 --- a/edb/server/conn_pool/Cargo.toml +++ b/edb/server/conn_pool/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [features] python_extension = ["pyo3/extension-module"] +optimizer = [] [dependencies] futures = "0" @@ -36,6 +37,8 @@ anyhow = "1" rstest = "0" rand = "0" statrs = "0" +genetic_algorithm = "0" +lru = "0" [dev-dependencies.tokio] version = "1" diff --git a/edb/server/conn_pool/src/algo.rs b/edb/server/conn_pool/src/algo.rs index 73872d6e1ed3..f029b0f5725c 100644 --- a/edb/server/conn_pool/src/algo.rs +++ b/edb/server/conn_pool/src/algo.rs @@ -1,8 +1,5 @@ use scopeguard::defer; -use std::{ - cell::{Cell, RefCell}, - num::NonZeroUsize, -}; +use std::cell::{Cell, RefCell}; use tracing::trace; use crate::{ @@ -10,49 +7,186 @@ use crate::{ metrics::{MetricVariant, RollingAverageU32}, }; -/// The maximum number of connections to create during a rebalance. -const MAX_REBALANCE_CREATE: usize = 5; -/// The maximum number of excess connections (> target) we'll keep around during -/// a rebalance if there is still some demand. -const MAX_EXCESS_IDLE_CONNECTIONS: usize = 2; -/// The minimum amount of time we'll consider for an active connection. -const MIN_ACTIVE_TIME: usize = 1; - -/// The weight we apply to waiting connections. -const DEMAND_WEIGHT_WAITING: usize = 1; -/// The weight we apply to active connections. -const DEMAND_WEIGHT_ACTIVE: usize = 1; /// The historical length of data we'll maintain for demand. -const DEMAND_HISTORY_LENGTH: usize = 4; -/// The minimum non-zero demand. This makes the demand calculations less noisy -/// when we are competing at lower levels of demand, allowing for more -/// reproducable results. -const DEMAND_MINIMUM: usize = 16; - -/// The maximum-minimum connection count we'll allocate to connections if there -/// is more capacity than backends. -const MAXIMUM_SHARED_TARGET: usize = 1; -/// The boost we apply to our own apparent hunger when releasing a connection. -/// This prevents excessive swapping when hunger is similar across various -/// backends. -const SELF_HUNGER_BOOST_FOR_RELEASE: usize = 10; - -/// The weight we apply to the difference between the target and required -/// connections when determining overfullness. -const HUNGER_DIFF_WEIGHT: usize = 100; -/// The weight we apply to waiters when determining hunger. -const HUNGER_WAITER_WEIGHT: usize = 1; -/// The weight we apply to the oldest waiter's age in milliseconds (as a divisor). -const HUNGER_AGE_DIVISOR_WEIGHT: usize = 10; - -/// The weight we apply to the difference between the target and required -/// connections when determining overfullness. -const OVERFULL_DIFF_WEIGHT: usize = 100; -/// The weight we apply to idle connections when determining overfullness. -const OVERFULL_IDLE_WEIGHT: usize = 1; -/// This is divided by the youngest connection metric to penalize switching from -/// a backend which has changed recently. -const OVERFULL_CHANGE_WEIGHT_DIVIDEND: usize = 1000; +const DEMAND_HISTORY_LENGTH: usize = 16; + +#[cfg(not(feature = "optimizer"))] +#[derive(Clone, Copy, derive_more::From)] +pub struct Knob(&'static str, T); + +#[cfg(not(feature = "optimizer"))] +impl Knob { + pub const fn new(name: &'static str, value: T) -> Self { + Self(name, value) + } + + pub fn get(&self) -> T { + self.1 + } +} + +#[cfg(feature = "optimizer")] +pub struct Knob( + &'static str, + &'static std::thread::LocalKey>, + Option>, +); + +impl + std::fmt::Display + std::fmt::Debug> std::fmt::Debug for Knob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}={:?}", self.0, self.get())) + } +} + +#[cfg(feature = "optimizer")] +impl + std::fmt::Display + std::fmt::Debug> Knob { + pub const fn new( + name: &'static str, + value: &'static std::thread::LocalKey>, + bounds: &[std::ops::RangeInclusive], + ) -> Self { + let copy = if !bounds.is_empty() { + Some(*bounds[0].start()..=*bounds[0].end()) + } else { + None + }; + Self(name, value, copy) + } + + pub fn name(&self) -> &'static str { + self.0 + } + + pub fn get(&self) -> T { + self.1.with_borrow(|t| *t) + } + + pub fn set(&self, value: T) -> Result<(), String> { + if let Some(range) = &self.2 { + if range.contains(&value) { + self.1.with_borrow_mut(|t| *t = value); + Ok(()) + } else { + Err(format!("{value} is out of range of {range:?}")) + } + } else { + self.1.with_borrow_mut(|t| *t = value); + Ok(()) + } + } + + pub fn clamp(&self, value: &mut T) { + if let Some(range) = &self.2 { + if !range.contains(value) { + if *value < *range.start() { + *value = *range.start() + } else { + *value = *range.end() + } + } + } + } +} + +macro_rules! constants { + ($( + $( #[doc=$doc:literal] )* + $( #[range $range:tt] )? + const $name:ident: $type:ty = $value:literal; + )*) => { + #[cfg(feature="optimizer")] + pub mod knobs { + pub use super::Knob; + mod locals { + $( + thread_local! { + pub static $name: std::cell::RefCell<$type> = std::cell::RefCell::new($value); + } + )* + } + + $( + $( #[doc=$doc] )* + pub static $name: Knob<$type> = Knob::new(stringify!($name), &locals::$name, &[$($range)?]); + )* + + pub const ALL_KNOB_COUNT: usize = [$(stringify!($name)),*].len(); + pub static ALL_KNOBS: [&Knob; ALL_KNOB_COUNT] = [ + $(&$name),* + ]; + } + #[cfg(not(feature="optimizer"))] + pub mod knobs { + pub use super::Knob; + $( + $( #[doc=$doc] )* + pub const $name: Knob<$type> = Knob::new(stringify!($name), $value); + )* + } + pub use knobs::*; + }; +} + +// Note: these constants are tuned via the generic algorithm optimizer. +constants! { + /// The maximum number of connections to create or destroy during a rebalance. + #[range(0..=10)] + const MAX_REBALANCE_OPS: usize = 5; + /// The minimum headroom in a block between its current total and its target + /// for us to pre-create connections for it. + #[range(0..=10)] + const MIN_REBALANCE_HEADROOM_TO_CREATE: usize = 2; + /// The maximum number of excess connections (> target) we'll keep around during + /// a rebalance if there is still some demand. + #[range(0..=10)] + const MAX_REBALANCE_EXCESS_IDLE_CONNECTIONS: usize = 2; + + /// The minimum amount of time we'll consider for an active connection. + #[range(1..=100)] + const MIN_TIME: usize = 1; + + /// The weight we apply to waiting connections. + const DEMAND_WEIGHT_WAITING: usize = 3; + /// The weight we apply to active connections. + const DEMAND_WEIGHT_ACTIVE: usize = 3; + /// The minimum non-zero demand. This makes the demand calculations less noisy + /// when we are competing at lower levels of demand, allowing for more + /// reproducable results. + #[range(1..=256)] + const DEMAND_MINIMUM: usize = 168; + + /// The maximum-minimum connection count we'll allocate to connections if there + /// is more capacity than backends. + const MAXIMUM_SHARED_TARGET: usize = 1; + /// The boost we apply to our own apparent hunger when releasing a connection. + /// This prevents excessive swapping when hunger is similar across various + /// backends. + const SELF_HUNGER_BOOST_FOR_RELEASE: usize = 16; + + /// The weight we apply to the difference between the target and required + /// connections when determining overfullness. + const HUNGER_DIFF_WEIGHT: usize = 2; + /// The weight we apply to waiters when determining hunger. + const HUNGER_WAITER_WEIGHT: usize = 1; + const HUNGER_WAITER_ACTIVE_WEIGHT: usize = 0; + const HUNGER_ACTIVE_WEIGHT_DIVIDEND: usize = 634; + /// The weight we apply to the oldest waiter's age in milliseconds (as a divisor). + #[range(1..=1000)] + const HUNGER_AGE_DIVISOR_WEIGHT: usize = 674; + + /// The weight we apply to the difference between the target and required + /// connections when determining overfullness. + const OVERFULL_DIFF_WEIGHT: usize = 2; + /// The weight we apply to idle connections when determining overfullness. + const OVERFULL_IDLE_WEIGHT: usize = 10; + /// This is divided by the youngest connection metric to penalize switching from + /// a backend which has changed recently. + const OVERFULL_CHANGE_WEIGHT_DIVIDEND: usize = 469; + /// The weight we apply to waiters when determining overfullness. + const OVERFULL_WAITER_WEIGHT: usize = 71; + const OVERFULL_WAITER_ACTIVE_WEIGHT: usize = 130; + const OVERFULL_ACTIVE_WEIGHT_DIVIDEND: usize = 2; +} /// Determines the rebalance plan based on the current pool state. #[derive(Debug, Clone, PartialEq, Eq)] @@ -152,28 +286,29 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { /// /// Returns an `Option` containing the hunger score if the current state is below the target /// and there are waiting elements; otherwise, returns `None`. - fn hunger_score(&self, will_release: bool) -> Option { + fn hunger_score(&self, will_release: bool) -> Option { let waiting = self.count(MetricVariant::Waiting); let connecting = self.count(MetricVariant::Connecting); let waiters = waiting.saturating_sub(connecting); let current = self.total() - if will_release { 1 } else { 0 }; let target = self.target(); + let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get()); // Waiters become more hungry as they age - let age_score = self.oldest_ms(MetricVariant::Waiting) / HUNGER_AGE_DIVISOR_WEIGHT; - let base_score = age_score + waiters * HUNGER_WAITER_WEIGHT; + let age_score = + self.oldest_ms(MetricVariant::Waiting) / HUNGER_AGE_DIVISOR_WEIGHT.get().max(1); + let waiter_score = waiters * HUNGER_WAITER_WEIGHT.get() + + (waiters * HUNGER_WAITER_ACTIVE_WEIGHT.get() / active_ms) + + (HUNGER_ACTIVE_WEIGHT_DIVIDEND.get() / active_ms); + let base_score = age_score + waiter_score; // If we have more connections than our target, we are not hungry. We // may still be hungry if current <= target if we have waiters, however. - if current > target { + if current > target || (target == current && waiters < 1) { None - } else if target > current { - let diff = target - current; - (base_score + diff * HUNGER_DIFF_WEIGHT).try_into().ok() - } else if waiters > 0 { - base_score.try_into().ok() } else { - None + let diff = target - current; + Some((base_score + diff * HUNGER_DIFF_WEIGHT.get()) as _) } } @@ -192,25 +327,59 @@ pub trait PoolAlgorithmDataBlock: PoolAlgorithmDataMetrics { /// /// Returns an `Option` containing the overfull score if the current state is overfull /// and there are idle elements; otherwise, returns `None`. - fn overfull_score(&self, will_release: bool) -> Option { + fn overfull_score(&self, will_release: bool) -> Option { let idle = self.count(MetricVariant::Idle) + if will_release { 1 } else { 0 }; let current = self.total(); let target = self.target(); - let youngest_ms = self.youngest_ms().max(1); + let connecting = self.count(MetricVariant::Connecting); + let waiting = self.count(MetricVariant::Waiting); + let waiters = waiting.saturating_sub(connecting); + let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get()); + let connecting_ms = self.avg_ms(MetricVariant::Connecting).max(MIN_TIME.get()); + let youngest_ms = self.youngest_ms().max(MIN_TIME.get()); + let waiter_score = (waiters * OVERFULL_WAITER_WEIGHT.get() + + (waiters * OVERFULL_WAITER_ACTIVE_WEIGHT.get() / active_ms) + + (OVERFULL_ACTIVE_WEIGHT_DIVIDEND.get() / active_ms)) + as isize; + // If we have no idle connections, or we don't have enough connections we're not overfull. if target >= current || idle == 0 { None } else { - let base_score = - idle * OVERFULL_IDLE_WEIGHT + OVERFULL_CHANGE_WEIGHT_DIVIDEND / youngest_ms; + let youngest_ratio = (youngest_ms / connecting_ms).max(1); + let idle_score = (idle * OVERFULL_IDLE_WEIGHT.get()) as isize; + let youngest_score = (OVERFULL_CHANGE_WEIGHT_DIVIDEND.get() / youngest_ratio) as isize; + let base_score = idle_score + youngest_score - waiter_score; if current > target { let diff = current - target; - (diff * OVERFULL_DIFF_WEIGHT + base_score).try_into().ok() + let diff_score = (diff * OVERFULL_DIFF_WEIGHT.get()) as isize; + Some(diff_score + base_score) } else { - base_score.try_into().ok() + Some(base_score) } } } + + /// We calculate demand based on the estimated connection active time + /// multiplied by the active + waiting counts. This gives us an + /// estimated database time statistic we can use for relative + /// weighting. + fn demand_score(&self) -> usize { + let active = self.max(MetricVariant::Active); + let active_ms = self.avg_ms(MetricVariant::Active).max(MIN_TIME.get()); + let waiting = self.max(MetricVariant::Waiting); + let idle = active == 0 && waiting == 0; + + if idle { + 0 + } else { + let waiting_score = waiting * DEMAND_WEIGHT_WAITING.get(); + let active_score = active * DEMAND_WEIGHT_ACTIVE.get(); + // Note that we clamp to DEMAND_MINIMUM to ensure the average is non-zero + (active_ms * (waiting_score + active_score)) + .max(DEMAND_MINIMUM.get() * DEMAND_HISTORY_LENGTH) + } + } } pub trait PoolAlgorithmDataPool: PoolAlgorithmDataMetrics { @@ -261,7 +430,7 @@ impl PoolConstraints { s += &format!("{name}={demand_avg} ",); } - total_demand += demand_avg; + total_demand += demand_avg as usize; if demand_avg > 0 { total_target += 1; } else { @@ -282,37 +451,20 @@ impl PoolConstraints { defer!(it.reset_max()); // First, compute the overall request load and number of backend targets - let mut total_demand = 0; + let mut total_demand = 0_usize; let mut total_target = 0; let mut s = "".to_owned(); it.with_all(|name, data| { - // We calculate demand based on the estimated connection active time - // multiplied by the active + waiting counts. This gives us an - // estimated database time statistic we can use for relative - // weighting. - let active = data.max(MetricVariant::Active); - let active_ms = data.avg_ms(MetricVariant::Active).max(MIN_ACTIVE_TIME); - let waiting = data.max(MetricVariant::Waiting); - let idle = active == 0 && waiting == 0; - let demand = if idle { - 0 - } else { - // Note that we clamp to DEMAND_MINIMUM to ensure the average is non-zero - (active_ms * (waiting * DEMAND_WEIGHT_WAITING + active * DEMAND_WEIGHT_ACTIVE)) - .max(DEMAND_MINIMUM) - }; + let demand = data.demand_score(); data.insert_demand(demand as _); let demand_avg = data.demand(); if tracing::enabled!(tracing::Level::TRACE) { - s += &format!( - "{name}={demand_avg}/{demand} (a={},w={},t={}ms) ", - active, waiting, active_ms - ); + s += &format!("{name}={demand_avg}/{demand}",); } - total_demand += demand_avg; + total_demand += demand_avg as usize; if demand_avg > 0 { total_target += 1; } else { @@ -328,7 +480,12 @@ impl PoolConstraints { } /// Allocate the calculated demand to target quotas. - fn allocate_demand(&self, it: &impl VisitPoolAlgoData, total_target: usize, total_demand: u32) { + fn allocate_demand( + &self, + it: &impl VisitPoolAlgoData, + total_target: usize, + total_demand: usize, + ) { // Empty pool, no math if total_target == 0 || total_demand == 0 { return; @@ -337,25 +494,31 @@ impl PoolConstraints { let mut allocated = 0; // This is the minimum number of connections we'll allocate to any particular // backend regardless of demand if there are less backends than the capacity. - let min = (self.max / total_target).min(MAXIMUM_SHARED_TARGET); + let min = (self.max / total_target).min(MAXIMUM_SHARED_TARGET.get()); // The remaining capacity after we allocated the `min` value above. let capacity = self.max - min * total_target; - it.with_all(|_name, data| { - let demand = data.demand(); - if demand == 0 { - return; - } + if min == 0 { + it.with_all(|_name, data| { + data.set_target(0); + }); + } else { + it.with_all(|_name, data| { + let demand = data.demand(); + if demand == 0 { + return; + } - // Give everyone what they requested, plus a share of the spare - // capacity. If there is leftover spare capacity, that capacity - // may float between whoever needs it the most. - let target = - (demand as f32 * capacity as f32 / total_demand as f32).floor() as usize + min; + // Give everyone what they requested, plus a share of the spare + // capacity. If there is leftover spare capacity, that capacity + // may float between whoever needs it the most. + let target = + (demand as f32 * capacity as f32 / total_demand as f32).floor() as usize + min; - data.set_target(target); - allocated += target; - }); + data.set_target(target); + allocated += target; + }); + } debug_assert!( allocated <= self.max, @@ -377,17 +540,23 @@ impl PoolConstraints { let mut changes = vec![]; let mut made_changes = false; - for i in 0..MAX_REBALANCE_CREATE { + for i in 0..MAX_REBALANCE_OPS.get() { it.with_all(|name, block| { - if block.target() > block.total() && current_pool_size < max_pool_size { - // If we are allocated more connections than we currently have, - // we'll try to grab some more. + // If there's room in the block, and room in the pool, and + // the block is bumping up against its current headroom, we'll grab + // another one. + if block.target() > block.total() + && current_pool_size < max_pool_size + && (block.max(MetricVariant::Active) + block.max(MetricVariant::Waiting)) + > (block.total() + i) + .saturating_sub(MIN_REBALANCE_HEADROOM_TO_CREATE.get()) + { changes.push(RebalanceOp::Create(name.clone())); current_pool_size += 1; made_changes = true; } else if block.total() > block.target() && block.count(MetricVariant::Idle) > i - && (i > MAX_EXCESS_IDLE_CONNECTIONS || block.demand() == 0) + && (i > MAX_REBALANCE_EXCESS_IDLE_CONNECTIONS.get() || block.demand() == 0) { // If we're holding on to too many connections, we'll // release some of them. If there is still some demand @@ -410,6 +579,7 @@ impl PoolConstraints { // waiters, we want to transfer from the most overloaded block. let mut overloaded = vec![]; let mut hungriest = vec![]; + let mut idle = vec![]; let mut s1 = "".to_owned(); let mut s2 = "".to_owned(); @@ -424,7 +594,11 @@ impl PoolConstraints { if tracing::enabled!(tracing::Level::TRACE) { s2 += &format!("{name}={value} "); } - overloaded.push((value, name.clone())) + if block.demand() == 0 { + idle.push(name.clone()); + } else { + overloaded.push((value, name.clone())) + } } }); @@ -437,21 +611,24 @@ impl PoolConstraints { let mut tasks = vec![]; - // TODO: rebalance more than one? - loop { + for _ in 0..MAX_REBALANCE_OPS.get() { let Some((_, to)) = hungriest.pop() else { // TODO: close more than one? - if let Some((_, from)) = overloaded.pop() { - tasks.push(RebalanceOp::Close(from.clone())); + if let Some(idle) = idle.pop() { + tasks.push(RebalanceOp::Close(idle.clone())); } break; }; - let Some((_, from)) = overloaded.pop() else { + // Prefer rebalancing from idle connections, otherwise take from + // overloaded ones. + if let Some(from) = idle.pop() { + tasks.push(RebalanceOp::Transfer { to, from }); + } else if let Some((_, from)) = overloaded.pop() { + tasks.push(RebalanceOp::Transfer { to, from }); + } else { break; - }; - - tasks.push(RebalanceOp::Transfer { to, from }); + } } tasks @@ -461,7 +638,7 @@ impl PoolConstraints { pub fn plan_acquire(&self, db: &str, it: &impl VisitPoolAlgoData) -> AcquireOp { // If the block is new, we need to perform an initial adjustment to // ensure this block gets some capacity. - if it.ensure_block(db, DEMAND_MINIMUM) { + if it.ensure_block(db, DEMAND_MINIMUM.get() * DEMAND_HISTORY_LENGTH) { self.recalculate_shares(it); } @@ -479,11 +656,10 @@ impl PoolConstraints { let block_has_room = current_block_size < target_block_size || target_block_size == 0; trace!("Acquiring {db}: {current_pool_size}/{max_pool_size} {current_block_size}/{target_block_size}"); if pool_is_full && block_has_room { - let mut max = 0; + let mut max = isize::MIN; let mut which = None; it.with_all(|name, block| { if let Some(overfullness) = block.overfull_score(false) { - let overfullness: usize = overfullness.into(); if overfullness > max { which = Some(name.clone()); max = overfullness; @@ -525,22 +701,21 @@ impl PoolConstraints { // We only want to consider a release elsewhere if this block is overfull if let Some(Some(overfull)) = it.with(db, |block| block.overfull_score(true)) { trace!("Block {db} is overfull ({overfull}), trying to release"); - let mut max = 0; + let mut max = isize::MIN; let mut which = None; let mut s = "".to_owned(); it.with_all(|name, block| { let is_self = &**name == db; - if let Some(hunger) = block.hunger_score(is_self) { - let mut hunger: usize = hunger.into(); + if let Some(mut hunger) = block.hunger_score(is_self) { // Penalize switching by boosting the current database's relative hunger here if is_self { - hunger += SELF_HUNGER_BOOST_FOR_RELEASE; + hunger += SELF_HUNGER_BOOST_FOR_RELEASE.get() as isize; } if tracing::enabled!(tracing::Level::TRACE) { s += &format!("{name}={hunger} "); } - // If this current block has equal hunger to the hungriest, it takes priority + if hunger > max { which = if is_self { None } else { Some(name.clone()) }; max = hunger; diff --git a/edb/server/conn_pool/src/block.rs b/edb/server/conn_pool/src/block.rs index dd0be3563ada..2fb27dc97194 100644 --- a/edb/server/conn_pool/src/block.rs +++ b/edb/server/conn_pool/src/block.rs @@ -153,7 +153,11 @@ impl Block { conn_metrics.insert(conn.variant()) } conn_metrics.set_value(MetricVariant::Waiting, self.state.waiters.lock.get()); - assert_eq!(self.metrics().summary().value, conn_metrics.summary().value); + assert_eq!( + self.metrics().summary().value, + conn_metrics.summary().value, + "Connection metrics are incorrect. Left: actual, right: expected" + ); } } @@ -223,9 +227,12 @@ impl Block { /// Awaits a connection from this block. fn queue(self: Rc) -> impl Future>> { - if let Some(conn) = self.try_acquire_used() { - trace!("Got a connection"); - return Either::Left(ready(Ok(self.conn(conn)))); + // If someone else is waiting, we have to queue, even if there's a connection + if self.state.waiters.len() == 0 { + if let Some(conn) = self.try_acquire_used() { + trace!("Got a connection"); + return Either::Left(ready(Ok(self.conn(conn)))); + } } // Update the metrics now before we actually queue self.state.waiters.lock(); diff --git a/edb/server/conn_pool/src/conn.rs b/edb/server/conn_pool/src/conn.rs index 1ca399a56d24..b2459581ec5e 100644 --- a/edb/server/conn_pool/src/conn.rs +++ b/edb/server/conn_pool/src/conn.rs @@ -134,6 +134,7 @@ impl Conn { self.transition(|inner| match inner { ConnInner::Idle(_t, conn, ..) | ConnInner::Active(_t, conn, ..) => { from.inc_all_time(MetricVariant::Disconnecting); + from.inc_all_time(MetricVariant::Closed); to.insert(MetricVariant::Connecting); let f = connector.reconnect(conn, db).boxed_local(); ConnInner::Connecting(Instant::now(), f) @@ -146,6 +147,7 @@ impl Conn { self.transition(|inner| match inner { ConnInner::Active(t, conn) => { metrics.inc_all_time(MetricVariant::Disconnecting); + metrics.inc_all_time(MetricVariant::Closed); metrics.transition( MetricVariant::Active, MetricVariant::Connecting, @@ -165,9 +167,10 @@ impl Conn { to: MetricVariant, ) -> Poll> { let mut lock = self.inner.borrow_mut(); - match &mut *lock { - ConnInner::Idle(..) => Poll::Ready(Ok(())), - ConnInner::Connecting(t, f) => Poll::Ready(match ready!(f.poll_unpin(cx)) { + + let res = match &mut *lock { + ConnInner::Idle(..) => Ok(()), + ConnInner::Connecting(t, f) => match ready!(f.poll_unpin(cx)) { Ok(c) => { debug_assert!(to == MetricVariant::Active || to == MetricVariant::Idle); metrics.transition(MetricVariant::Connecting, to, t.elapsed()); @@ -187,8 +190,8 @@ impl Conn { *lock = ConnInner::Failed; Err(err) } - }), - ConnInner::Disconnecting(t, f) => Poll::Ready(match ready!(f.poll_unpin(cx)) { + }, + ConnInner::Disconnecting(t, f) => match ready!(f.poll_unpin(cx)) { Ok(_) => { debug_assert_eq!(to, MetricVariant::Closed); metrics.transition(MetricVariant::Disconnecting, to, t.elapsed()); @@ -204,10 +207,11 @@ impl Conn { *lock = ConnInner::Failed; Err(err) } - }), - ConnInner::Failed => Poll::Ready(Err(ConnError::Other("Failed".into()))), + }, + ConnInner::Failed => Err(ConnError::Other("Failed".into())), _ => unreachable!(), - } + }; + Poll::Ready(res) } pub fn try_lock(&self, metrics: &MetricsAccum) -> bool { @@ -240,6 +244,13 @@ impl Conn { } } +/// Connection state diagram: +/// +/// ```text +/// v-------------+ +/// S -> Connecting -> Idle -> Active -+ +/// -> Failed +-> Disconnecting -> Closed +/// ``` enum ConnInner { /// Connecting connections hold a spot in the pool as they count towards quotas Connecting(Instant, Pin>>>), @@ -253,7 +264,8 @@ enum ConnInner { Failed, /// The connection is in a closed state. Closed, - /// Transitioning between states. + /// Transitioning between states. Used internally, never escapes an internal + /// function. Transition, } diff --git a/edb/server/conn_pool/src/lib.rs b/edb/server/conn_pool/src/lib.rs index feeb984379dc..36aa76f4fa84 100644 --- a/edb/server/conn_pool/src/lib.rs +++ b/edb/server/conn_pool/src/lib.rs @@ -12,6 +12,9 @@ mod time { pub use tokio::time::Instant; } +#[cfg(feature = "optimizer")] +pub use algo::knobs; + // Public interface pub use conn::Connector; diff --git a/edb/server/conn_pool/src/pool.rs b/edb/server/conn_pool/src/pool.rs index 5ed51af7ed22..9952742170e3 100644 --- a/edb/server/conn_pool/src/pool.rs +++ b/edb/server/conn_pool/src/pool.rs @@ -5,7 +5,7 @@ use crate::{ }, block::{Blocks, Name}, conn::{ConnError, ConnHandle, ConnResult, Connector}, - metrics::PoolMetrics, + metrics::{MetricVariant, PoolMetrics}, }; use consume_on_drop::{Consume, ConsumeOnDrop}; use derive_more::Debug; @@ -134,7 +134,9 @@ impl Pool { drain: Default::default(), }) } +} +impl Pool { /// Runs the required async task that takes care of quota management, garbage collection, /// and other important async tasks. This should happen only if something has changed in /// the pool. @@ -282,9 +284,29 @@ impl Pool { /// the shutdown operation. pub async fn shutdown(mut self: Rc) { self.drain.shutdown(); - while let Err(pool_) = Rc::try_unwrap(self) { - self = pool_; + let pool = loop { + match Rc::try_unwrap(self) { + Ok(pool) => break pool, + Err(pool) => self = pool, + }; tokio::time::sleep(Duration::from_millis(10)).await; + }; + while !pool.idle() { + pool.run_once(); + tokio::time::sleep(Duration::from_millis(10)).await; + } + if cfg!(debug_assertions) { + let all_time = &pool.metrics().all_time; + assert_eq!( + all_time[MetricVariant::Connecting], + all_time[MetricVariant::Disconnecting], + "Connecting != Disconnecting" + ); + assert_eq!( + all_time[MetricVariant::Disconnecting], + all_time[MetricVariant::Closed], + "Disconnecting != Closed" + ); } } } @@ -373,9 +395,9 @@ mod tests { use test_log::test; use tokio::task::LocalSet; - use tracing::{info, trace}; + use tracing::{error, info, trace}; - #[test(tokio::test)] + #[test(tokio::test(flavor = "current_thread", start_paused = true))] async fn test_pool_basic() -> Result<()> { LocalSet::new() .run_until(async { @@ -476,17 +498,17 @@ mod tests { ..Default::default() }; - run(spec).await + run(spec).await.map(drop) } - async fn run(spec: Spec) -> Result<()> { + async fn run(spec: Spec) -> Result { let local = LocalSet::new(); - local.run_until(run_local(spec)).await?; + let res = local.run_until(run_local(spec)).await?; local.await; - Ok(()) + Ok(res) } - async fn run_local(spec: Spec) -> std::result::Result<(), anyhow::Error> { + async fn run_local(spec: Spec) -> std::result::Result { let start = Instant::now(); let real_time = std::time::Instant::now(); let config = PoolConfig::suggested_default_for(spec.capacity); @@ -545,7 +567,7 @@ mod tests { } tokio::time::timeout(Duration::from_secs(120), local) .await - .unwrap_or_else(move |_| panic!("*[{i:-2}] DBSpec {i} for {db} timed out")); + .unwrap_or_else(move |_| error!("*[{i:-2}] DBSpec {i} for {db} timed out")); let end_time = now.elapsed().as_secs_f64(); info!("-[{i:-2}] Finished db t{} at {}qps. Load generated from {}..{}, processed from {}..{}", db_spec.db, db_spec.qps, db_spec.start_at, db_spec.end_at, start_time, end_time); @@ -599,8 +621,10 @@ mod tests { let metrics = pool.metrics(); let mut qos = 0.0; + let mut scores = vec![]; for score in spec.score { let scored = score.method.score(&latencies, &metrics, &pool.config); + let score_component = score.calculate(scored.raw_value); info!( "[QoS: {}] {} = {:.2} -> {:.2} (weight {:.2})", @@ -613,6 +637,11 @@ mod tests { (scored.detailed_calculation)(3), scored.raw_value ); + scores.push(WeightedScored { + scored, + weight: score.weight, + score: score_component, + }); qos += score_component * score.weight; } info!("[QoS: {}] Score = {qos:0.02}", spec.name); @@ -620,11 +649,10 @@ mod tests { info!("Shutting down..."); pool.shutdown().await; - Ok(()) + Ok(QoS { scores, qos }) } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_1() -> Result<()> { + fn test_connpool_1() -> Spec { let mut dbs = vec![]; for i in 0..6 { dbs.push(DBSpec { @@ -654,7 +682,7 @@ mod tests { }) } - let spec = Spec { + Spec { name: "test_connpool_1".into(), desc: r#" This is a test for Mode D, where 2 groups of blocks race for connections @@ -687,13 +715,10 @@ mod tests { ], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_2() -> Result<()> { + fn test_connpool_2() -> Spec { let mut dbs = vec![]; for i in 0..6 { dbs.push(DBSpec { @@ -723,7 +748,7 @@ mod tests { }) } - let spec = Spec { + Spec { name: "test_connpool_2".into(), desc: r#" In this test, we have 6x1500qps connections that simulate fast @@ -755,13 +780,10 @@ mod tests { ], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_3() -> Result<()> { + fn test_connpool_3() -> Spec { let mut dbs = vec![]; for i in 0..6 { dbs.push(DBSpec { @@ -773,7 +795,7 @@ mod tests { }) } - let spec = Spec { + Spec { name: "test_connpool_3".into(), desc: r#" This test simply starts 6 same crazy requesters for 6 databases to @@ -791,13 +813,10 @@ mod tests { ], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_4() -> Result<()> { + fn test_connpool_4() -> Spec { let mut dbs = vec![]; for i in 0..6 { dbs.push(DBSpec { @@ -809,7 +828,7 @@ mod tests { }) } - let spec = Spec { + Spec { name: "test_connpool_4".into(), desc: r#" Similar to test 3, this test also has 6 requesters for 6 databases, @@ -829,13 +848,10 @@ mod tests { ], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_5() -> Result<()> { + fn test_connpool_5() -> Spec { let mut dbs = vec![]; for i in 0..6 { @@ -866,7 +882,7 @@ mod tests { }); } - let spec = Spec { + Spec { name: "test_connpool_5".into(), desc: r#" This is a mixed test with pool max capacity set to 6. Requests in @@ -911,13 +927,10 @@ mod tests { ], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_6() -> Result<()> { + fn test_connpool_6() -> Spec { let mut dbs = vec![]; for i in 0..6 { @@ -930,7 +943,7 @@ mod tests { }); } - let spec = Spec { + Spec { name: "test_connpool_6".into(), desc: r#" This is a simple test for Mode A. In this case, we don't want to @@ -941,14 +954,11 @@ mod tests { score: vec![Score::new(1.0, [0.5, 0.2, 0.1, 0.0], ConnectionOverhead {})], dbs, ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_7() -> Result<()> { - let spec = Spec { + fn test_connpool_7() -> Spec { + Spec { name: "test_connpool_7".into(), desc: r#" The point of this test is to have one connection "t1" that @@ -1006,15 +1016,13 @@ mod tests { }, ], ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_8() -> Result<()> { + fn test_connpool_8() -> Spec { let base_load = 200; - let spec = Spec { + + Spec { name: "test_connpool_8".into(), desc: r#" This test spec is to check the pool connection reusability with a @@ -1049,15 +1057,13 @@ mod tests { }, ], ..Default::default() - }; - - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_9() -> Result<()> { + fn test_connpool_9() -> Spec { let full_qps = 20000; - let spec = Spec { + + Spec { name: "test_connpool_9".into(), desc: r#" This test spec is to check the pool performance with low traffic @@ -1137,14 +1143,13 @@ mod tests { }, ], ..Default::default() - }; - run(spec).await + } } - #[test(tokio::test(flavor = "current_thread", start_paused = true))] - async fn test_connpool_10() -> Result<()> { + fn test_connpool_10() -> Spec { let full_qps = 2000; - let spec = Spec { + + Spec { name: "test_connpool_10".into(), desc: r#" This test spec is to check the pool garbage collection feature. @@ -1178,7 +1183,199 @@ mod tests { }, ], ..Default::default() + } + } + + #[test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn run_spec_tests() -> Result<()> { + spec_tests().await?; + Ok(()) + } + + async fn spec_tests() -> Result { + let mut results = SuiteQoS::default(); + for spec in SPEC_FUNCTIONS { + let spec = spec(); + let name = spec.name.clone(); + let res = run(spec).await?; + results.insert(name, res); + } + for (name, QoS { qos, .. }) in &results { + info!("QoS[{name}] = [{qos:.02}]"); + } + info!( + "QoS = [{:.02}] (rms={:.02})", + results.qos(), + results.qos_rms_error() + ); + Ok(results) + } + + /// Runs the specs `count` times, returning the median run. + #[allow(unused)] + fn run_specs_tests_in_runtime(count: usize) -> Result { + let mut runs = vec![]; + for _ in 0..count { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + let _guard = runtime.enter(); + tokio::time::pause(); + let qos = runtime.block_on(spec_tests())?; + runs.push(qos); + } + runs.sort_by_cached_key(|run| (run.qos_rms_error() * 1_000_000.0) as usize); + let ret = runs.drain(count / 2..).next().unwrap(); + Ok(ret) + } + + #[test] + #[cfg(feature = "optimizer")] + fn optimizer() { + use crate::knobs::*; + use std::sync::atomic::AtomicIsize; + + use genetic_algorithm::strategy::evolve::prelude::*; + use lru::LruCache; + use rand::Rng; + + // the search goal to optimize towards (maximize or minimize) + #[derive(Clone, std::fmt::Debug, smart_default::SmartDefault)] + pub struct Optimizer { + #[default(std::sync::Arc::new(AtomicIsize::new(isize::MIN)))] + best: std::sync::Arc, + #[default(LruCache::new(1_000_000.try_into().unwrap()))] + lru: LruCache<[usize; ALL_KNOB_COUNT], isize>, + } + + impl Fitness for Optimizer { + type Genotype = ContinuousGenotype; + fn calculate_for_chromosome( + &mut self, + chromosome: &Chromosome, + ) -> Option { + let mut knobs: [usize; ALL_KNOB_COUNT] = Default::default(); + for (knob, gene) in knobs.iter_mut().zip(&chromosome.genes) { + *knob = *gene as _; + } + if let Some(res) = self.lru.get(&knobs) { + return Some(*res); + } + + for (i, knob) in crate::knobs::ALL_KNOBS.iter().enumerate() { + if knob.set(knobs[i]).is_err() { + return None; + }; + } + let qos = run_specs_tests_in_runtime(5).ok()?; + let score = qos.qos_rms_error(); + let qos_i = (score * 1_000_000.0) as isize; + if qos_i > self.best.load(std::sync::atomic::Ordering::SeqCst) { + eprintln!( + "*** New best: {score:.02} {:?} {:?}", + crate::knobs::ALL_KNOBS, + qos + ); + self.best.store(qos_i, std::sync::atomic::Ordering::SeqCst); + } + self.lru.push(knobs, qos_i); + + Some(qos_i) + } + } + + let mut seeds: Vec> = vec![]; + + // The current state + seeds.push( + crate::knobs::ALL_KNOBS + .iter() + .map(|k| k.get() as _) + .collect(), + ); + + // A constant value for all knobs + for i in 0..100 { + seeds.push([i].repeat(crate::knobs::ALL_KNOBS.len())); + } + + // Some randomness + for _ in 0..100 { + seeds.push( + (0..crate::knobs::ALL_KNOBS.len()) + .map(|_| rand::thread_rng().gen_range(0..1000)) + .collect(), + ); + } + + let mut f32_seeds = vec![]; + for mut seed in seeds { + for (i, knob) in crate::knobs::ALL_KNOBS.iter().enumerate() { + let mut value = seed[i] as _; + if knob.set(value).is_err() { + knob.clamp(&mut value); + seed[i] = value as _; + }; + } + f32_seeds.push(seed.into_iter().map(|n| n as _).collect()); + } + + let genotype = ContinuousGenotype::builder() + .with_genes_size(crate::knobs::ALL_KNOBS.len()) + .with_allele_range(0.0..1000.0) + .with_allele_neighbour_ranges(vec![-50.0..50.0, -5.0..5.0]) + .with_seed_genes_list(f32_seeds) + .build() + .unwrap(); + + let mut rng = rand::thread_rng(); // a randomness provider implementing Trait rand::Rng + let evolve = Evolve::builder() + .with_multithreading(true) + .with_genotype(genotype) + .with_target_population_size(1000) + .with_target_fitness_score(100 * 1_000_000) + .with_max_stale_generations(1000) + .with_fitness(Optimizer::default()) + .with_crossover(CrossoverUniform::new(true)) + .with_mutate(MutateOnce::new(0.5)) + .with_compete(CompeteTournament::new(200)) + .with_extension(ExtensionMassInvasion::new(0.6, 0.6)) + .call(&mut rng) + .unwrap(); + println!("{}", evolve); + } + + macro_rules! run_spec { + ($($spec:ident),* $(,)?) => { + const SPEC_FUNCTIONS: [fn() -> Spec; [$( $spec ),*].len()] = [ + $( + $spec, + )* + ]; + + mod spec { + use super::*; + $( + #[super::test(tokio::test(flavor = "current_thread", start_paused = true))] + async fn $spec() -> Result<()> { + run(super::$spec()).await.map(drop) + } + )* + } }; - run(spec).await } + + run_spec!( + test_connpool_1, + test_connpool_2, + test_connpool_3, + test_connpool_4, + test_connpool_5, + test_connpool_6, + test_connpool_7, + test_connpool_8, + test_connpool_9, + test_connpool_10, + ); } diff --git a/edb/server/conn_pool/src/test.rs b/edb/server/conn_pool/src/test.rs index d4f3353b76f2..f858040a7d25 100644 --- a/edb/server/conn_pool/src/test.rs +++ b/edb/server/conn_pool/src/test.rs @@ -1,6 +1,11 @@ //! Test utilities. use std::{ - borrow::Cow, cell::RefCell, collections::HashMap, future::Future, ops::Range, rc::Rc, + borrow::Cow, + cell::{Cell, RefCell}, + collections::{BTreeMap, HashMap}, + future::Future, + ops::Range, + rc::Rc, time::Duration, }; @@ -180,12 +185,73 @@ pub struct Spec { pub score: Vec, } +#[derive(derive_more::Debug)] pub struct Scored { pub description: String, + #[debug(skip)] pub detailed_calculation: Box String>, pub raw_value: f64, } +#[derive(Debug)] +pub struct WeightedScored { + pub weight: f64, + pub score: f64, + pub scored: Scored, +} + +#[derive(Debug)] +pub struct QoS { + pub scores: Vec, + pub qos: f64, +} + +#[derive(Default, derive_more::Deref, derive_more::DerefMut, derive_more::IntoIterator)] +pub struct SuiteQoS(#[into_iterator(owned, ref, ref_mut)] BTreeMap, QoS>); + +impl std::fmt::Debug for SuiteQoS { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut s = f.debug_struct("SuiteQos"); + for (name, qos) in self { + s.field(name, &format!("QoS = {:.02}", qos.qos)); + } + s.field("qos", &self.qos()); + s.field("qos_rms", &self.qos_rms_error()); + s.finish() + } +} + +impl SuiteQoS { + pub fn qos(&self) -> f64 { + let mut total = 0.0; + for qos in self.values() { + total += qos.qos; + } + total /= self.len() as f64; + if !total.is_normal() || total < 0.0 { + 0.0 + } else { + total + } + } + + /// Return the root-mean-square error QoS. The error between the QoS and 100 + /// is squared, averaged, and we subtract that from 100 for a final score. + pub fn qos_rms_error(&self) -> f64 { + let mut total = 0.0; + for qos in self.values() { + total += (100.0 - qos.qos).powf(2.0); + } + total /= self.len() as f64; + total = 100.0 - total.sqrt(); + if !total.is_normal() || total < 0.0 { + 0.0 + } else { + total + } + } +} + pub trait ScoringMethod { fn score(&self, latencies: &Latencies, metrics: &PoolMetrics, config: &PoolConfig) -> Scored; } @@ -212,6 +278,10 @@ impl Score { } pub fn calculate(&self, value: f64) -> f64 { + if value.is_nan() || value.is_infinite() { + return 0.0; + } + let intervals = [ (self.v100, self.v90, 90.0, 10.0), (self.v90, self.v60, 60.0, 30.0), @@ -248,6 +318,8 @@ impl ScoringMethod for LatencyDistribution { fn score(&self, latencies: &Latencies, _metrics: &PoolMetrics, _config: &PoolConfig) -> Scored { let dbs = self.group.clone().map(|t| format!("t{t}")).collect_vec(); let mut data = latencies.data.borrow_mut(); + let fail = Cell::new(false); + // Calculates the average CV (coefficient of variation) of the given // distributions. The result is a float ranging from zero indicating how // different the given distributions are, where zero means no @@ -259,7 +331,11 @@ impl ScoringMethod for LatencyDistribution { let decile = Data::new( dbs.iter() .map(|db| { - let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + let Some(data) = data.get_mut(db) else { + fail.set(true); + return 0.0; + }; + let mut data = Data::new(data.as_mut_slice()); // This is equivalent to Python's statistics.quartile(n=10) data.percentile(n * 10) }) @@ -323,7 +399,10 @@ impl ScoringMethod for LatencyRatio { let divisor = dbs .iter() .map(|db| { - let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + let Some(data) = data.get_mut(db) else { + return f64::NAN; + }; + let mut data = Data::new(data.as_mut_slice()); data.percentile(self.percentile as _) }) .mean(); @@ -331,7 +410,10 @@ impl ScoringMethod for LatencyRatio { let dividend = dbs .iter() .map(|db| { - let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + let Some(data) = data.get_mut(db) else { + return f64::NAN; + }; + let mut data = Data::new(data.as_mut_slice()); data.percentile(self.percentile as _) }) .mean(); @@ -375,7 +457,10 @@ impl ScoringMethod for AbsoluteLatency { let raw_value = dbs .iter() .map(|db| { - let mut data = Data::new(data.get_mut(db).expect(db).as_mut_slice()); + let Some(data) = data.get_mut(db) else { + return f64::NAN; + }; + let mut data = Data::new(data.as_mut_slice()); data.percentile(self.percentile as _) }) .mean(); diff --git a/tests/test_server_pool.py b/tests/test_server_pool.py index f5ae94bea058..ffcf7916e7bc 100644 --- a/tests/test_server_pool.py +++ b/tests/test_server_pool.py @@ -522,10 +522,10 @@ def on_stats(stat): if hasattr(pool, '_gc_interval'): pool._gc_interval = 0.1 * TIME_SCALE - started_at = time.monotonic() - async with asyncio.TaskGroup() as g: - for db in spec.dbs: - g.create_task(self.simulate_db(sim, pool, g, db)) + started_at = time.monotonic() + async with asyncio.TaskGroup() as g: + for db in spec.dbs: + g.create_task(self.simulate_db(sim, pool, g, db)) self.assertEqual(sim.failed_disconnects, 0) self.assertEqual(sim.failed_queries, 0)