From 3951a517e40369c1237e6e90e147bd4a75609c3f Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Thu, 5 Oct 2023 11:08:35 -0700
Subject: [PATCH] feat: Port the commit log encoder/decoder to Rust (#4786)

Ports
https://github.com/getsentry/arroyo/blob/f43a9d69ab0e504869f88fcc39ede98bc0cc6807/arroyo/backends/kafka/commit.py
to Rust. The codec is not used yet.
---
 rust_snuba/Cargo.lock                         |   1 +
 rust_snuba/Cargo.toml                         |   1 +
 .../src/backends/kafka/producer.rs            |   2 +-
 .../src/processing/strategies/commit_log.rs   | 142 ++++++++++++++++++
 rust_snuba/src/strategies/commit_log.rs       | 142 ++++++++++++++++++
 rust_snuba/src/strategies/mod.rs              |   1 +
 6 files changed, 288 insertions(+), 1 deletion(-)
 create mode 100644 rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs
 create mode 100644 rust_snuba/src/strategies/commit_log.rs

diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock
index ba4fb1edf7..efa90cf171 100644
--- a/rust_snuba/Cargo.lock
+++ b/rust_snuba/Cargo.lock
@@ -1500,6 +1500,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_yaml",
+ "thiserror",
  "tokio",
  "uuid 1.4.1",
 ]
diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml
index 1ed0e187c5..173ea0cea7 100644
--- a/rust_snuba/Cargo.toml
+++ b/rust_snuba/Cargo.toml
@@ -38,6 +38,7 @@ reqwest = "0.11.11"
 uuid = "1.4.1"
 procspawn = { version = "0.10.2", features = ["json"] }
 pretty_env_logger = "0.5.0"
+thiserror = "1.0"
 
 [features]
 ffi = ["pyo3/extension-module"]
