Skip to content

Commit

Permalink
feat: write metrics (#25692)
Browse files Browse the repository at this point in the history
Added prometheus metrics to track lines written and bytes written per
database. The write buffer does the tracking after validation of incoming
line protocol.

Tests added to verify.
  • Loading branch information
hiltontj authored Dec 20, 2024
1 parent 4f9d641 commit d10ad87
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 13 deletions.
1 change: 1 addition & 0 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ pub async fn command(config: Config) -> Result<()> {
executor: Arc::clone(&exec),
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ mod tests {
executor: Arc::clone(&exec),
wal_config: WalConfig::test_config(),
parquet_cache: Some(parquet_cache),
metric_registry: Arc::clone(&metrics),
},
)
.await
Expand Down
1 change: 1 addition & 0 deletions influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ mod tests {
snapshot_size: 1,
},
parquet_cache: Some(parquet_cache),
metric_registry: Default::default(),
})
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_write/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ iox_catalog.workspace = true
iox_http.workspace = true
iox_query.workspace = true
iox_time.workspace = true
metric.workspace = true
parquet_file.workspace = true
observability_deps.workspace = true
schema.workspace = true
Expand Down Expand Up @@ -71,7 +72,6 @@ optional = true
# Core Crates
arrow_util.workspace = true
insta.workspace = true
metric.workspace = true
pretty_assertions.workspace = true
test_helpers.workspace = true
test-log.workspace = true
94 changes: 94 additions & 0 deletions influxdb3_write/src/write_buffer/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::borrow::Cow;

use metric::{Metric, Registry, U64Counter};

#[derive(Debug)]
pub(super) struct WriteMetrics {
write_lines_total: Metric<U64Counter>,
write_bytes_total: Metric<U64Counter>,
}

pub(super) const WRITE_LINES_TOTAL_NAME: &str = "influxdb3_write_lines_total";
pub(super) const WRITE_BYTES_TOTAL_NAME: &str = "influxdb3_write_bytes_total";

impl WriteMetrics {
pub(super) fn new(metric_registry: &Registry) -> Self {
let write_lines_total = metric_registry.register_metric::<U64Counter>(
WRITE_LINES_TOTAL_NAME,
"track total number of lines written to the database",
);
let write_bytes_total = metric_registry.register_metric::<U64Counter>(
WRITE_BYTES_TOTAL_NAME,
"track total number of bytes written to the database",
);
Self {
write_lines_total,
write_bytes_total,
}
}

pub(super) fn record_lines<D: Into<String>>(&self, db: D, lines: u64) {
let db: Cow<'static, str> = Cow::from(db.into());
self.write_lines_total.recorder([("db", db)]).inc(lines);
}

pub(super) fn record_bytes<D: Into<String>>(&self, db: D, bytes: u64) {
let db: Cow<'static, str> = Cow::from(db.into());
self.write_bytes_total.recorder([("db", db)]).inc(bytes);
}
}

#[cfg(test)]
mod tests {
use metric::{Attributes, Registry};

use super::WriteMetrics;

#[test]
fn record_lines() {
let metric_registry = Registry::new();
let metrics = WriteMetrics::new(&metric_registry);
metrics.record_lines("foo", 64);
metrics.record_lines(String::from("bar"), 256);
assert_eq!(
64,
metrics
.write_lines_total
.get_observer(&Attributes::from(&[("db", "foo")]))
.unwrap()
.fetch()
);
assert_eq!(
256,
metrics
.write_lines_total
.get_observer(&Attributes::from(&[("db", "bar")]))
.unwrap()
.fetch()
);
}

#[test]
fn record_bytes() {
let metric_registry = Registry::new();
let metrics = WriteMetrics::new(&metric_registry);
metrics.record_bytes("foo", 64);
metrics.record_bytes(String::from("bar"), 256);
assert_eq!(
64,
metrics
.write_bytes_total
.get_observer(&Attributes::from(&[("db", "foo")]))
.unwrap()
.fetch()
);
assert_eq!(
256,
metrics
.write_bytes_total
.get_observer(&Attributes::from(&[("db", "bar")]))
.unwrap()
.fetch()
);
}
}
Loading

0 comments on commit d10ad87

Please sign in to comment.