feat(ourlogs): Add a kafka consumer (#6743)
This consumer will be used in CI and dev (among other places) to put logs into ClickHouse directly.
colin-sentry authored Jan 10, 2025
1 parent 96d5c2b commit 686bd4d
Showing 5 changed files with 285 additions and 0 deletions.
3 changes: 3 additions & 0 deletions rust_snuba/src/processors/mod.rs
@@ -2,6 +2,7 @@ pub(crate) mod eap_spans;
mod errors;
mod functions;
mod generic_metrics;
mod ourlogs;
mod outcomes;
mod profile_chunks;
mod profiles;
@@ -11,6 +12,7 @@ mod replays;
mod spans;
mod uptime_monitor_checks;
mod utils;

use crate::config::ProcessorConfig;
use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata};
use sentry_arroyo::backends::kafka::types::KafkaPayload;
@@ -57,6 +59,7 @@ define_processing_functions! {
("UptimeMonitorChecksProcessor", "snuba-uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)),
("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)),
("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)),
("OurlogsMessageProcessor", "snuba-ourlogs", ProcessingFunctionType::ProcessingFunction(ourlogs::process_message)),
("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)),
("GenericCountersMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_counter_message)),
("GenericSetsMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_set_message)),
224 changes: 224 additions & 0 deletions rust_snuba/src/processors/ourlogs.rs
@@ -0,0 +1,224 @@
use anyhow::Context;
use chrono::DateTime;
use serde::{de, Deserialize, Deserializer, Serialize};
use std::collections::BTreeMap;
use std::fmt;
use uuid::Uuid;

use schemars::JsonSchema;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use serde::de::{MapAccess, Visitor};

use crate::config::ProcessorConfig;
use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata};

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub(crate) struct FromLogMessage {
    // required attributes
    organization_id: u64,
    project_id: u64,
    timestamp_nanos: u64,
    observed_timestamp_nanos: u64,
    retention_days: u16,
    body: String,

    // optional attributes
    trace_id: Option<Uuid>,
    span_id: Option<String>, // hex encoded
    attributes: Option<BTreeMap<String, FromAttribute>>,
    severity_text: Option<String>,
    severity_number: Option<u8>,
}

#[derive(Debug, JsonSchema)]
enum FromAttribute {
    String(String),
    Int(i64),
    Double(f64),
    Bool(bool),
}

impl<'de> Deserialize<'de> for FromAttribute {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
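        // Attributes arrive as single-key maps, e.g. {"int_value": 10};
        // the visitor dispatches on that key to pick the variant.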
        struct FromAttributeVisitor;

        impl<'de> Visitor<'de> for FromAttributeVisitor {
            type Value = FromAttribute;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                formatter.write_str("a map with a single key-value pair for FromAttribute")
            }

            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
            where
                A: MapAccess<'de>,
            {
                let key_value = map.next_entry::<String, serde_json::Value>()?;
                if let Some((key, value)) = key_value {
                    match key.as_str() {
                        "string_value" => {
                            let str_val: String =
                                serde_json::from_value(value).map_err(de::Error::custom)?;
                            Ok(FromAttribute::String(str_val))
                        }
                        "int_value" => {
                            let int_val: i64 =
                                serde_json::from_value(value).map_err(de::Error::custom)?;
                            Ok(FromAttribute::Int(int_val))
                        }
                        "double_value" => {
                            let double_val: f64 =
                                serde_json::from_value(value).map_err(de::Error::custom)?;
                            Ok(FromAttribute::Double(double_val))
                        }
                        "bool_value" => {
                            let bool_val: bool =
                                serde_json::from_value(value).map_err(de::Error::custom)?;
                            Ok(FromAttribute::Bool(bool_val))
                        }
                        _ => Err(de::Error::unknown_field(
                            &key,
                            &["string_value", "int_value", "double_value", "bool_value"],
                        )),
                    }
                } else {
                    Err(de::Error::custom("expected a single key-value pair"))
                }
            }
        }

        deserializer.deserialize_map(FromAttributeVisitor)
    }
}

pub fn process_message(
    payload: KafkaPayload,
    _metadata: KafkaMessageMetadata,
    config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
    let payload_bytes = payload.payload().context("Expected payload")?;
    let msg: FromLogMessage = serde_json::from_slice(payload_bytes)?;
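    // Split observed_timestamp_nanos into whole seconds and the nanosecond
    // remainder, e.g. 1_715_868_485_371_000_000 ns -> (1_715_868_485 s, 371_000_000 ns).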
    let origin_timestamp = DateTime::from_timestamp(
        (msg.observed_timestamp_nanos / 1_000_000_000) as i64,
        (msg.observed_timestamp_nanos % 1_000_000_000) as u32,
    );
    let mut ourlog: Ourlog = msg.into();

    ourlog.retention_days = enforce_retention(Some(ourlog.retention_days), &config.env_config);

    InsertBatch::from_rows([ourlog], origin_timestamp)
}

#[derive(Debug, Default, Serialize)]
struct Ourlog {
    organization_id: u64,
    project_id: u64,
    trace_id: Uuid,
    span_id: u64,
    severity_text: String,
    severity_number: u8,
    retention_days: u16,
    timestamp: u64,
    body: String,
    attr_string: BTreeMap<String, String>,
    attr_int: BTreeMap<String, i64>,
    attr_double: BTreeMap<String, f64>,
    attr_bool: BTreeMap<String, bool>,
}

impl From<FromLogMessage> for Ourlog {
    fn from(from: FromLogMessage) -> Ourlog {
        let mut res = Self {
            organization_id: from.organization_id,
            project_id: from.project_id,
            trace_id: from.trace_id.unwrap_or_default(),
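            // span_id is hex-encoded on the wire; missing or unparseable
            // values fall back to 0.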
            span_id: from
                .span_id
                .map_or(0, |s| u64::from_str_radix(&s, 16).unwrap_or(0)),
            severity_text: from.severity_text.unwrap_or_else(|| "INFO".into()),
            severity_number: from.severity_number.unwrap_or_default(),
            retention_days: from.retention_days,
            timestamp: from.timestamp_nanos,
            body: from.body,
            attr_string: BTreeMap::new(),
            attr_int: BTreeMap::new(),
            attr_double: BTreeMap::new(),
            attr_bool: BTreeMap::new(),
        };

        if let Some(attributes) = from.attributes {
            for (k, v) in attributes {
                match v {
                    FromAttribute::String(s) => {
                        res.attr_string.insert(k, s);
                    }
                    FromAttribute::Int(i) => {
                        res.attr_int.insert(k, i);
                    }
                    FromAttribute::Double(d) => {
                        res.attr_double.insert(k, d);
                    }
                    FromAttribute::Bool(b) => {
                        res.attr_bool.insert(k, b);
                    }
                }
            }
        }

        res
    }
}

#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use super::*;

    const OURLOG_KAFKA_MESSAGE: &str = r#"
{
    "organization_id": 10,
    "project_id": 1,
    "trace_id": "3c8c20d5-0a54-4a1c-ba10-f76f574d856f",
    "trace_flags": 255,
    "span_id": "11002233AABBCCDD",
    "severity_text": "WARNING",
    "severity_number": 1,
    "retention_days": 90,
    "timestamp_nanos": 1715868485371000,
    "observed_timestamp_nanos": 1715868485371000,
    "body": "hello world!",
    "attributes": {
        "some.user.tag": {
            "string_value": "hello"
        },
        "another.user.tag": {
            "int_value": 10
        },
        "double.user.tag": {
            "double_value": -10.59
        },
        "bool.user.tag": {
            "bool_value": true
        }
    }
}
"#;

    #[test]
    fn test_valid_log() {
        let payload = KafkaPayload::new(None, None, Some(OURLOG_KAFKA_MESSAGE.as_bytes().to_vec()));
        let meta = KafkaMessageMetadata {
            partition: 0,
            offset: 1,
            timestamp: DateTime::from(SystemTime::now()),
        };
        process_message(payload, meta, &ProcessorConfig::default())
            .expect("The message should be processed");
    }
}
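
Editor's note: the attributes map uses a single-key encoding per value ({"string_value": …}, {"int_value": …}, {"double_value": …}, or {"bool_value": …}), handled by the custom Deserialize impl above. As a minimal sketch (not part of this commit), an additional unit test placed inside the tests module above could exercise that encoding directly:

    #[test]
    fn test_attribute_variants() {
        // The single key of each map picks the FromAttribute variant.
        let s: FromAttribute = serde_json::from_str(r#"{"string_value": "hello"}"#).unwrap();
        assert!(matches!(s, FromAttribute::String(v) if v == "hello"));

        let i: FromAttribute = serde_json::from_str(r#"{"int_value": 10}"#).unwrap();
        assert!(matches!(i, FromAttribute::Int(10)));

        // Unknown keys are rejected by the visitor's unknown_field error.
        assert!(serde_json::from_str::<FromAttribute>(r#"{"color": "red"}"#).is_err());
    }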
New file (insta snapshot; filename not shown in this view)
@@ -0,0 +1,30 @@
---
source: src/processors/mod.rs
description: "{\n \"organization_id\": 69,\n \"project_id\": 1,\n \"trace_id\": \"3c8c20d5-0a54-4a1c-ba10-f76f574d856f\",\n \"trace_flags\": 255,\n \"span_id\": \"11002233AABBCCDD\",\n \"severity_text\": \"WARNING\",\n \"severity_number\": 1,\n \"retention_days\": 90,\n \"timestamp_nanos\": 1715868485371000,\n \"observed_timestamp_nanos\": 1715868485371000,\n \"body\": \"hello world!\",\n \"attributes\": {\n \"some.user.tag\": {\n \"string_value\": \"hello\"\n },\n \"another.user.tag\": {\n \"int_value\": 10\n },\n \"double.user.tag\": {\n \"double_value\": -10.59\n },\n \"bool.user.tag\": {\n \"bool_value\": true\n }\n }\n}\n"
expression: snapshot_payload
---
[
  {
    "attr_bool": {
      "bool.user.tag": true
    },
    "attr_double": {
      "double.user.tag": -10.59
    },
    "attr_int": {
      "another.user.tag": 10
    },
    "attr_string": {
      "some.user.tag": "hello"
    },
    "body": "hello world!",
    "organization_id": 69,
    "project_id": 1,
    "retention_days": 90,
    "severity_number": 1,
    "severity_text": "WARNING",
    "span_id": 1225016703947885789,
    "timestamp": 1715868485371000,
    "trace_id": "3c8c20d5-0a54-4a1c-ba10-f76f574d856f"
  }
]
New file (insta snapshot; filename not shown in this view)
@@ -0,0 +1,22 @@
---
source: src/processors/mod.rs
description: "{\n \"organization_id\": 69,\n \"project_id\": 1,\n \"received\": 1715868485.381,\n \"retention_days\": 90,\n \"timestamp_nanos\": 1715868485371000,\n \"observed_timestamp_nanos\": 1715868485371000,\n \"body\": \"hello world!\"\n}\n"
expression: snapshot_payload
---
[
  {
    "attr_bool": {},
    "attr_double": {},
    "attr_int": {},
    "attr_string": {},
    "body": "hello world!",
    "organization_id": 69,
    "project_id": 1,
    "retention_days": 90,
    "severity_number": 0,
    "severity_text": "INFO",
    "span_id": 0,
    "timestamp": 1715868485371000,
    "trace_id": "00000000-0000-0000-0000-000000000000"
  }
]
6 changes: 6 additions & 0 deletions snuba/datasets/processors/ourlogs_processor.py
@@ -0,0 +1,6 @@
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor


class OurlogsMessageProcessor(RustCompatProcessor):
    def __init__(self) -> None:
        super().__init__("OurlogsMessageProcessor")
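
Editor's note: RustCompatProcessor routes processing through the Rust function registered under the same logical name, so this Python class is a thin shim over the "OurlogsMessageProcessor" entry added in rust_snuba/src/processors/mod.rs above.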
