From b9fb36f8e77fe9535f32ed76f547aee6b7f4df77 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 4 Oct 2023 20:21:11 -0700 Subject: [PATCH] meh --- .../src/processing/strategies/commit_log.rs | 104 ++++++++++++++++++ rust_snuba/src/strategies/commit_log.rs | 2 +- 2 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs diff --git a/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs b/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs new file mode 100644 index 0000000000..ba71aad72a --- /dev/null +++ b/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs @@ -0,0 +1,104 @@ +use crate::backends::kafka::types::KafkaPayload; +use serde::{Deserialize, Serialize}; +use std::str; +use thiserror::Error; + +#[derive(Debug)] +struct Commit { + topic: String, + partition: u16, + consumer_group: String, + orig_message_ts: f64, + offset: u64, +} + +#[derive(Debug, Deserialize, Serialize)] +struct Payload { + offset: u64, + orig_message_ts: f64, +} + +#[derive(Error, Debug)] +enum CommitLogError { + #[error("json error")] + JsonError(#[from] serde_json::Error), + #[error("invalid message key")] + InvalidKey, + #[error("invalid message payload")] + InvalidPayload, +} + +impl TryFrom for Commit { + type Error = CommitLogError; + + fn try_from(payload: KafkaPayload) -> Result { + let key = payload.key.unwrap(); + + let data: Vec<&str> = str::from_utf8(&key).unwrap().split(':').collect(); + if data.len() != 3 { + return Err(CommitLogError::InvalidKey); + } + + let topic = data[0].to_string(); + let partition = data[1].parse::().unwrap(); + let consumer_group = data[2].to_string(); + + let d: Payload = + serde_json::from_slice(&payload.payload.ok_or(CommitLogError::InvalidPayload)?)?; + + Ok(Commit { + topic, + partition, + consumer_group, + orig_message_ts: d.orig_message_ts, + offset: d.offset, + }) + } +} + +impl TryFrom for KafkaPayload { + type Error = CommitLogError; + + fn try_from(commit: Commit) -> Result { + let key = Some( + format!( + "{}:{}:{}", + commit.topic, commit.partition, commit.consumer_group + ) + .into_bytes(), + ); + + let payload = Some(serde_json::to_vec(&Payload { + offset: commit.offset, + orig_message_ts: commit.orig_message_ts, + })?); + + Ok(KafkaPayload { + key, + headers: None, + payload, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn commit() { + let payload = KafkaPayload { + key: Some(b"topic:0:group1".to_vec()), + headers: None, + payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()), + }; + + let payload_clone = payload.clone(); + + let commit: Commit = payload.try_into().unwrap(); + assert_eq!(commit.partition, 0); + let transformed: KafkaPayload = commit.try_into().unwrap(); + assert_eq!(transformed.key, payload_clone.key); + assert_eq!(transformed.payload, payload_clone.payload); + } +} diff --git a/rust_snuba/src/strategies/commit_log.rs b/rust_snuba/src/strategies/commit_log.rs index c5116290df..33f6669c0a 100644 --- a/rust_snuba/src/strategies/commit_log.rs +++ b/rust_snuba/src/strategies/commit_log.rs @@ -90,7 +90,7 @@ mod tests { let payload = KafkaPayload { key: Some(b"topic:0:group1".to_vec()), headers: None, - payload: Some(b"{\"offset\": 5,\"orig_message_ts\": 1696381946.0}".to_vec()), + payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()), }; let payload_clone = payload.clone();