diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs index bc45fa1..bff61b5 100644 --- a/capture/src/sinks/kafka.rs +++ b/capture/src/sinks/kafka.rs @@ -36,12 +36,11 @@ impl rdkafka::ClientContext for KafkaContext { for (topic, stats) in stats.topics { gauge!( "capture_kafka_produce_avg_batch_size_bytes", - "topic" => topic.clone() + "topic" => topic.clone() ) .set(stats.batchsize.avg as f64); gauge!( "capture_kafka_produce_avg_batch_size_events", - "topic" => topic ) .set(stats.batchcnt.avg as f64); @@ -49,30 +48,58 @@ impl rdkafka::ClientContext for KafkaContext { for (_, stats) in stats.brokers { let id_string = format!("{}", stats.nodeid); + if let Some(rtt) = stats.rtt { + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p50", + "broker" => id_string.clone() + ) + .set(rtt.p50 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p90", + "broker" => id_string.clone() + ) + .set(rtt.p90 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p95", + "broker" => id_string.clone() + ) + .set(rtt.p95 as f64); + gauge!( + "capture_kafka_produce_rtt_latency_ms", + "quantile" => "p99", + "broker" => id_string.clone() + ) + .set(rtt.p99 as f64); + } + gauge!( "capture_kafka_broker_requests_pending", - "broker" => id_string.clone() ) .set(stats.outbuf_cnt as f64); gauge!( "capture_kafka_broker_responses_awaiting", - "broker" => id_string.clone() ) .set(stats.waitresp_cnt as f64); counter!( "capture_kafka_broker_tx_errors_total", - "broker" => id_string.clone() ) .absolute(stats.txerrs); counter!( "capture_kafka_broker_rx_errors_total", - - "broker" => id_string + "broker" => id_string.clone() ) .absolute(stats.rxerrs); + counter!( + "capture_kafka_broker_request_timeouts", + "broker" => id_string + ) + .absolute(stats.req_timeouts); } } }