From 5919772dc7b1f1396778f9548395ed466991585b Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Thu, 30 Nov 2023 14:44:10 -0500 Subject: [PATCH 1/4] CCOL-2039: Preprocess message before batch consumption --- lib/deimos/active_record_consume/batch_consumption.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..242530ed 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -169,9 +169,17 @@ def upsert_records(messages) updater.mass_update(record_list) end + # Process messages prior to saving to datbase + # @param _messages [Array] + # @return [Void] + def pre_process(_messages) + nil + end + # @param messages [Array] # @return [BatchRecordList] def build_records(messages) + pre_process(messages) records = messages.map do |m| attrs = if self.method(:record_attributes).parameters.size == 2 record_attributes(m.payload, m.key) From 75c365e80cdd5448bbb8e29d44206c9208da099a Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Thu, 30 Nov 2023 15:10:20 -0500 Subject: [PATCH 2/4] CCOL-2039: Add specs --- spec/active_record_batch_consumer_spec.rb | 41 +++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index d1558dd2..3591ece8 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -493,5 +493,46 @@ def record_attributes(payload, key) end end + describe 'pre processing' do + context 'with uncompacted messages' do + let(:consumer_class) do + Class.new(described_class) do + schema 'MySchema' + namespace 'com.my-namespace' + key_config plain: true + record_class Widget + compacted false + + def pre_process(messages) + messages.each do |message| + message.payload[:some_int] = -message.payload[:some_int] + end + end + + end + end + + it 'should process successful and failed records' do + Widget.create!(id: 1, test_id: 'abc', some_int: 1) + Widget.create!(id: 2, test_id: 'def', some_int: 2) + + publish_batch( + [ + { key: 1, + payload: { test_id: 'abc', some_int: 11 } }, + { key: 2, + payload: { test_id: 'def', some_int: 20 } } + ] + ) + + widget_one, widget_two = Widget.all.to_a + + expect(widget_one.some_int).to eq(-11) + expect(widget_two.some_int).to eq(-20) + end + end + + end + end end From 3be511f791f555436b09088881319ea3c5bc3083 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 10:55:41 -0500 Subject: [PATCH 3/4] CCOL-2039: Rename spec --- spec/active_record_batch_consumer_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/active_record_batch_consumer_spec.rb b/spec/active_record_batch_consumer_spec.rb index 3591ece8..ad281306 100644 --- a/spec/active_record_batch_consumer_spec.rb +++ b/spec/active_record_batch_consumer_spec.rb @@ -512,7 +512,7 @@ def pre_process(messages) end end - it 'should process successful and failed records' do + it 'should pre-process records' do Widget.create!(id: 1, test_id: 'abc', some_int: 1) Widget.create!(id: 2, test_id: 'def', some_int: 2) From abcb592ab05d00b318b6e5be91655f77bc232f31 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 1 Dec 2023 14:47:57 -0500 Subject: [PATCH 4/4] CCOL-2039: Add CHANGELOG entry & fix typo --- CHANGELOG.md | 1 + lib/deimos/active_record_consume/batch_consumption.rb | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60c31c65..7c08c053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## UNRELEASED - Fix: Fixed handler metric for status:received, status:success in batch consumption +- Feature: Allow pre processing of messages prior to bulk consumption # 1.22.5 - 2023-07-18 - Fix: Fixed buffer overflow crash with DB producer. diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 242530ed..060597f9 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -169,7 +169,7 @@ def upsert_records(messages) updater.mass_update(record_list) end - # Process messages prior to saving to datbase + # Process messages prior to saving to database # @param _messages [Array] # @return [Void] def pre_process(_messages)