Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2039: Support for post processing and custom batch generation #203

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ def consume_batch(payloads, metadata)
# The entire batch should be treated as one transaction so that if
# any message fails, the whole thing is rolled back or retried
# if there is deadlock
valid_upserts = []
invalid_upserts = []
Deimos::Utils::DeadlockRetry.wrap(tags) do
if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys]
update_database(compact_messages(messages))
else
uncompacted_update(messages)
end
end
post_process(valid_upserts, invalid_upserts)
end
end

Expand Down Expand Up @@ -132,21 +135,30 @@ def update_database(messages)
removed, upserted = messages.partition(&:tombstone?)

max_db_batch_size = self.class.config[:max_db_batch_size]
valid_upserts = []
invalid_upserts = []
if upserted.any?
valid_upserts, invalid_upserts = if max_db_batch_size
upserted.each_slice(max_db_batch_size) do |group|
valid, invalid = upsert_records(group)
valid_upserts.push(valid)
invalid_upserts.push(invalid)
end
valid_upserts.compact!
invalid_upserts.compact!
else
upsert_records(upserted)
end
end

unless removed.empty?
if max_db_batch_size
upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) }
removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
else
upsert_records(upserted)
remove_records(removed)
end
end

return if removed.empty?

if max_db_batch_size
removed.each_slice(max_db_batch_size) { |group| remove_records(group) }
else
remove_records(removed)
end
[valid_upserts, invalid_upserts]
end

# Upsert any non-deleted records
Expand All @@ -155,7 +167,7 @@ def update_database(messages)
# @return [void]
def upsert_records(messages)
record_list = build_records(messages)
record_list.filter!(self.method(:should_consume?).to_proc)
invalid = filter_records(record_list)

return if record_list.empty?

Expand All @@ -165,19 +177,38 @@ def upsert_records(messages)
updater = MassUpdater.new(@klass,
key_col_proc: key_col_proc,
col_proc: col_proc,
replace_associations: self.class.config[:replace_associations])
updater.mass_update(record_list)
replace_associations: self.class.replace_associations,
batch_id_generator: self.class.bulk_import_id_generator)
[updater.mass_update(record_list), invalid]
end

# Filter out records that should not be consumed, mutating record_list
# in place, and return the records that were dropped.
# @param record_list [BatchRecordList]
# @return [Array<BatchRecord>] the invalid records that were filtered
#   out (empty when every record was kept).
def filter_records(record_list)
  # BatchRecordList#reject! delegates to Array#reject!, which returns
  # the *remaining* records -- or nil when nothing was removed -- not
  # the rejected ones. Snapshot the list first so we can hand back the
  # records that were actually dropped, which is what callers treat the
  # return value as (the "invalid" records).
  original = record_list.batch_records.dup
  record_list.reject!(self.method(:should_consume?).to_proc)
  original - record_list.batch_records
end

# Hook for subclasses: build a lookup entity (e.g. a Hash, Set, or
# ActiveRecord::Relation of pre-fetched rows) from the incoming batch.
# The result is passed as the third argument to record_attributes when
# that method is defined with three parameters (see build_records).
# Defaults to an empty Hash.
# @param _messages [Array<Deimos::Message>] the batch being consumed
# @return [Hash, Set, ActiveRecord::Relation]
def record_lookup(_messages)
  {}
end

# @param messages [Array<Deimos::Message>]
# @return [BatchRecordList]
def build_records(messages)
lookup = record_lookup(messages)
records = messages.map do |m|
attrs = if self.method(:record_attributes).parameters.size == 2
attrs = case self.method(:record_attributes).parameters.size
when 3
record_attributes(m.payload, m.key, lookup)
when 2
record_attributes(m.payload, m.key)
else
record_attributes(m.payload)
end

next nil if attrs.nil?

attrs = attrs.merge(record_key(m.key))
Expand All @@ -189,7 +220,8 @@ def build_records(messages)

BatchRecord.new(klass: @klass,
attributes: attrs,
bulk_import_column: col)
bulk_import_column: col,
bulk_id_generator: self.class.bulk_import_id_generator)
end
BatchRecordList.new(records.compact)
end
Expand All @@ -203,6 +235,14 @@ def remove_records(messages)

clause.delete_all
end

