From 591219cb697450eeed9dc604ffebab5031ab8e52 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 27 Nov 2023 18:00:37 +0100 Subject: [PATCH] ref(rust): Add a few metrics missing from snuba dashboard (#5102) * implement batch_time metric * add insertions.batch_write_ms metric * implement schema validation and invalid message metrics --- .../src/processing/strategies/reduce.rs | 57 ++++++++++--------- rust_snuba/src/factory.rs | 3 + rust_snuba/src/strategies/clickhouse.rs | 19 ++++++- rust_snuba/src/strategies/python.rs | 4 ++ rust_snuba/src/strategies/validate_schema.rs | 6 ++ 5 files changed, 61 insertions(+), 28 deletions(-) diff --git a/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs b/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs index 64034e2c938..1b0f921e7de 100644 --- a/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs +++ b/rust_snuba/rust_arroyo/src/processing/strategies/reduce.rs @@ -3,6 +3,7 @@ use crate::processing::strategies::{ SubmitError, }; use crate::types::{Message, Partition}; +use crate::utils::metrics::{get_metrics, BoxMetrics}; use std::collections::BTreeMap; use std::mem; use std::sync::Arc; @@ -50,6 +51,7 @@ pub struct Reduce { batch_state: BatchState, message_carried_over: Option>, commit_request_carried_over: Option, + metrics: BoxMetrics, } impl ProcessingStrategy for Reduce { @@ -130,6 +132,7 @@ impl Reduce { batch_state, message_carried_over: None, commit_request_carried_over: None, + metrics: get_metrics(), } } @@ -153,38 +156,40 @@ impl Reduce { return Ok(()); } + let batch_time = self.batch_state.batch_start_time.elapsed().ok(); let batch_complete = self.batch_state.message_count >= self.max_batch_size - || self - .batch_state - .batch_start_time - .elapsed() - .unwrap_or_default() - > self.max_batch_time; - - if batch_complete || force { - let batch_state = mem::replace( - &mut self.batch_state, - BatchState::new(self.initial_value.clone(), self.accumulator.clone()), + || batch_time.unwrap_or_default() > self.max_batch_time; + + if !batch_complete && !force { + return Ok(()); + } + + if let Some(batch_time) = batch_time { + self.metrics.timing( + "arroyo.strategies.reduce.batch_time", + batch_time.as_secs(), + None, ); + } - let next_message = - Message::new_any_message(batch_state.value.unwrap(), batch_state.offsets); + let batch_state = mem::replace( + &mut self.batch_state, + BatchState::new(self.initial_value.clone(), self.accumulator.clone()), + ); - match self.next_step.submit(next_message) { - Err(SubmitError::MessageRejected(MessageRejected { - message: transformed_message, - })) => { - self.message_carried_over = Some(transformed_message); - return Ok(()); - } - Err(SubmitError::InvalidMessage(invalid_message)) => { - return Err(invalid_message); - } - Ok(_) => return Ok(()), + let next_message = + Message::new_any_message(batch_state.value.unwrap(), batch_state.offsets); + + match self.next_step.submit(next_message) { + Err(SubmitError::MessageRejected(MessageRejected { + message: transformed_message, + })) => { + self.message_carried_over = Some(transformed_message); + Ok(()) } + Err(SubmitError::InvalidMessage(invalid_message)) => Err(invalid_message), + Ok(_) => Ok(()), } - - Ok(()) } } diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index 2d9d8b1320f..15a990d9695 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -13,6 +13,7 @@ use rust_arroyo::processing::strategies::run_task_in_threads::{ use rust_arroyo::processing::strategies::InvalidMessage; use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory}; use rust_arroyo::types::{BrokerMessage, InnerMessage, Message}; +use rust_arroyo::utils::metrics::get_metrics; use std::collections::BTreeMap; use std::sync::Arc; @@ -95,6 +96,8 @@ impl TaskRunner for MessageProcessor { // however, as Sentry captures `error` logs as errors by default, // we would double-log this error here: tracing::error!(%error, "Failed processing message"); + let metrics = get_metrics(); + metrics.increment("invalid_message", 1, None); sentry::with_scope( |_scope| { // FIXME(swatinem): we already moved `broker_message.payload` diff --git a/rust_snuba/src/strategies/clickhouse.rs b/rust_snuba/src/strategies/clickhouse.rs index ce4cb7b219d..3ec6b59fbab 100644 --- a/rust_snuba/src/strategies/clickhouse.rs +++ b/rust_snuba/src/strategies/clickhouse.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION}; use reqwest::{Client, Response}; @@ -46,6 +46,8 @@ impl TaskRunner for ClickhouseWriter { } tracing::debug!("performing write"); + let write_start = SystemTime::now(); + let response = client .send(insert_batch.encoded_rows().to_vec()) .await @@ -53,7 +55,20 @@ impl TaskRunner for ClickhouseWriter { tracing::debug!(?response); tracing::info!("Inserted {} rows", insert_batch.len()); - + let write_finish = SystemTime::now(); + + if let Ok(elapsed) = write_finish.duration_since(write_start) { + metrics.timing( + "insertions.batch_write_ms", + elapsed.as_millis() as u64, + None, + ); + } + metrics.increment( + "insertions.batch_write_msgs", + insert_batch.len() as i64, + None, + ); insert_batch.record_message_latency(&metrics); Ok(message) diff --git a/rust_snuba/src/strategies/python.rs b/rust_snuba/src/strategies/python.rs index c65208e92c3..99ef8730cb0 100644 --- a/rust_snuba/src/strategies/python.rs +++ b/rust_snuba/src/strategies/python.rs @@ -3,6 +3,7 @@ use rust_arroyo::processing::strategies::{ CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy, SubmitError, }; use rust_arroyo::types::{BrokerMessage, InnerMessage, Message, Partition}; +use rust_arroyo::utils::metrics::{get_metrics, BoxMetrics}; use std::collections::BTreeMap; use std::collections::VecDeque; @@ -55,6 +56,7 @@ pub struct PythonTransformStep { message_carried_over: Option>, processing_pool: Option, max_queue_depth: usize, + metrics: BoxMetrics, } impl PythonTransformStep { @@ -97,6 +99,7 @@ impl PythonTransformStep { message_carried_over: None, processing_pool, max_queue_depth: max_queue_depth.unwrap_or(processes), + metrics: get_metrics(), }) } @@ -159,6 +162,7 @@ impl PythonTransformStep { } } Err(error) => { + self.metrics.increment("invalid_message", 1, None); tracing::error!(error, "Invalid message"); } } diff --git a/rust_snuba/src/strategies/validate_schema.rs b/rust_snuba/src/strategies/validate_schema.rs index 43e95530b70..41026b52efa 100644 --- a/rust_snuba/src/strategies/validate_schema.rs +++ b/rust_snuba/src/strategies/validate_schema.rs @@ -9,11 +9,13 @@ use rust_arroyo::processing::strategies::{ CommitRequest, InvalidMessage, ProcessingStrategy, SubmitError, }; use rust_arroyo::types::{InnerMessage, Message}; +use rust_arroyo::utils::metrics::{get_metrics, BoxMetrics}; use sentry_kafka_schemas; pub struct SchemaValidator { schema: Option>, enforce_schema: bool, + metrics: BoxMetrics, } impl SchemaValidator { @@ -34,6 +36,7 @@ impl SchemaValidator { SchemaValidator { schema, enforce_schema, + metrics: get_metrics(), } } } @@ -44,6 +47,7 @@ impl TaskRunner for SchemaValidator { return Box::pin(async move { Ok(message) }); }; let enforce_schema = self.enforce_schema; + let metrics = self.metrics.clone(); Box::pin(async move { // FIXME: this will panic when the payload is empty @@ -54,6 +58,8 @@ impl TaskRunner for SchemaValidator { }; tracing::error!(%error, "Validation error"); + metrics.increment("schema_validation.failed", 1, None); + if !enforce_schema { return Ok(message); };