From 686bd4d4d92370970b162c3fb82cb08c376b8dbd Mon Sep 17 00:00:00 2001 From: Colin <161344340+colin-sentry@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:49:35 -0500 Subject: [PATCH] feat(ourlogs): Add a kafka consumer (#6743) This consumer will be used in CI and dev (among other places) to put logs into clickhouse directly --- rust_snuba/src/processors/mod.rs | 3 + rust_snuba/src/processors/ourlogs.rs | 224 ++++++++++++++++++ ...or-snuba-ourlogs__1__maximal_log.json.snap | 30 +++ ...or-snuba-ourlogs__1__minimal_log.json.snap | 22 ++ .../datasets/processors/ourlogs_processor.py | 6 + 5 files changed, 285 insertions(+) create mode 100644 rust_snuba/src/processors/ourlogs.rs create mode 100644 rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__maximal_log.json.snap create mode 100644 rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__minimal_log.json.snap create mode 100644 snuba/datasets/processors/ourlogs_processor.py diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index 8ad75fe7b7a..b3817d7f17b 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -2,6 +2,7 @@ pub(crate) mod eap_spans; mod errors; mod functions; mod generic_metrics; +mod ourlogs; mod outcomes; mod profile_chunks; mod profiles; @@ -11,6 +12,7 @@ mod replays; mod spans; mod uptime_monitor_checks; mod utils; + use crate::config::ProcessorConfig; use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata}; use sentry_arroyo::backends::kafka::types::KafkaPayload; @@ -57,6 +59,7 @@ define_processing_functions! 
{ ("UptimeMonitorChecksProcessor", "snuba-uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)), ("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)), ("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)), + ("OurlogsMessageProcessor", "snuba-ourlogs", ProcessingFunctionType::ProcessingFunction(ourlogs::process_message)), ("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)), ("GenericCountersMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_counter_message)), ("GenericSetsMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_set_message)), diff --git a/rust_snuba/src/processors/ourlogs.rs b/rust_snuba/src/processors/ourlogs.rs new file mode 100644 index 00000000000..39a54203cd6 --- /dev/null +++ b/rust_snuba/src/processors/ourlogs.rs @@ -0,0 +1,224 @@ +use anyhow::Context; +use chrono::DateTime; +use serde::{de, Deserialize, Deserializer, Serialize}; +use std::collections::BTreeMap; +use std::fmt; +use uuid::Uuid; + +use schemars::JsonSchema; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use serde::de::{MapAccess, Visitor}; + +use crate::config::ProcessorConfig; +use crate::processors::utils::enforce_retention; +use crate::types::{InsertBatch, KafkaMessageMetadata}; + +#[derive(Debug, Default, Deserialize, JsonSchema)] +pub(crate) struct FromLogMessage { + //required attributes + organization_id: u64, + project_id: u64, + timestamp_nanos: u64, + observed_timestamp_nanos: u64, + retention_days: u16, + body: String, + + //optional attributes + trace_id: Option<Uuid>, + span_id: Option<String>, //hex encoded + attributes: Option<BTreeMap<String, FromAttribute>>, + severity_text: Option<String>, + severity_number: Option<u8>, +} + +#[derive(Debug, JsonSchema)] + +enum FromAttribute
{ + String(String), + Int(i64), + Double(f64), + Bool(bool), +} + +impl<'de> Deserialize<'de> for FromAttribute { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct FromAttributeVisitor; + + impl<'de> Visitor<'de> for FromAttributeVisitor { + type Value = FromAttribute; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a map with a single key-value pair for FromAttribute") + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: MapAccess<'de>, + { + let key_value = map.next_entry::<String, serde_json::Value>()?; + if let Some((key, value)) = key_value { + match key.as_str() { + "string_value" => { + let str_val: String = + serde_json::from_value(value).map_err(de::Error::custom)?; + Ok(FromAttribute::String(str_val)) + } + "int_value" => { + let int_val: i64 = + serde_json::from_value(value).map_err(de::Error::custom)?; + Ok(FromAttribute::Int(int_val)) + } + "double_value" => { + let double_val: f64 = + serde_json::from_value(value).map_err(de::Error::custom)?; + Ok(FromAttribute::Double(double_val)) + } + "bool_value" => { + let bool_val: bool = + serde_json::from_value(value).map_err(de::Error::custom)?; + Ok(FromAttribute::Bool(bool_val)) + } + _ => Err(de::Error::unknown_field( + &key, + &["string_value", "int_value", "double_value", "bool_value"], + )), + } + } else { + Err(de::Error::custom("expected a single key-value pair")) + } + } + } + + deserializer.deserialize_map(FromAttributeVisitor) + } +} + +pub fn process_message( + payload: KafkaPayload, + _metadata: KafkaMessageMetadata, + config: &ProcessorConfig, +) -> anyhow::Result<InsertBatch> { + let payload_bytes = payload.payload().context("Expected payload")?; + let msg: FromLogMessage = serde_json::from_slice(payload_bytes)?; + let origin_timestamp = DateTime::from_timestamp( + (msg.observed_timestamp_nanos / 1_000_000_000) as i64, + (msg.observed_timestamp_nanos % 1_000_000_000) as u32, + ); + let mut ourlog: Ourlog = msg.into(); + 
ourlog.retention_days = enforce_retention(Some(ourlog.retention_days), &config.env_config); + + InsertBatch::from_rows([ourlog], origin_timestamp) +} + +#[derive(Debug, Default, Serialize)] +struct Ourlog { + organization_id: u64, + project_id: u64, + trace_id: Uuid, + span_id: u64, + severity_text: String, + severity_number: u8, + retention_days: u16, + timestamp: u64, + body: String, + attr_string: BTreeMap<String, String>, + attr_int: BTreeMap<String, i64>, + attr_double: BTreeMap<String, f64>, + attr_bool: BTreeMap<String, bool>, +} + +impl From<FromLogMessage> for Ourlog { + fn from(from: FromLogMessage) -> Ourlog { + let mut res = Self { + organization_id: from.organization_id, + project_id: from.project_id, + trace_id: from.trace_id.unwrap_or_default(), + span_id: from + .span_id + .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)), + severity_text: from.severity_text.unwrap_or_else(|| "INFO".into()), + severity_number: from.severity_number.unwrap_or_default(), + retention_days: from.retention_days, + timestamp: from.timestamp_nanos, + body: from.body, + attr_string: BTreeMap::new(), + attr_int: BTreeMap::new(), + attr_double: BTreeMap::new(), + attr_bool: BTreeMap::new(), + }; + + if let Some(attributes) = from.attributes { + for (k, v) in attributes { + match v { + FromAttribute::String(s) => { + res.attr_string.insert(k, s); + } + FromAttribute::Int(i) => { + res.attr_int.insert(k, i); + } + FromAttribute::Double(d) => { + res.attr_double.insert(k, d); + } + FromAttribute::Bool(b) => { + res.attr_bool.insert(k, b); + } + } + } + } + + res + } +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use super::*; + + const OURLOG_KAFKA_MESSAGE: &str = r#" +{ + "organization_id": 10, + "project_id": 1, + "trace_id": "3c8c20d5-0a54-4a1c-ba10-f76f574d856f", + "trace_flags": 255, + "span_id": "11002233AABBCCDD", + "severity_text": "WARNING", + "severity_number": 1, + "retention_days": 90, + "timestamp_nanos": 1715868485371000, + "observed_timestamp_nanos": 1715868485371000, + "body": "hello world!", + "attributes": { 
+ "some.user.tag": { + "string_value": "hello" + }, + "another.user.tag": { + "int_value": 10 + }, + "double.user.tag": { + "double_value": -10.59 + }, + "bool.user.tag": { + "bool_value": true + } + } +} + "#; + + #[test] + fn test_valid_log() { + let payload = KafkaPayload::new(None, None, Some(OURLOG_KAFKA_MESSAGE.as_bytes().to_vec())); + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + process_message(payload, meta, &ProcessorConfig::default()) + .expect("The message should be processed"); + } +} diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__maximal_log.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__maximal_log.json.snap new file mode 100644 index 00000000000..ac361bcce62 --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__maximal_log.json.snap @@ -0,0 +1,30 @@ +--- +source: src/processors/mod.rs +description: "{\n \"organization_id\": 69,\n \"project_id\": 1,\n \"trace_id\": \"3c8c20d5-0a54-4a1c-ba10-f76f574d856f\",\n \"trace_flags\": 255,\n \"span_id\": \"11002233AABBCCDD\",\n \"severity_text\": \"WARNING\",\n \"severity_number\": 1,\n \"retention_days\": 90,\n \"timestamp_nanos\": 1715868485371000,\n \"observed_timestamp_nanos\": 1715868485371000,\n \"body\": \"hello world!\",\n \"attributes\": {\n \"some.user.tag\": {\n \"string_value\": \"hello\"\n },\n \"another.user.tag\": {\n \"int_value\": 10\n },\n \"double.user.tag\": {\n \"double_value\": -10.59\n },\n \"bool.user.tag\": {\n \"bool_value\": true\n }\n }\n}\n" +expression: snapshot_payload +--- +[ + { + "attr_bool": { + "bool.user.tag": true + }, + "attr_double": { + "double.user.tag": -10.59 + }, + "attr_int": { + "another.user.tag": 10 + }, + 
"attr_string": { + "some.user.tag": "hello" + }, + "body": "hello world!", + "organization_id": 69, + "project_id": 1, + "retention_days": 90, + "severity_number": 1, + "severity_text": "WARNING", + "span_id": 1225016703947885789, + "timestamp": 1715868485371000, + "trace_id": "3c8c20d5-0a54-4a1c-ba10-f76f574d856f" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__minimal_log.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__minimal_log.json.snap new file mode 100644 index 00000000000..671e7e24132 --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-ourlogs-OurlogsMessageProcessor-snuba-ourlogs__1__minimal_log.json.snap @@ -0,0 +1,22 @@ +--- +source: src/processors/mod.rs +description: "{\n \"organization_id\": 69,\n \"project_id\": 1,\n \"received\": 1715868485.381,\n \"retention_days\": 90,\n \"timestamp_nanos\": 1715868485371000,\n \"observed_timestamp_nanos\": 1715868485371000,\n \"body\": \"hello world!\"\n}\n" +expression: snapshot_payload +--- +[ + { + "attr_bool": {}, + "attr_double": {}, + "attr_int": {}, + "attr_string": {}, + "body": "hello world!", + "organization_id": 69, + "project_id": 1, + "retention_days": 90, + "severity_number": 0, + "severity_text": "INFO", + "span_id": 0, + "timestamp": 1715868485371000, + "trace_id": "00000000-0000-0000-0000-000000000000" + } +] diff --git a/snuba/datasets/processors/ourlogs_processor.py b/snuba/datasets/processors/ourlogs_processor.py new file mode 100644 index 00000000000..4bb4ec0d1e8 --- /dev/null +++ b/snuba/datasets/processors/ourlogs_processor.py @@ -0,0 +1,6 @@ +from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor + + +class OurlogsMessageProcessor(RustCompatProcessor): + def __init__(self) -> None: + 
super().__init__("OurlogsMessageProcessor")