Skip to content

Commit

Permalink
long running test setup
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Mar 25, 2024
1 parent 7d676a4 commit 92b4cfb
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 7 deletions.
27 changes: 21 additions & 6 deletions examples/debug_blockstream_lag.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::HashMap;

Check warning on line 1 in examples/debug_blockstream_lag.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/debug_blockstream_lag.rs
use futures::{Stream, StreamExt};
use log::info;
use log::{debug, info, trace};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;

Check warning on line 6 in examples/debug_blockstream_lag.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/debug_blockstream_lag.rs
use std::pin::pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use itertools::Itertools;
use solana_sdk::pubkey::Pubkey;

use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
Expand All @@ -18,6 +19,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterBlocksMeta, SubscribeUpdate};
use yellowstone_grpc_proto::prost::Message as _;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc};
use geyser_grpc_connector::histogram::calculate_percentiles;

#[allow(dead_code)]
fn start_example_blockmini_consumer(
Expand Down Expand Up @@ -120,7 +122,7 @@ pub async fn main() {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {

Check warning on line 123 in examples/debug_blockstream_lag.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/debug_blockstream_lag.rs
Some(UpdateOneof::BlockMeta(update)) => {
info!("got blockmeta update (green)!!! slot: {}", update.slot);
trace!("got blockmeta update (green)!!! slot: {}", update.slot);
last_slot_from_meta.store(update.slot, std::sync::atomic::Ordering::Relaxed);
}
_ => {}
Expand All @@ -134,16 +136,20 @@ pub async fn main() {
warn!("Stream aborted");
});

tokio::spawn(async move {
// buffer
let (delta_tx, mut delta_rx) = tokio::sync::mpsc::channel(10000);

let jh_block_task = tokio::spawn(async move {
while let Some(message) = block_rx.recv().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Block(update)) => {

Check warning on line 147 in examples/debug_blockstream_lag.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/debug_blockstream_lag.rs
// note: if you see no data, the grpc client might be misconfigured
info!("got block update (blue)!!! slot: {}", update.slot);
trace!("got block update (blue)!!! slot: {}", update.slot);
let delta = last_slot_from_meta2.load(std::sync::atomic::Ordering::Relaxed) as i64 - update.slot as i64;
info!("delta: {}", delta);
debug!("block lag: {}", delta);
delta_tx.send(delta).await.unwrap();
}
_ => {}
}
Expand All @@ -157,5 +163,14 @@ pub async fn main() {
});

// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
sleep(Duration::from_secs(120)).await;
jh_block_task.abort();

let mut deltas = Vec::new();
delta_rx.recv_many(&mut deltas, 10000).await;

info!("Deltas: {:?}", deltas);

let histogram = calculate_percentiles(&deltas.iter().sorted().map(|x| *x as f64).collect_vec());
info!("Histogram: {}", histogram);
}
2 changes: 1 addition & 1 deletion src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
};

let connect_result = yellowstone_grpc_util::connect_with_timeout_hacked(
endpoint_config_groovie3,
endpoint_config_default,
addr,
token,
// config,
Expand Down
246 changes: 246 additions & 0 deletions src/histogram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
use itertools::Itertools;
use std::fmt::Display;
use std::iter::zip;

/// A weighted sample: a `priority` together with the `value` (weight) attached to it.
///
/// `calculate_cummulative` consumes slices of points that are sorted by ascending
/// `priority` and accumulates their `value`s.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct Point {
    /// Key the points are ordered by; reported as the bucket value of a cumulative histogram.
    pub priority: f64,
    /// Weight this point contributes to the running sum.
    pub value: f64,
}

impl From<(f64, f64)> for Point {
    /// Builds a point from a `(priority, value)` tuple.
    fn from((priority, value): (f64, f64)) -> Self {
        Self { priority, value }
    }
}

/// One histogram bucket: a percentile and the value found at that percentile.
///
/// `Eq` and `Hash` are intentionally not derived: the floating point fields do not
/// implement them.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct HistValue {
    /// Percentile of this bucket; `calculate_cummulative` fills it on a 0..=100 scale.
    pub percentile: f32,
    /// Value at that percentile; `calculate_cummulative` stores the `priority` of the
    /// point at which the cumulative threshold was reached.
    pub value: f64,
}

