Skip to content

Commit

Permalink
CCOL-2132: Sends ActiveRecords back to calling method for post_proces…
Browse files Browse the repository at this point in the history
…s. Allows override for bulk_import_id column
  • Loading branch information
harsha-flipp committed Aug 24, 2023
1 parent 037f08f commit 30e3f12
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
46 changes: 33 additions & 13 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module BatchConsumption
# they are split
# @param payloads [Array<Hash,Deimos::SchemaClass::Record>] Decoded payloads
# @param metadata [Hash] Information about batch, including keys.
# @return [void]
# @return [Array<ActiveRecord::Base>]
def consume_batch(payloads, metadata)
messages = payloads.
zip(metadata[:keys]).
Expand All @@ -31,21 +31,30 @@ def consume_batch(payloads, metadata)
tags = %W(topic:#{metadata[:topic]})

Deimos.instrument('ar_consumer.consume_batch', tags) do
upserts = []
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
upserts = if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
post_process(upserts)
end
end

protected

# Takes the ActiveRecord objects created during batch consumption and use them after the transaction is complete.
# @param [Array<ActiveRecord::Base>]
# @return [void]
def post_process(_records)
nil
end

# Get the set of attribute names that uniquely identify messages in the
# batch. Requires at least one record.
# The parameters are mutually exclusive. records is used by default implementation.
Expand Down Expand Up @@ -98,6 +107,13 @@ def should_consume?(_record)
true
end

# Method to generate Unique ID. By default, it uses GUID v4 however there are other efficient IDs
# like ULID. The implementing consumer can decide on the format of this unique ID based on their needs.
# @return [String]
def generate_unique_id
nil
end

private

# Compact a batch of messages, taking only the last message for each
Expand Down Expand Up @@ -125,19 +141,20 @@ def uncompacted_update(messages)
# All messages with payloads are passed to upsert_records.
# All tombstones messages are passed to remove_records.
# @param messages [Array<Message>] List of messages.
# @return [void]
# @return [Array<ActiveRecord::Base>]
def update_database(messages)
# Find all upserted records (i.e. that have a payload) and all
# deleted record (no payload)
removed, upserted = messages.partition(&:tombstone?)

max_db_batch_size = self.class.config[:max_db_batch_size]
upserts = []
if upserted.any?
if max_db_batch_size
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
else
upsert_records(upserted)
end
upserts = if max_db_batch_size
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
else
upsert_records(upserted)
end
end

return if removed.empty?
Expand All @@ -147,12 +164,13 @@ def update_database(messages)
else
remove_records(removed)
end
upserts
end

# Upsert any non-deleted records
# @param messages [Array<Message>] List of messages for a group of
# records to either be updated or inserted.
# @return [void]
# @return [Array<ActiveRelation>]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)
Expand All @@ -161,10 +179,12 @@ def upsert_records(messages)

key_col_proc = self.method(:key_columns).to_proc
col_proc = self.method(:columns).to_proc
unique_id_proc = self.method(:generate_unique_id).to_proc

updater = MassUpdater.new(@klass,
key_col_proc: key_col_proc,
col_proc: col_proc,
unique_id_proc: unique_id_proc,
replace_associations: self.class.config[:replace_associations])
updater.mass_update(record_list)
end
Expand Down
6 changes: 5 additions & 1 deletion lib/deimos/active_record_consume/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ def initialize(klass:, attributes:, bulk_import_column: nil)
@klass = klass
if bulk_import_column
self.bulk_import_column = bulk_import_column
self.bulk_import_id = ULID.generate
self.bulk_import_id = if klass.method_defined?(:generate_unique_id)
klass.generate_unique_id
else
SecureRandom.uuid
end
attributes[bulk_import_column] = bulk_import_id
end
attributes = attributes.with_indifferent_access
Expand Down
14 changes: 12 additions & 2 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ def default_cols(klass)
# @param klass [Class < ActiveRecord::Base]
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param unique_id_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true)
def initialize(klass, key_col_proc: nil, col_proc: nil, unique_id_proc: nil, replace_associations: true)
@klass = klass
@replace_associations = replace_associations

Expand All @@ -28,6 +29,8 @@ def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: tr

@columns = {}
@col_proc = col_proc

@unique_id_proc = unique_id_proc
end

# @param klass [Class < ActiveRecord::Base]
Expand All @@ -40,6 +43,11 @@ def key_cols(klass)
@key_cols[klass] ||= @key_col_proc&.call(klass) || self.default_keys(klass)
end

# @return [String]
def generate_unique_id
@unique_id_proc&.call || SecureRandom.uuid
end

# @param record_list [BatchRecordList]
def save_records_to_database(record_list)
columns = self.columns(record_list.klass)
Expand Down Expand Up @@ -69,7 +77,7 @@ def save_records_to_database(record_list)
def import_associations(record_list)
record_list.fill_primary_keys!

import_id = @replace_associations ? ULID.generate : nil
import_id = @replace_associations ? generate_unique_id : nil
record_list.associations.each do |assoc|
sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten
next unless sub_records.any?
Expand All @@ -82,9 +90,11 @@ def import_associations(record_list)
end

# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>]
def mass_update(record_list)
save_records_to_database(record_list)
import_associations(record_list) if record_list.associations.any?
record_list.records
end

end
Expand Down

0 comments on commit 30e3f12

Please sign in to comment.