diff --git a/rust_snuba/rust_arroyo/src/backends/kafka/producer.rs b/rust_snuba/rust_arroyo/src/backends/kafka/producer.rs
index aa1e0d6e59..a13a0e5586 100644
--- a/rust_snuba/rust_arroyo/src/backends/kafka/producer.rs
+++ b/rust_snuba/rust_arroyo/src/backends/kafka/producer.rs
@@ -70,7 +70,7 @@ mod tests {
         let configuration =
             KafkaConfig::new_producer_config(vec!["127.0.0.1:9092".to_string()], None);
 
-        let mut producer = KafkaProducer::new(configuration);
+        let producer = KafkaProducer::new(configuration);
 
         let payload = KafkaPayload {
             key: None,
diff --git a/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs b/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs
new file mode 100644
index 0000000000..ba71aad72a
--- /dev/null
+++ b/rust_snuba/rust_arroyo/src/processing/strategies/commit_log.rs
@@ -0,0 +1,142 @@
+use crate::backends::kafka::types::KafkaPayload;
+use serde::{Deserialize, Serialize};
+use std::str;
+use thiserror::Error;
+
+/// A decoded commit log entry: the offset committed by a consumer group for
+/// one partition of a topic, plus the `orig_message_ts` value from the payload.
+#[derive(Debug)]
+struct Commit {
+    topic: String,
+    partition: u16,
+    consumer_group: String,
+    orig_message_ts: f64,
+    offset: u64,
+}
+
+/// JSON body of a commit log message.
+#[derive(Debug, Deserialize, Serialize)]
+struct Payload {
+    offset: u64,
+    orig_message_ts: f64,
+}
+
+/// Errors produced while encoding or decoding commit log messages.
+#[derive(Error, Debug)]
+enum CommitLogError {
+    #[error("json error")]
+    JsonError(#[from] serde_json::Error),
+    #[error("invalid message key")]
+    InvalidKey,
+    #[error("invalid message payload")]
+    InvalidPayload,
+}
+
+/// Decodes a commit from a message whose key is
+/// `topic:partition:consumer_group` and whose payload is a JSON `Payload`.
+impl TryFrom<KafkaPayload> for Commit {
+    type Error = CommitLogError;
+
+    /// Fails with `InvalidKey` when the key is missing, is not UTF-8, does not
+    /// have exactly three `:`-separated fields, or has a partition that is not
+    /// a `u16`; with `InvalidPayload` when the payload is missing; and with
+    /// `JsonError` when the payload does not deserialize into `Payload`.
+    fn try_from(payload: KafkaPayload) -> Result<Self, Self::Error> {
+        // A malformed key is bad input, not a broken invariant: report it
+        // instead of panicking.
+        let key_bytes = payload.key.ok_or(CommitLogError::InvalidKey)?;
+        let key = str::from_utf8(&key_bytes).map_err(|_| CommitLogError::InvalidKey)?;
+
+        let data: Vec<&str> = key.split(':').collect();
+        if data.len() != 3 {
+            return Err(CommitLogError::InvalidKey);
+        }
+
+        let topic = data[0].to_string();
+        let partition = data[1]
+            .parse::<u16>()
+            .map_err(|_| CommitLogError::InvalidKey)?;
+        let consumer_group = data[2].to_string();
+
+        let d: Payload =
+            serde_json::from_slice(&payload.payload.ok_or(CommitLogError::InvalidPayload)?)?;
+
+        Ok(Commit {
+            topic,
+            partition,
+            consumer_group,
+            orig_message_ts: d.orig_message_ts,
+            offset: d.offset,
+        })
+    }
+}
+
+/// Encodes a commit as a message whose key is
+/// `topic:partition:consumer_group` and whose payload is a JSON `Payload`.
+impl TryFrom<Commit> for KafkaPayload {
+    type Error = CommitLogError;
+
+    /// Fails with `JsonError` if the payload cannot be serialized.
+    fn try_from(commit: Commit) -> Result<Self, Self::Error> {
+        let key = Some(
+            format!(
+                "{}:{}:{}",
+                commit.topic, commit.partition, commit.consumer_group
+            )
+            .into_bytes(),
+        );
+
+        let payload = Some(serde_json::to_vec(&Payload {
+            offset: commit.offset,
+            orig_message_ts: commit.orig_message_ts,
+        })?);
+
+        Ok(KafkaPayload {
+            key,
+            headers: None,
+            payload,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn commit() {
+        let payload = KafkaPayload {
+            key: Some(b"topic:0:group1".to_vec()),
+            headers: None,
+            payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()),
+        };
+
+        let payload_clone = payload.clone();
+
+        let commit: Commit = payload.try_into().unwrap();
+        assert_eq!(commit.partition, 0);
+        let transformed: KafkaPayload = commit.try_into().unwrap();
+        assert_eq!(transformed.key, payload_clone.key);
+        assert_eq!(transformed.payload, payload_clone.payload);
+    }
+
+    #[test]
+    fn invalid_key() {
+        let keys = vec![
+            None,
+            Some(b"topic:0".to_vec()),
+            Some(b"topic:x:group1".to_vec()),
+            Some(vec![0xff]),
+        ];
+
+        for key in keys {
+            let payload = KafkaPayload {
+                key,
+                headers: None,
+                payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()),
+            };
+            let result: Result<Commit, _> = payload.try_into();
+            assert!(matches!(result, Err(CommitLogError::InvalidKey)));
+        }
+    }
+}
diff --git a/rust_snuba/src/strategies/commit_log.rs b/rust_snuba/src/strategies/commit_log.rs
new file mode 100644
index 0000000000..33f6669c0a
--- /dev/null
+++ b/rust_snuba/src/strategies/commit_log.rs
@@ -0,0 +1,142 @@
+use rust_arroyo::backends::kafka::types::KafkaPayload;
+use serde::{Deserialize, Serialize};
+use std::str;
+use thiserror::Error;
+
+/// A decoded commit log entry: the offset committed by a consumer group for
+/// one partition of a topic, plus the `orig_message_ts` value from the payload.
+#[derive(Debug)]
+struct Commit {
+    topic: String,
+    partition: u16,
+    consumer_group: String,
+    orig_message_ts: f64,
+    offset: u64,
+}
+
+/// JSON body of a commit log message.
+#[derive(Debug, Deserialize, Serialize)]
+struct Payload {
+    offset: u64,
+    orig_message_ts: f64,
+}
+
+/// Errors produced while encoding or decoding commit log messages.
+#[derive(Error, Debug)]
+enum CommitLogError {
+    #[error("json error")]
+    JsonError(#[from] serde_json::Error),
+    #[error("invalid message key")]
+    InvalidKey,
+    #[error("invalid message payload")]
+    InvalidPayload,
+}
+
+/// Decodes a commit from a message whose key is
+/// `topic:partition:consumer_group` and whose payload is a JSON `Payload`.
+impl TryFrom<KafkaPayload> for Commit {
+    type Error = CommitLogError;
+
+    /// Fails with `InvalidKey` when the key is missing, is not UTF-8, does not
+    /// have exactly three `:`-separated fields, or has a partition that is not
+    /// a `u16`; with `InvalidPayload` when the payload is missing; and with
+    /// `JsonError` when the payload does not deserialize into `Payload`.
+    fn try_from(payload: KafkaPayload) -> Result<Self, Self::Error> {
+        // A malformed key is bad input, not a broken invariant: report it
+        // instead of panicking.
+        let key_bytes = payload.key.ok_or(CommitLogError::InvalidKey)?;
+        let key = str::from_utf8(&key_bytes).map_err(|_| CommitLogError::InvalidKey)?;
+
+        let data: Vec<&str> = key.split(':').collect();
+        if data.len() != 3 {
+            return Err(CommitLogError::InvalidKey);
+        }
+
+        let topic = data[0].to_string();
+        let partition = data[1]
+            .parse::<u16>()
+            .map_err(|_| CommitLogError::InvalidKey)?;
+        let consumer_group = data[2].to_string();
+
+        let d: Payload =
+            serde_json::from_slice(&payload.payload.ok_or(CommitLogError::InvalidPayload)?)?;
+
+        Ok(Commit {
+            topic,
+            partition,
+            consumer_group,
+            orig_message_ts: d.orig_message_ts,
+            offset: d.offset,
+        })
+    }
+}
+
+/// Encodes a commit as a message whose key is
+/// `topic:partition:consumer_group` and whose payload is a JSON `Payload`.
+impl TryFrom<Commit> for KafkaPayload {
+    type Error = CommitLogError;
+
+    /// Fails with `JsonError` if the payload cannot be serialized.
+    fn try_from(commit: Commit) -> Result<Self, Self::Error> {
+        let key = Some(
+            format!(
+                "{}:{}:{}",
+                commit.topic, commit.partition, commit.consumer_group
+            )
+            .into_bytes(),
+        );
+
+        let payload = Some(serde_json::to_vec(&Payload {
+            offset: commit.offset,
+            orig_message_ts: commit.orig_message_ts,
+        })?);
+
+        Ok(KafkaPayload {
+            key,
+            headers: None,
+            payload,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn commit() {
+        let payload = KafkaPayload {
+            key: Some(b"topic:0:group1".to_vec()),
+            headers: None,
+            payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()),
+        };
+
+        let payload_clone = payload.clone();
+
+        let commit: Commit = payload.try_into().unwrap();
+        assert_eq!(commit.partition, 0);
+        let transformed: KafkaPayload = commit.try_into().unwrap();
+        assert_eq!(transformed.key, payload_clone.key);
+        assert_eq!(transformed.payload, payload_clone.payload);
+    }
+
+    #[test]
+    fn invalid_key() {
+        let keys = vec![
+            None,
+            Some(b"topic:0".to_vec()),
+            Some(b"topic:x:group1".to_vec()),
+            Some(vec![0xff]),
+        ];
+
+        for key in keys {
+            let payload = KafkaPayload {
+                key,
+                headers: None,
+                payload: Some(b"{\"offset\":5,\"orig_message_ts\":1696381946.0}".to_vec()),
+            };
+            let result: Result<Commit, _> = payload.try_into();
+            assert!(matches!(result, Err(CommitLogError::InvalidKey)));
+        }
+    }
+}
diff --git a/rust_snuba/src/strategies/mod.rs b/rust_snuba/src/strategies/mod.rs
index 484effa361..d8a7e04017 100644
--- a/rust_snuba/src/strategies/mod.rs
+++ b/rust_snuba/src/strategies/mod.rs
@@ -1,2 +1,3 @@
 pub mod clickhouse;
+pub mod commit_log;
 pub mod python;