/// Samples an ascending slice at every 5th percentile (p0, p5, ..., p100).
///
/// For percentile `p` the element at index `len * p / 100` (integer division, no
/// interpolation) is reported, clamped to the last element. Consequently p0 is the
/// minimum and p100 is the maximum of the input.
///
/// The returned [`Percentiles`] holds the sampled values in `v` and the matching
/// percentiles, expressed as fractions (`0.0`, `0.05`, ..., `1.0`), in `p`.
/// An empty input yields empty vectors.
///
/// # Panics
///
/// Panics with "array of values must be sorted" when `input` is not in ascending order.
pub fn calculate_percentiles(input: &[f64]) -> Percentiles {
    const PERCENT_STEP: usize = 5;

    // note: percentile for empty array is undefined
    if input.is_empty() {
        return Percentiles {
            v: Vec::new(),
            p: Vec::new(),
        };
    }

    // Every element must be <= its successor.
    let ascending = input
        .iter()
        .zip(input.iter().skip(1))
        .all(|(current, next)| current <= next);
    assert!(ascending, "array of values must be sorted");

    let last_index = input.len() - 1;
    let steps = (0..=100usize).step_by(PERCENT_STEP).collect_vec();

    // p100 maps to `len`, one past the end, hence the clamp to the last element.
    let sampled = steps
        .iter()
        .map(|&percent| input[(input.len() * percent / 100).min(last_index)])
        .collect_vec();
    let fractions = steps
        .iter()
        .map(|&percent| percent as f32 / 100.0)
        .collect_vec();

    Percentiles {
        v: sampled,
        p: fractions,
    }
}

/// Builds a weighted (cumulative) percentile histogram in 5% steps.
///
/// `values` must be sorted by ascending `priority`. For every percentile `p` in
/// 0, 5, ..., 100 the points are walked in slice order, summing their `value`s, until
/// the running sum reaches `p` percent of the total; the `priority` of the point at
/// which that happens becomes the bucket value for `p`.
///
/// The result holds those priorities in `bucket_values` and the percentiles as
/// fractions (`0.0`, `0.05`, ..., `1.0`) in `percentiles`. An empty input yields
/// empty vectors.
///
/// If the running sum never reaches a threshold, the last point is reported for that
/// bucket instead of reading past the end of the slice.
///
/// # Panics
///
/// Panics with "array of values must be sorted" when `values` is not sorted by
/// ascending `priority`.
pub fn calculate_cummulative(values: &[Point]) -> PercentilesCummulative {
    const P_STEP: usize = 5;

    if values.is_empty() {
        // note: percentile for empty array is undefined
        return PercentilesCummulative {
            bucket_values: vec![],
            percentiles: vec![],
        };
    }

    let is_monotonic = values.windows(2).all(|w| w[0].priority <= w[1].priority);
    assert!(is_monotonic, "array of values must be sorted");

    let last_index = values.len() - 1;
    let value_sum: f64 = values.iter().map(|x| x.value).sum();

    // Running state shared by all percentiles: thresholds only grow, so the scan
    // continues from where the previous percentile stopped.
    let mut agg: f64 = values[0].value;
    let mut index = 0;

    let dist = (0..=100)
        .step_by(P_STEP)
        .map(|percentile| {
            let threshold = (value_sum * percentile as f64) / 100.0;
            // The `index < last_index` bound prevents an out-of-bounds panic when the
            // running sum cannot reach the threshold, e.g. with a negative total weight
            // (a single point of value -5.0 is below the p0 threshold of 0.0), or if
            // `value_sum * p / 100.0` were to round slightly above `value_sum`.
            while agg < threshold && index < last_index {
                index += 1;
                agg += values[index].value;
            }
            HistValue {
                percentile: percentile as f32,
                value: values[index].priority,
            }
        })
        .collect_vec();

    PercentilesCummulative {
        bucket_values: dist.iter().map(|hv| hv.value).collect_vec(),
        // `HistValue::percentile` is on a 0..=100 scale here; expose it as a fraction.
        percentiles: dist.iter().map(|hv| hv.percentile / 100.0).collect_vec(),
    }
}

