diff --git a/rust_snuba/src/processors/functions.rs b/rust_snuba/src/processors/functions.rs index 612786805f..68e136ead7 100644 --- a/rust_snuba/src/processors/functions.rs +++ b/rust_snuba/src/processors/functions.rs @@ -14,7 +14,6 @@ pub fn process_message( ) -> anyhow::Result { let payload_bytes = payload.payload().context("Expected payload")?; let msg: InputMessage = serde_json::from_slice(payload_bytes)?; - let functions = msg.functions.iter().map(|from| { Function { profile_id: msg.profile_id, @@ -40,8 +39,8 @@ pub fn process_message( }); Ok(InsertBatch { - origin_timestamp: DateTime::from_timestamp(msg.received, 0), rows: RowData::from_rows(functions)?, + origin_timestamp: DateTime::from_timestamp(msg.received, 0), sentry_received_timestamp: None, }) } diff --git a/rust_snuba/src/processors/profiles.rs b/rust_snuba/src/processors/profiles.rs index c8aba57f80..415d1e249a 100644 --- a/rust_snuba/src/processors/profiles.rs +++ b/rust_snuba/src/processors/profiles.rs @@ -5,26 +5,24 @@ use rust_arroyo::backends::kafka::types::KafkaPayload; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::processors::utils::enforce_retention; use crate::types::{InsertBatch, KafkaMessageMetadata, RowData}; pub fn process_message( payload: KafkaPayload, metadata: KafkaMessageMetadata, - config: &ProcessorConfig, + _config: &ProcessorConfig, ) -> anyhow::Result { let payload_bytes = payload.payload().context("Expected payload")?; let mut msg: ProfileMessage = serde_json::from_slice(payload_bytes)?; - msg.retention_days = Some(enforce_retention(msg.retention_days, &config.env_config)); msg.offset = metadata.offset; msg.partition = metadata.partition; let origin_timestamp = DateTime::from_timestamp(msg.received, 0); Ok(InsertBatch { - origin_timestamp, rows: RowData::from_rows([msg])?, + origin_timestamp, sentry_received_timestamp: None, }) } @@ -52,7 +50,7 @@ struct ProfileMessage { profile_id: Uuid, project_id: u64, received: i64, - retention_days: Option, + retention_days: u32, trace_id: Uuid, transaction_id: Uuid, transaction_name: String, diff --git a/rust_snuba/src/processors/spans.rs b/rust_snuba/src/processors/spans.rs index c54e8c780b..39a94c95ac 100644 --- a/rust_snuba/src/processors/spans.rs +++ b/rust_snuba/src/processors/spans.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; use anyhow::Context; -use chrono::DateTime; use rust_arroyo::backends::kafka::types::KafkaPayload; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -9,7 +8,7 @@ use uuid::Uuid; use crate::config::ProcessorConfig; use crate::processors::utils::{enforce_retention, hex_to_u64}; -use crate::types::{InsertBatch, KafkaMessageMetadata, RowData}; +use crate::types::{InsertBatch, KafkaMessageMetadata}; pub fn process_message( payload: KafkaPayload, @@ -19,18 +18,13 @@ pub fn process_message( let payload_bytes = payload.payload().context("Expected payload")?; let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?; - let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0); let mut span: Span = msg.try_into()?; - span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config)); + span.offset = metadata.offset; span.partition = metadata.partition; - Ok(InsertBatch { - origin_timestamp, - rows: RowData::from_rows([span])?, - sentry_received_timestamp: None, - }) + InsertBatch::from_rows([span]) } #[derive(Debug, Default, Deserialize)] @@ -49,7 +43,6 @@ struct FromSpanMessage { parent_span_id: u64, profile_id: Option, project_id: u64, - received: f64, retention_days: Option, #[serde(default, deserialize_with = "hex_to_u64")] segment_id: u64, @@ -418,7 +411,6 @@ mod tests { parent_span_id: Option, profile_id: Option, project_id: Option, - received: Option, retention_days: Option, segment_id: Option, sentry_tags: TestSentryTags, @@ -439,8 +431,15 @@ mod tests { profile_id: Some(Uuid::new_v4()), project_id: Some(1), retention_days: Some(90), - received: Some(1691105878.720), segment_id: Some("deadbeefdeadbeef".into()), + span_id: Some("deadbeefdeadbeef".into()), + start_timestamp_ms: Some(1691105878720), + trace_id: Some(Uuid::new_v4()), + tags: Some(BTreeMap::from([ + ("tag1".into(), "value1".into()), + ("tag2".into(), "123".into()), + ("tag3".into(), "true".into()), + ])), sentry_tags: TestSentryTags { action: Some("GET".into()), domain: Some("targetdomain.tld:targetport".into()), @@ -455,14 +454,6 @@ mod tests { transaction_method: Some("GET".into()), transaction_op: Some("navigation".into()), }, - span_id: Some("deadbeefdeadbeef".into()), - start_timestamp_ms: Some(1691105878720), - tags: Some(BTreeMap::from([ - ("tag1".into(), "value1".into()), - ("tag2".into(), "123".into()), - ("tag3".into(), "true".into()), - ])), - trace_id: Some(Uuid::new_v4()), } }