From f9f6e0fa2e482aa704bc6a8f50a6156026befaec Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Wed, 22 Nov 2023 13:39:25 -0800 Subject: [PATCH] feat(rust): Populate commit_log_offsets so they can be produced (#5086) Previously BytesInsertBatch.commit_log_offsets was empty so no offsets would be produced. --- rust_snuba/src/factory.rs | 6 ++++-- rust_snuba/src/strategies/python.rs | 9 +++++++-- rust_snuba/src/types.rs | 1 + 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs index bb262e96efe..2d9d8b1320f 100644 --- a/rust_snuba/src/factory.rs +++ b/rust_snuba/src/factory.rs @@ -80,8 +80,10 @@ impl TaskRunner for MessageProcessor { payload: BytesInsertBatch::new( broker_message.timestamp, transformed, - // TODO: Actually implement this? - BTreeMap::new(), + BTreeMap::from([( + broker_message.partition.index, + (broker_message.offset, broker_message.timestamp), + )]), ), partition: broker_message.partition, offset: broker_message.offset, diff --git a/rust_snuba/src/strategies/python.rs b/rust_snuba/src/strategies/python.rs index 367ae11e780..c65208e92c3 100644 --- a/rust_snuba/src/strategies/python.rs +++ b/rust_snuba/src/strategies/python.rs @@ -135,8 +135,13 @@ impl PythonTransformStep { let replacement = BytesInsertBatch::new( original_message_meta.timestamp, rows, - // TODO: Actually implement this - BTreeMap::new(), + BTreeMap::from([( + original_message_meta.partition.index, + ( + original_message_meta.offset, + original_message_meta.timestamp, + ), + )]), ); let new_message = Message::new_broker_message( diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 43d51665576..b94f72b2c4d 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -33,6 +33,7 @@ impl BytesInsertBatch { pub fn merge(mut self, other: Self) -> Self { self.rows.encoded_rows.extend(other.rows.encoded_rows); + self.commit_log_offsets.extend(other.commit_log_offsets); self.rows.num_rows += other.rows.num_rows; self.sum_message_timestamp_secs += other.sum_message_timestamp_secs; self.max_message_timestamp_secs = max(