/// Result of `calculate_percentiles`: two parallel vectors where `v[i]` is the value
/// sampled at percentile `p[i]`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Percentiles {
    /// Sampled values, one per percentile.
    pub v: Vec<f64>,
    /// Percentiles as fractions in the range `0.0..=1.0`.
    pub p: Vec<f32>,
}

impl Display for Percentiles {
    /// Writes every bucket as `p<percent>=><value> ` (note the trailing space).
    ///
    /// Percentile and value are paired positionally. Should the two public vectors
    /// differ in length, only the common prefix is printed instead of panicking on an
    /// out-of-range index.
    ///
    /// NOTE(review): the percent is computed as `p * 100.0` in `f32`, so a stored
    /// fraction may print with rounding noise rather than as a round number — confirm
    /// whether that matters for the log output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (fraction, value) in zip(&self.p, &self.v) {
            write!(f, "p{}=>{} ", fraction * 100.0, value)?;
        }
        Ok(())
    }
}

// Private lookup helper; within this file it is only exercised by the unit tests,
// hence the lint exemption for non-test builds.
#[allow(dead_code)]
impl Percentiles {
    /// Returns the value stored for `percentile` (a fraction such as `0.25`).
    ///
    /// The percentile is matched by exact floating point equality; `None` is returned
    /// when no bucket has exactly that percentile.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.p, &self.v).find_map(|(&p, &v)| if p == percentile { Some(v) } else { None })
    }
}

/// Result of `calculate_cummulative`: two parallel vectors where `bucket_values[i]`
/// belongs to `percentiles[i]`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PercentilesCummulative {
    /// Bucket value per percentile; `calculate_cummulative` stores point priorities here.
    pub bucket_values: Vec<f64>,
    /// Percentiles as fractions; `calculate_cummulative` emits `0.0`, `0.05`, ..., `1.0`.
    pub percentiles: Vec<f32>,
}

// Private lookup helper with no caller in the non-test code of this file, hence the
// lint exemption.
#[allow(dead_code)]
impl PercentilesCummulative {
    /// Returns the bucket value stored for `percentile` (a fraction such as `0.5`).
    ///
    /// The percentile is matched by exact floating point equality; `None` is returned
    /// when no bucket has exactly that percentile.
    fn get_bucket_value(&self, percentile: f32) -> Option<f64> {
        zip(&self.percentiles, &self.bucket_values)
            .find_map(|(&p, &cu)| if p == percentile { Some(cu) } else { None })
    }
}