# Hook invoked by consume_batch after the batch transaction completes,
# with the records that were upserted and those that were filtered out.
# The default implementation is a no-op; override it to add custom
# post-processing (e.g. publishing results or reporting invalid rows).
# @param _valid_records [Array<ActiveRecord::Base>] records that were
#   successfully upserted
# @param _invalid_records [Array<BatchRecord>] records filtered out
#   before the upsert
# @return [void]
def post_process(_valid_records, _invalid_records)
  nil
end
end
end
end
7 changes: 4 additions & 3 deletions lib/deimos/active_record_consume/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ class BatchRecord
# @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`.
attr_accessor :bulk_import_column

delegate :valid?, to: :record
delegate :valid?, :errors, :send, :attributes, to: :record

# @param klass [Class < ActiveRecord::Base]
# @param attributes [Hash] the full attribute list, including associations.
# @param bulk_import_column [String]
def initialize(klass:, attributes:, bulk_import_column: nil)
# @param bulk_id_generator [Proc]
def initialize(klass:, attributes:, bulk_import_column: nil, bulk_id_generator: nil)
@klass = klass
if bulk_import_column
self.bulk_import_column = bulk_import_column
self.bulk_import_id = SecureRandom.uuid
self.bulk_import_id = bulk_id_generator&.call
attributes[bulk_import_column] = bulk_import_id
end
attributes = attributes.with_indifferent_access
Expand Down
17 changes: 17 additions & 0 deletions lib/deimos/active_record_consume/batch_record_list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ def filter!(method)
self.batch_records.delete_if { |record| !method.call(record.record) }
end

# Remove (in place) any batch record that fails the given filter.
# When a Method/Proc is supplied, a record is KEPT when the filter
# returns truthy for it (note the negation below); with a bare block,
# a record is REMOVED when the block returns truthy, mirroring
# Array#reject!.
# @param method [Proc, Method, nil] filter receiving the underlying
#   record (and its associations when it takes two parameters)
# @param block [Proc] used only when method is nil
# @return [Array, nil] the remaining records, or nil when nothing was
#   removed (Array#reject! semantics). NOTE(review): callers wanting
#   the rejected records must capture them separately -- confirm this
#   matches what call sites expect.
def reject!(method = nil, &block)
  return self.batch_records.reject!(&block) if method.nil?

  # Build the discard predicate once, based on the filter's arity.
  discard =
    if method.parameters.size == 2
      ->(record) { !method.call(record.record, record.associations) }
    else
      ->(record) { !method.call(record.record) }
    end
  self.batch_records.reject!(&discard)
end

# Get the original ActiveRecord objects.
# @return [Array<ActiveRecord::Base>]
def records
Expand Down
6 changes: 4 additions & 2 deletions lib/deimos/active_record_consume/mass_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ def default_cols(klass)
# @param key_col_proc [Proc<Class < ActiveRecord::Base>]
# @param col_proc [Proc<Class < ActiveRecord::Base>]
# @param replace_associations [Boolean]
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true)
def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, batch_id_generator: nil)
@klass = klass
@replace_associations = replace_associations
@batch_id_generator = batch_id_generator || proc { SecureRandom.uuid }

@key_cols = {}
@key_col_proc = key_col_proc
Expand Down Expand Up @@ -69,7 +70,7 @@ def save_records_to_database(record_list)
def import_associations(record_list)
record_list.fill_primary_keys!

import_id = @replace_associations ? SecureRandom.uuid : nil
import_id = @replace_associations ? @batch_id_generator.call : nil
record_list.associations.each do |assoc|
sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten
next unless sub_records.any?
Expand All @@ -85,6 +86,7 @@ def import_associations(record_list)
# Save the batch to the database, import any associated sub-records,
# and return the upserted records.
# @param record_list [BatchRecordList]
# @return [Array<ActiveRecord::Base>] the original ActiveRecord objects
#   from the list (via BatchRecordList#records)
def mass_update(record_list)
  save_records_to_database(record_list)
  # Associations are imported separately after the primary rows exist.
  import_associations(record_list) if record_list.associations.any?
  record_list.records
end

end
Expand Down
13 changes: 12 additions & 1 deletion lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ def bulk_import_id_column
config[:bulk_import_id_column]
end

# The proc used to generate bulk-import IDs for this consumer. Reads
# the merged consumer config; per configure_producer_or_consumer this
# falls back to the global consumers.bulk_import_id_generator setting
# when no per-consumer generator is configured.
# @return [Proc]
def bulk_import_id_generator
  config[:bulk_import_id_generator]
