Skip to content

Commit

Permalink
Slow down introspector in prometheus mode to allow other DB operation…
Browse files Browse the repository at this point in the history
…s to be processed
  • Loading branch information
vstakhov committed Sep 7, 2022
1 parent 66243cd commit 431be42
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
33 changes: 28 additions & 5 deletions src/kvdb/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
use crate::kvdb::IntrospectorKvdb;
use clap::Parser;
use color_eyre::Result;
use log::{error, info};
use log::{debug, error, info};
use prometheus_endpoint::{prometheus::IntGaugeVec, Opts, Registry};
use rand::{thread_rng, Rng};

#[derive(Clone, Debug, Parser, Default)]
#[clap(rename_all = "kebab-case")]
Expand All @@ -29,6 +30,12 @@ pub struct KvdbPrometheusOptions {
/// Database poll timeout (default, once per 5 minutes).
#[clap(long, default_value = "300.0")]
poll_timeout: f32,
/// Probability to delay in the iteration to reduce instant DB load (default - around 10000 iterations)
#[clap(long, default_value = "0.00001")]
sleep_probability: f32,
/// Sleep time in seconds (default 10ms)
#[clap(long, default_value = "0.01")]
sleep_time: f32,
}

struct KvdbPrometheusMetrics {
Expand Down Expand Up @@ -77,10 +84,13 @@ async fn update_db<D: IntrospectorKvdb>(
Ok(columns) => {
let mut update_results: Vec<UpdateResult> = vec![];

let all_done = columns.iter().all(|col| {
let mut all_done = true;
for col in columns {
let mut keys_space = 0_i64;
let mut keys_count = 0_i64;
let mut values_space = 0_i64;
// Used to sleep more frequently on large keys
let mut size_factor: f32 = 1.0;

info!("Iterating over column {}", col.as_str());
match db.iter_values(col.as_str()) {
Expand All @@ -89,20 +99,33 @@ async fn update_db<D: IntrospectorKvdb>(
keys_space += key.len() as i64;
keys_count += 1;
values_space += value.len() as i64;

if prometheus_opts.sleep_probability > 0.0 && prometheus_opts.sleep_time > 0.0 {
let dice: f32 = thread_rng().gen_range(0.0..1.0);
size_factor += (key.len() + value.len()) as f32 / 10240.0;
if dice < prometheus_opts.sleep_probability * size_factor {
debug!("sleeping to unload database");
tokio::time::sleep(std::time::Duration::from_secs_f32(
prometheus_opts.sleep_time,
))
.await;
size_factor = 1.0;
}
}
},
Err(e) => {
error!(
"Failed to get iterator for column {} in database: {:?}, trying to reopen database",
col.as_str(),
e
);
return false
all_done = false;
break
},
}

update_results.push(UpdateResult { column: col.clone(), keys_count, keys_space, values_space });
true
});
}

if all_done {
for res in &update_results {
Expand Down
2 changes: 1 addition & 1 deletion src/kvdb/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use color_eyre::Result;

pub type DBIter<'a> = Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a>;
pub type DBIter<'a> = Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a + Send + Sync>;
/// A minimum subset of the functions required to open a database for introspection
pub trait IntrospectorKvdb {
/// Opens database with some configuration
Expand Down

0 comments on commit 431be42

Please sign in to comment.