CCOL-2440: Add consumer configuration to save associated keys prior to saving primary record (#217)

* CCOL-2440: Add support for joined record batch consumption

* Update specs & use duplicate key update instead of ignore

* CR fixes

* Break out save_associations_first method
lionelpereira authored May 9, 2024
1 parent c329138 commit 98e478f
Showing 8 changed files with 223 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## UNRELEASED
 - Feature: Enable `producers.persistent_connections` phobos setting
+- Feature: Add consumer configuration `save_associations_first` to save the associated records of the primary class before upserting the primary records. The foreign keys of the associated records are assigned to the primary record before it is saved.

 # 1.24.2 - 2024-05-01
 - Fix: Deprecation notice with Rails 7.
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
@@ -102,6 +102,7 @@ heartbeat_interval|10|Interval between heartbeats; must be less than the session
 backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error.
 replace_associations|nil| Whether to delete existing associations for records during bulk consumption for this consumer. If no value is specified the provided/default value from the `consumers` configuration will be used.
 bulk_import_id_generator|nil| Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used.
+save_associations_first|false|Whether to save the associated records of the primary class before upserting the primary records. The foreign keys of the associated records are assigned to the primary record before it is saved.

 ## Defining Database Pollers

6 changes: 4 additions & 2 deletions lib/deimos/active_record_consume/batch_consumption.rb
@@ -83,7 +83,7 @@ def record_key(key)
       def deleted_query(records)
         keys = records.
           map { |m| record_key(m.key)[@klass.primary_key] }.
-          reject(&:nil?)
+          compact

         @klass.unscoped.where(@klass.primary_key => keys)
       end
@@ -168,7 +168,9 @@ def upsert_records(messages)
                                   key_col_proc: key_col_proc,
                                   col_proc: col_proc,
                                   replace_associations: self.class.replace_associations,
-                                  bulk_import_id_generator: self.class.bulk_import_id_generator)
+                                  bulk_import_id_generator: self.class.bulk_import_id_generator,
+                                  save_associations_first: self.class.save_associations_first,
+                                  bulk_import_id_column: self.class.bulk_import_id_column)
         ActiveSupport::Notifications.instrument('batch_consumption.valid_records', {
           records: updater.mass_update(record_list),
           consumer: self.class
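
Note on the first hunk above: it swaps `reject(&:nil?)` for `compact`. The following is a minimal sketch of why the two should be interchangeable in `deleted_query`; the sample key values are made up for illustration. Both calls drop only `nil` and keep other falsy or zero values.

```ruby
# Hypothetical key values; in the consumer these come from message keys.
keys = [1, nil, 'abc', false, nil, 0]

rejected  = keys.reject(&:nil?)
compacted = keys.compact

# Both forms remove nil only; false and 0 survive in each result.
puts rejected == compacted
puts compacted.inspect
```

Since the results match, the change reads as a style cleanup rather than a behavior change.
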
63 changes: 59 additions & 4 deletions lib/deimos/active_record_consume/mass_updater.rb
@@ -20,10 +20,13 @@ def default_cols(klass)
       # @param col_proc [Proc<Class < ActiveRecord::Base>]
       # @param replace_associations [Boolean]
       def initialize(klass, key_col_proc: nil, col_proc: nil,
-                     replace_associations: true, bulk_import_id_generator: nil)
+                     replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false,
+                     bulk_import_id_column: nil)
         @klass = klass
         @replace_associations = replace_associations
         @bulk_import_id_generator = bulk_import_id_generator
+        @save_associations_first = save_associations_first
+        @bulk_import_id_column = bulk_import_id_column&.to_s

         @key_cols = {}
         @key_col_proc = key_col_proc
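
Note on `bulk_import_id_column&.to_s` in the hunk above: a small sketch of what the safe-navigation call appears to buy. `normalize_column` is a hypothetical helper written for this note, and the `column_names` array stands in for `assoc.klass.column_names`, which ActiveRecord returns as strings.

```ruby
# Hypothetical helper mirroring `bulk_import_id_column&.to_s`.
def normalize_column(column)
  column&.to_s
end

# Stand-in for an ActiveRecord model's column_names (an array of strings).
column_names = %w(id title bulk_import_id)

normalized_symbol = normalize_column(:bulk_import_id)
normalized_nil = normalize_column(nil)

# A symbol becomes a string, so it can match an entry in column_names.
puts column_names.include?(normalized_symbol)
# nil stays nil instead of turning into an empty string.
puts normalized_nil.inspect
# A nil column never matches, so no bulk import column would be selected.
puts column_names.include?(normalized_nil)
```

Plain `to_s` would turn an unset column into `""`, which is why the `&.` form is the more cautious choice when the column is optional.
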
@@ -84,15 +87,67 @@ def import_associations(record_list)
         end
       end

+      # Assign associated records to corresponding primary records
+      # @param record_list [BatchRecordList] RecordList of primary records for this consumer
+      # @return [Hash]
+      def assign_associations(record_list)
+        associations_info = {}
+        record_list.associations.each do |assoc|
+          col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column)
+          associations_info[[assoc, col]] = []
+        end
+        record_list.batch_records.each do |primary_batch_record|
+          associations_info.each_key do |assoc, col|
+            batch_record = BatchRecord.new(klass: assoc.klass,
+                                           attributes: primary_batch_record.associations[assoc.name],
+                                           bulk_import_column: col,
+                                           bulk_import_id_generator: @bulk_import_id_generator)
+            # Associate this associated batch record's record with the primary record to
+            # retrieve foreign_key after associated records have been saved and primary
+            # keys have been filled
+            primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record })
+            associations_info[[assoc, col]] << batch_record
+          end
+        end
+        associations_info
+      end
+
+      # Save associated records and fill foreign keys on RecordList records
+      # @param record_list [BatchRecordList] RecordList of primary records for this consumer
+      # @param associations_info [Hash] Contains association info
+      def save_associations_first(record_list, associations_info)
+        associations_info.each_value do |records|
+          assoc_record_list = BatchRecordList.new(records)
+          Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
+            save_records_to_database(assoc_record_list)
+          end
+          import_associations(assoc_record_list)
+        end
+        record_list.records.each do |record|
+          associations_info.each_key do |assoc, _|
+            record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id })
+          end
+        end
+      end
+
       # @param record_list [BatchRecordList]
       # @return [Array<ActiveRecord::Base>]
       def mass_update(record_list)
         # The entire batch should be treated as one transaction so that if
         # any message fails, the whole thing is rolled back or retried
         # if there is deadlock
-        Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
-          save_records_to_database(record_list)
-          import_associations(record_list) if record_list.associations.any?
+
+        if @save_associations_first
+          associations_info = assign_associations(record_list)
+          save_associations_first(record_list, associations_info)
+          Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
+            save_records_to_database(record_list)
+          end
+        else
+          Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do
+            save_records_to_database(record_list)
+            import_associations(record_list) if record_list.associations.any?
+          end
         end
         record_list.records
       end
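
Note on the `save_associations_first` path above: the ordering can be sketched without a database. This is a simplified illustration only, not the Deimos implementation; `FakeTable`, `FidgetRow` and `WidgetFidgetRow` are hypothetical stand-ins for tables and ActiveRecord models, and the sample values are borrowed from the spec.

```ruby
# In-memory stand-in for a table with an auto-increment primary key.
class FakeTable
  attr_reader :rows

  def initialize
    @rows = []
    @next_id = 1
  end

  # Assigns an id on first save, roughly like an INSERT would.
  def save(record)
    if record.id.nil?
      record.id = @next_id
      @next_id += 1
    end
    @rows << record
    record
  end
end

# Hypothetical row types: the primary row holds both the associated
# object and the foreign key column that points at it.
FidgetRow = Struct.new(:id, :test_id)
WidgetFidgetRow = Struct.new(:id, :fidget, :fidget_id, :note)

fidgets = FakeTable.new
widget_fidgets = FakeTable.new

primaries = [
  WidgetFidgetRow.new(nil, FidgetRow.new(nil, 'id1'), nil, 'Stuff 1'),
  WidgetFidgetRow.new(nil, FidgetRow.new(nil, 'id2'), nil, 'Stuff 2')
]

# Step 1: save the associated records first so they receive primary keys.
primaries.each { |wf| fidgets.save(wf.fidget) }

# Step 2: copy each associated record's id into the primary record's foreign key.
primaries.each { |wf| wf.fidget_id = wf.fidget.id }

# Step 3: save the primary records, which now carry valid foreign keys.
primaries.each { |wf| widget_fidgets.save(wf) }

puts primaries.map(&:fidget_id).inspect
```

The default path saves the primary records first and imports associations afterwards, which only works when the foreign key lives on the associated table rather than on the primary one.
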
5 changes: 5 additions & 0 deletions lib/deimos/active_record_consumer.rb
@@ -45,6 +45,11 @@ def replace_associations
         config[:replace_associations]
       end

+      # @return [Boolean]
+      def save_associations_first
+        config[:save_associations_first]
+      end
+
       # @param val [Boolean] Turn pre-compaction of the batch on or off. If true,
       # only the last message for each unique key in a batch is processed.
       # @return [void]
8 changes: 7 additions & 1 deletion lib/deimos/config/configuration.rb
@@ -97,7 +97,8 @@ def self.configure_producer_or_consumer(kafka_config)
                                  kafka_config.replace_associations
                                end,
           bulk_import_id_generator: kafka_config.bulk_import_id_generator ||
-            Deimos.config.consumers.bulk_import_id_generator
+            Deimos.config.consumers.bulk_import_id_generator,
+          save_associations_first: kafka_config.save_associations_first
         )
       end
     end
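
Note on the fallback styles in the hunk above: `bulk_import_id_generator` falls back to the global value with `||`, while `save_associations_first` is passed straight through from the consumer config. The sketch below shows why `||` is risky for boolean settings. Both helpers are hypothetical and not part of Deimos, and the assumption that `replace_associations` uses a nil check is inferred from the visible tail of its conditional.

```ruby
# Hypothetical helpers; not part of Deimos.

# `||` fallback: fine for values that are never legitimately false,
# such as a proc.
def fallback_with_or(consumer_value, global_value)
  consumer_value || global_value
end

# nil-check fallback: keeps an explicit `false` set on the consumer.
def fallback_with_nil_check(consumer_value, global_value)
  consumer_value.nil? ? global_value : consumer_value
end

# An explicit false on the consumer is overridden by the global value.
puts fallback_with_or(false, true)
# The nil check keeps the explicit false.
puts fallback_with_nil_check(false, true)
# An unset (nil) consumer value falls back to the global value.
puts fallback_with_nil_check(nil, true)
```

Because `save_associations_first` defaults to `false` in the setting added by this commit, passing it through directly avoids the question for now; a global default would need the nil-check style.
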
@@ -476,6 +477,11 @@ def self.configure_producer_or_consumer(kafka_config)
       # @return [Block]
       setting :bulk_import_id_generator, nil

+      # If enabled save associated records prior to saving the main record class
+      # This will also set foreign keys for associated records
+      # @return [Boolean]
+      setting :save_associations_first, false
+
       # These are the phobos "listener" configs. See CONFIGURATION.md for more
       # info.
       setting :group_id
137 changes: 137 additions & 0 deletions spec/active_record_consume/mass_updater_spec.rb
@@ -114,5 +114,142 @@

     end

+    context 'with save_associations_first' do
+      before(:all) do
+        ActiveRecord::Base.connection.create_table(:fidgets, force: true) do |t|
+          t.string(:test_id)
+          t.integer(:some_int)
+          t.string(:bulk_import_id)
+          t.timestamps
+        end
+
+        ActiveRecord::Base.connection.create_table(:fidget_details, force: true) do |t|
+          t.string(:title)
+          t.string(:bulk_import_id)
+          t.belongs_to(:fidget)
+
+          t.index(%i(title), unique: true)
+        end
+
+        ActiveRecord::Base.connection.create_table(:widget_fidgets, force: true, id: false) do |t|
+          t.belongs_to(:fidget)
+          t.belongs_to(:widget)
+          t.string(:bulk_import_id)
+          t.string(:note)
+          t.index(%i(widget_id fidget_id), unique: true)
+        end
+      end
+
+      after(:all) do
+        ActiveRecord::Base.connection.drop_table(:fidgets)
+        ActiveRecord::Base.connection.drop_table(:fidget_details)
+        ActiveRecord::Base.connection.drop_table(:widget_fidgets)
+      end
+
+      let(:fidget_detail_class) do
+        Class.new(ActiveRecord::Base) do
+          self.table_name = 'fidget_details'
+          belongs_to :fidget
+        end
+      end
+
+      let(:fidget_class) do
+        Class.new(ActiveRecord::Base) do
+          self.table_name = 'fidgets'
+          has_one :fidget_detail
+        end
+      end
+
+      let(:widget_fidget_class) do
+        Class.new(ActiveRecord::Base) do
+          self.table_name = 'widget_fidgets'
+          belongs_to :fidget
+          belongs_to :widget
+        end
+      end
+
+      let(:bulk_id_generator) { proc { SecureRandom.uuid } }
+
+      let(:key_proc) do
+        lambda do |klass|
+          case klass.to_s
+          when 'Widget', 'Fidget'
+            %w(id)
+          when 'WidgetFidget'
+            %w(widget_id fidget_id)
+          when 'FidgetDetail', 'Detail'
+            %w(title)
+          else
+            raise "Key Columns for #{klass} not defined"
+          end
+
+        end
+      end
+
+      before(:each) do
+        stub_const('Fidget', fidget_class)
+        stub_const('FidgetDetail', fidget_detail_class)
+        stub_const('WidgetFidget', widget_fidget_class)
+        Widget.reset_column_information
+        Fidget.reset_column_information
+        WidgetFidget.reset_column_information
+      end
+
+      # rubocop:disable RSpec/MultipleExpectations, RSpec/ExampleLength
+      it 'should backfill the associations when upserting primary records' do
+        batch = Deimos::ActiveRecordConsume::BatchRecordList.new(
+          [
+            Deimos::ActiveRecordConsume::BatchRecord.new(
+              klass: WidgetFidget,
+              attributes: {
+                widget: { test_id: 'id1', some_int: 10, detail: { title: 'Widget Title 1' } },
+                fidget: { test_id: 'id1', some_int: 10, fidget_detail: { title: 'Fidget Title 1' } },
+                note: 'Stuff 1'
+              },
+              bulk_import_column: 'bulk_import_id',
+              bulk_import_id_generator: bulk_id_generator
+            ),
+            Deimos::ActiveRecordConsume::BatchRecord.new(
+              klass: WidgetFidget,
+              attributes: {
+                widget: { test_id: 'id2', some_int: 20, detail: { title: 'Widget Title 2' } },
+                fidget: { test_id: 'id2', some_int: 20, fidget_detail: { title: 'Fidget Title 2' } },
+                note: 'Stuff 2'
+              },
+              bulk_import_column: 'bulk_import_id',
+              bulk_import_id_generator: bulk_id_generator
+            )
+          ]
+        )
+
+        results = described_class.new(WidgetFidget,
+                                      bulk_import_id_generator: bulk_id_generator,
+                                      bulk_import_id_column: 'bulk_import_id',
+                                      key_col_proc: key_proc,
+                                      save_associations_first: true).mass_update(batch)
+        expect(results.count).to eq(2)
+        expect(Widget.count).to eq(2)
+        expect(Detail.count).to eq(2)
+        expect(Fidget.count).to eq(2)
+        expect(FidgetDetail.count).to eq(2)
+
+        WidgetFidget.all.each_with_index do |widget_fidget, ind|
+          widget = Widget.find_by(id: widget_fidget.widget_id)
+          expect(widget.test_id).to eq("id#{ind + 1}")
+          expect(widget.some_int).to eq((ind + 1) * 10)
+          detail = Detail.find_by(widget_id: widget_fidget.widget_id)
+          expect(detail.title).to eq("Widget Title #{ind + 1}")
+          fidget = Fidget.find_by(id: widget_fidget.fidget_id)
+          expect(fidget.test_id).to eq("id#{ind + 1}")
+          expect(fidget.some_int).to eq((ind + 1) * 10)
+          fidget_detail = FidgetDetail.find_by(fidget_id: widget_fidget.fidget_id)
+          expect(fidget_detail.title).to eq("Fidget Title #{ind + 1}")
+          expect(widget_fidget.note).to eq("Stuff #{ind + 1}")
+        end
+      end
+      # rubocop:enable RSpec/MultipleExpectations, RSpec/ExampleLength
+
+    end
+
   end
 end
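
Note on the spec's `key_proc`: it maps each model class to the columns that identify a record for upsert, for example the unique `widget_id, fidget_id` pair on the join table. The sketch below exercises the same lambda with plain Ruby classes standing in for the ActiveRecord models; `Unmapped` is a hypothetical class added only to show the error branch.

```ruby
# Plain classes standing in for the ActiveRecord models used in the spec.
# Assigning an anonymous class to a constant gives it that constant's name.
Widget = Class.new
Fidget = Class.new
WidgetFidget = Class.new
FidgetDetail = Class.new
Unmapped = Class.new # hypothetical class with no key columns defined

key_proc = lambda do |klass|
  case klass.to_s
  when 'Widget', 'Fidget'
    %w(id)
  when 'WidgetFidget'
    %w(widget_id fidget_id)
  when 'FidgetDetail', 'Detail'
    %w(title)
  else
    raise "Key Columns for #{klass} not defined"
  end
end

join_keys = key_proc.call(WidgetFidget)
detail_keys = key_proc.call(FidgetDetail)

# Capture the error raised for a class the lambda does not know about.
error_message = nil
begin
  key_proc.call(Unmapped)
rescue RuntimeError => e
  error_message = e.message
end

puts join_keys.inspect
puts detail_keys.inspect
puts error_message
```

Raising in the `else` branch makes a missing mapping fail loudly in the spec instead of silently falling back to some default key.
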
12 changes: 9 additions & 3 deletions spec/config/configuration_spec.rb
@@ -92,7 +92,8 @@ def consume
             handler: 'ConsumerTest::MyConsumer',
             use_schema_classes: nil,
             max_db_batch_size: nil,
-            bulk_import_id_generator: nil
+            bulk_import_id_generator: nil,
+            save_associations_first: false
           }, {
             topic: 'my_batch_consume_topic',
             group_id: 'my_batch_group_id',
@@ -111,7 +112,8 @@ def consume
             handler: 'ConsumerTest::MyBatchConsumer',
             use_schema_classes: nil,
             max_db_batch_size: nil,
-            bulk_import_id_generator: nil
+            bulk_import_id_generator: nil,
+            save_associations_first: false
           }
         ],
         producer: {
@@ -265,7 +267,8 @@ def consume
             handler: 'MyConfigConsumer',
             use_schema_classes: false,
             max_db_batch_size: nil,
-            bulk_import_id_generator: nil
+            bulk_import_id_generator: nil,
+            save_associations_first: false
           }
         ],
         producer: {
@@ -297,6 +300,7 @@ def consume
         group_id 'myconsumerid'
         bulk_import_id_generator(-> { 'consumer' })
         replace_associations false
+        save_associations_first true
       end

       consumer do
@@ -314,10 +318,12 @@ def consume
     custom = MyConfigConsumer.config
     expect(custom[:replace_associations]).to eq(false)
     expect(custom[:bulk_import_id_generator].call).to eq('consumer')
+    expect(custom[:save_associations_first]).to eq(true)

     default = MyConfigConsumer2.config
     expect(default[:replace_associations]).to eq(true)
     expect(default[:bulk_import_id_generator].call).to eq('global')
+    expect(default[:save_associations_first]).to eq(false)

   end
 end
