From 30e3f12679a6bc63f243c98b77d912606f453c15 Mon Sep 17 00:00:00 2001 From: Harsha Subraveti Date: Thu, 24 Aug 2023 10:10:44 -0400 Subject: [PATCH] CCOL-2132: Sends ActiveRecords back to calling method for post_process. Allows override for bulk_import_id column --- .../batch_consumption.rb | 46 +++++++++++++------ .../active_record_consume/batch_record.rb | 6 ++- .../active_record_consume/mass_updater.rb | 14 +++++- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..73b2989e 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -22,7 +22,7 @@ module BatchConsumption # they are split # @param payloads [Array] Decoded payloads # @param metadata [Hash] Information about batch, including keys. - # @return [void] + # @return [Array] def consume_batch(payloads, metadata) messages = payloads. zip(metadata[:keys]). @@ -31,21 +31,30 @@ def consume_batch(payloads, metadata) tags = %W(topic:#{metadata[:topic]}) Deimos.instrument('ar_consumer.consume_batch', tags) do + upserts = [] # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + upserts = if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) + end end + post_process(upserts) end end protected + # Takes the ActiveRecord objects created during batch consumption and use them after the transaction is complete. + # @param [Array] + # @return [void] + def post_process(_records) + nil + end + # Get the set of attribute names that uniquely identify messages in the # batch. Requires at least one record. # The parameters are mutually exclusive. records is used by default implementation. @@ -98,6 +107,13 @@ def should_consume?(_record) true end + # Method to generate Unique ID. By default, it uses GUID v4 however there are other efficient IDs + # like ULID. The implementing consumer can decide on the format of this unique ID based on their needs. + # @return [String] + def generate_unique_id + nil + end + private # Compact a batch of messages, taking only the last message for each @@ -125,19 +141,20 @@ def uncompacted_update(messages) # All messages with payloads are passed to upsert_records. # All tombstones messages are passed to remove_records. # @param messages [Array] List of messages. - # @return [void] + # @return [Array] def update_database(messages) # Find all upserted records (i.e. that have a payload) and all # deleted record (no payload) removed, upserted = messages.partition(&:tombstone?) max_db_batch_size = self.class.config[:max_db_batch_size] + upserts = [] if upserted.any? - if max_db_batch_size - upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } - else - upsert_records(upserted) - end + upserts = if max_db_batch_size + upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } + else + upsert_records(upserted) + end end return if removed.empty? @@ -147,12 +164,13 @@ def update_database(messages) else remove_records(removed) end + upserts end # Upsert any non-deleted records # @param messages [Array] List of messages for a group of # records to either be updated or inserted. - # @return [void] + # @return [Array] def upsert_records(messages) record_list = build_records(messages) record_list.filter!(self.method(:should_consume?).to_proc) @@ -161,10 +179,12 @@ def upsert_records(messages) key_col_proc = self.method(:key_columns).to_proc col_proc = self.method(:columns).to_proc + unique_id_proc = self.method(:generate_unique_id).to_proc updater = MassUpdater.new(@klass, key_col_proc: key_col_proc, col_proc: col_proc, + unique_id_proc: unique_id_proc, replace_associations: self.class.config[:replace_associations]) updater.mass_update(record_list) end diff --git a/lib/deimos/active_record_consume/batch_record.rb b/lib/deimos/active_record_consume/batch_record.rb index 2e376d90..c5d8a5bf 100644 --- a/lib/deimos/active_record_consume/batch_record.rb +++ b/lib/deimos/active_record_consume/batch_record.rb @@ -26,7 +26,11 @@ def initialize(klass:, attributes:, bulk_import_column: nil) @klass = klass if bulk_import_column self.bulk_import_column = bulk_import_column - self.bulk_import_id = ULID.generate + self.bulk_import_id = if klass.method_defined?(:generate_unique_id) + klass.generate_unique_id + else + SecureRandom.uuid + end attributes[bulk_import_column] = bulk_import_id end attributes = attributes.with_indifferent_access diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index a7eb6e41..69f647da 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -18,8 +18,9 @@ def default_cols(klass) # @param klass [Class < ActiveRecord::Base] # @param key_col_proc [Proc] # @param col_proc [Proc] + # @param unique_id_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true) + def initialize(klass, key_col_proc: nil, col_proc: nil, unique_id_proc: nil, replace_associations: true) @klass = klass @replace_associations = replace_associations @@ -28,6 +29,8 @@ def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: tr @columns = {} @col_proc = col_proc + + @unique_id_proc = unique_id_proc end # @param klass [Class < ActiveRecord::Base] @@ -40,6 +43,11 @@ def key_cols(klass) @key_cols[klass] ||= @key_col_proc&.call(klass) || self.default_keys(klass) end + # @return [String] + def generate_unique_id + @unique_id_proc&.call || SecureRandom.uuid + end + # @param record_list [BatchRecordList] def save_records_to_database(record_list) columns = self.columns(record_list.klass) @@ -69,7 +77,7 @@ def save_records_to_database(record_list) def import_associations(record_list) record_list.fill_primary_keys! - import_id = @replace_associations ? ULID.generate : nil + import_id = @replace_associations ? generate_unique_id : nil record_list.associations.each do |assoc| sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten next unless sub_records.any? @@ -82,9 +90,11 @@ def import_associations(record_list) end # @param record_list [BatchRecordList] + # @return [Array] def mass_update(record_list) save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? + record_list.records end end