// Unit tests for the percentile helpers; bucket index `i` corresponds to percentile `5 * i`.
#[cfg(test)]
mod tests {
use super::*;

// Five samples, sorted before use: checks p0, p50, p75, p90 and p100.
#[test]
fn test_calculate_supp_info() {
let mut values = vec![2.0, 4.0, 5.0, 3.0, 1.0];
values.sort_by_key(|&x| (x * 100.0) as i64);
let supp_info = calculate_percentiles(&values).v;
assert_eq!(supp_info[0], 1.0);
assert_eq!(supp_info[10], 3.0);
assert_eq!(supp_info[15], 4.0);
assert_eq!(supp_info[18], 5.0);
assert_eq!(supp_info[20], 5.0);
}

// Two points of equal weight: the first covers up to 50% of the total, the second the rest.
#[test]
fn test_calculate_supp_info_by_cu() {
// total of 20000 CU were consumed
let values = vec![Point::from((100.0, 10000.0)), Point::from((200.0, 10000.0))];
let PercentilesCummulative {
bucket_values: by_cu,
percentiles: by_cu_percentiles,
..
} = calculate_cummulative(&values);
assert_eq!(by_cu_percentiles[10], 0.5);
assert_eq!(by_cu[10], 100.0); // need more than 100 to beat 50% of the CU
assert_eq!(by_cu[11], 200.0); // need more than 200 to beat 55% of the CU
assert_eq!(by_cu[20], 200.0); // need more than 200 to beat 100% of the CU
}

// Empty input produces no buckets.
#[test]
fn test_empty_array() {
let values = vec![];
let supp_info = calculate_percentiles(&values).v;
// note: this is controversial
assert!(supp_info.is_empty());
}
// All-zero weights (total of 0.0) must not panic.
#[test]
fn test_zeros() {
let values = vec![Point::from((0.0, 0.0)), Point::from((0.0, 0.0))];
let supp_info = calculate_cummulative(&values).bucket_values;
assert_eq!(supp_info[0], 0.0);
}

// Eight samples: p25 of the plain histogram, and p100 of the weighted one.
#[test]
fn test_statisticshowto() {
let values = vec![30.0, 33.0, 43.0, 53.0, 56.0, 67.0, 68.0, 72.0];
let supp_info = calculate_percentiles(&values);
assert_eq!(supp_info.v[5], 43.0);
assert_eq!(supp_info.p[5], 0.25);
assert_eq!(supp_info.get_bucket_value(0.25), Some(43.0));

let values = vec![
Point::from((30.0, 1.0)),
Point::from((33.0, 2.0)),
Point::from((43.0, 3.0)),
Point::from((53.0, 4.0)),
Point::from((56.0, 5.0)),
Point::from((67.0, 6.0)),
Point::from((68.0, 7.0)),
Point::from((72.0, 8.0)),
];
let supp_info = calculate_cummulative(&values);
assert_eq!(supp_info.percentiles[20], 1.0);
assert_eq!(supp_info.bucket_values[20], 72.0);
}

// Nine samples, so `len * p / 100` is not a whole number for most percentiles.
#[test]
fn test_simple_non_integer_index() {
// Measurements: 3 – 5 – 5 – 6 – 7 – 7 – 8 – 10 – 10
// In this case it is therefore 5.
let values = vec![3.0, 5.0, 5.0, 6.0, 7.0, 7.0, 8.0, 10.0, 10.0];

let supp_info = calculate_percentiles(&values);
assert_eq!(supp_info.p[4], 0.20);
assert_eq!(supp_info.v[5], 5.0);

let values = vec![
Point::from((3.0, 1.0)),
Point::from((5.0, 2.0)),
Point::from((5.0, 3.0)),
Point::from((6.0, 4.0)),
Point::from((7.0, 5.0)),
Point::from((7.0, 6.0)),
Point::from((8.0, 7.0)),
Point::from((10.0, 8.0)),
Point::from((10.0, 9.0)),
];
let supp_info = calculate_cummulative(&values);
assert_eq!(supp_info.percentiles[19], 0.95);
assert_eq!(supp_info.percentiles[20], 1.0);
assert_eq!(supp_info.bucket_values[19], 10.0);
assert_eq!(supp_info.bucket_values[20], 10.0);
}

// 1000 consecutive integers: p95 lands on index 950.
#[test]
fn test_large_list() {
let values = (0..1000).map(|i| i as f64).collect_vec();
let supp_info = calculate_percentiles(&values);
assert_eq!(supp_info.v[19], 950.0);
assert_eq!(supp_info.p[19], 0.95);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod channel_plugger;
pub mod grpc_subscription_autoreconnect_streams;
pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
pub mod histogram;
mod obfuscate;
mod yellowstone_grpc_util;

Expand Down

0 comments on commit 92b4cfb

Please sign in to comment.