end

# Whether multi-table consumers replace associations rather than
# appending to them, per this consumer's merged config (defaults to the
# global consumers.replace_associations setting when unset).
# @return [Boolean]
def replace_associations
  config[:replace_associations]
end

# @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
# only the last message for each unique key in a batch is processed.
# @return [void]
Expand Down Expand Up @@ -65,8 +75,9 @@ def initialize
# attributes set to the new/existing record.
# @param payload [Hash,Deimos::SchemaClass::Record]
# @param _key [String]
# @param _lookup [Set | ActiveRecord::Relation | Hash]
# @return [Hash]
def record_attributes(payload, _key=nil)
def record_attributes(payload, _key=nil, _lookup=nil)
@converter.convert(payload)
end

Expand Down
23 changes: 21 additions & 2 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def self.configure_producer_or_consumer(kafka_config)
if kafka_config.respond_to?(:bulk_import_id_column) # consumer
klass.config.merge!(
bulk_import_id_column: kafka_config.bulk_import_id_column,
replace_associations: kafka_config.replace_associations
replace_associations: kafka_config.replace_associations.nil? ?
Deimos.config.consumers.replace_associations :
kafka_config.replace_associations,
bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
Deimos.config.consumers.bulk_import_id_generator
)
end
end
Expand Down Expand Up @@ -242,6 +246,15 @@ def self.configure_producer_or_consumer(kafka_config)
# Not needed if reraise_errors is set to true.
# @return [Block]
setting(:fatal_error, proc { false })

# The default function to generate a bulk ID for bulk consumers
# @return [Block]
setting :bulk_import_id_generator, proc { SecureRandom.uuid }

# If true, multi-table consumers will blow away associations rather than appending to them.
# Applies to all consumers unless specified otherwise
# @return [Boolean]
setting :replace_associations, true
end

setting :producers do
Expand Down Expand Up @@ -445,7 +458,13 @@ def self.configure_producer_or_consumer(kafka_config)
setting :bulk_import_id_column, :bulk_import_id
# If true, multi-table consumers will blow away associations rather than appending to them.
# @return [Boolean]
setting :replace_associations, true
setting :replace_associations, nil

# The default function to generate a bulk ID for this consumer
# Uses the consumers proc defined in the consumers config by default unless
# specified for individual consumers
# @return [void]
setting :bulk_import_id_generator, nil

# These are the phobos "listener" configs. See CONFIGURATION.md for more
# info.
Expand Down
4 changes: 4 additions & 0 deletions sig/defs.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,10 @@ module Deimos
#
# _@return_ — Compacted batch.
def compact_messages: (::Array[Message] batch) -> ::Array[Message]

private

def post_process: -> untyped
end

# Methods for consuming individual messages and saving them to the database
Expand Down
11 changes: 8 additions & 3 deletions spec/active_record_consume/mass_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
end
end

let(:bulk_id_generator) { proc { SecureRandom.uuid } }

before(:each) do
stub_const('Widget', widget_class)
stub_const('Detail', detail_class)
Expand All @@ -52,19 +54,22 @@
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_id_generator: bulk_id_generator
),
Deimos::ActiveRecordConsume::BatchRecord.new(
klass: Widget,
attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } },
bulk_import_column: 'bulk_import_id'
bulk_import_column: 'bulk_import_id',
bulk_id_generator: bulk_id_generator
)
]
)
end

it 'should mass update the batch' do
described_class.new(Widget).mass_update(batch)
results = described_class.new(Widget).mass_update(batch)
expect(results.count).to eq(2)
expect(Widget.count).to eq(2)
expect(Detail.count).to eq(2)
expect(Widget.first.detail).not_to be_nil
Expand Down
9 changes: 6 additions & 3 deletions spec/config/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}, {
topic: 'my_batch_consume_topic',
group_id: 'my_batch_group_id',
Expand All @@ -109,7 +110,8 @@ def consume
heartbeat_interval: 10,
handler: 'ConsumerTest::MyBatchConsumer',
use_schema_classes: nil,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand Down Expand Up @@ -261,7 +263,8 @@ def consume
heartbeat_interval: 13,
handler: 'MyConfigConsumer',
use_schema_classes: false,
max_db_batch_size: nil
max_db_batch_size: nil,
bulk_import_id_generator: nil
}
],
producer: {
Expand Down
Loading