From 98e478ffb32c196da669235907e63e2fbeb9179d Mon Sep 17 00:00:00 2001 From: Lionel Pereira <81594013+lionelpereira@users.noreply.github.com> Date: Thu, 9 May 2024 10:10:54 -0400 Subject: [PATCH] CCOL-2440: Add consumer configuration to save associated keys prior to saving primary record (#217) * CCOL-2440: Add support for joined record batch consumption * Update specs & use duplicate key update instead of ignore * CR fixes * Break out save_associations_first method --- CHANGELOG.md | 1 + docs/CONFIGURATION.md | 1 + .../batch_consumption.rb | 6 +- .../active_record_consume/mass_updater.rb | 63 +++++++- lib/deimos/active_record_consumer.rb | 5 + lib/deimos/config/configuration.rb | 8 +- .../mass_updater_spec.rb | 137 ++++++++++++++++++ spec/config/configuration_spec.rb | 12 +- 8 files changed, 223 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7acec1f7..c65fc34f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## UNRELEASED - Feature: Enable `producers.persistent_connections` phobos setting +- Feature: Add consumer configuration `save_associations_first` to save the associated records of the primary class before upserting the primary records. The IDs of the saved associated records are assigned to the corresponding foreign keys on the primary records before those are saved. # 1.24.2 - 2024-05-01 - Fix: Deprecation notice with Rails 7. diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 115c5463..fb7f6e63 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -102,6 +102,7 @@ heartbeat_interval|10|Interval between heartbeats; must be less than the session backoff|`(1000..60_000)`|Range representing the minimum and maximum number of milliseconds to back off after a consumer error. replace_associations|nil| Whether to delete existing associations for records during bulk consumption for this consumer. 
If no value is specified the provided/default value from the `consumers` configuration will be used. bulk_import_id_generator|nil| Block to determine the `bulk_import_id` generated during bulk consumption. If no block is specified the provided/default block from the `consumers` configuration will be used. +save_associations_first|false|Whether to save the associated records of the primary class before upserting the primary records. The IDs of the saved associated records are assigned to the corresponding foreign keys on the primary records before those are saved. ## Defining Database Pollers diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 007e2a3a..1916a9a1 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -83,7 +83,7 @@ def record_key(key) def deleted_query(records) keys = records. map { |m| record_key(m.key)[@klass.primary_key] }. - reject(&:nil?) + compact @klass.unscoped.where(@klass.primary_key => keys) end @@ -168,7 +168,9 @@ def upsert_records(messages) key_col_proc: key_col_proc, col_proc: col_proc, replace_associations: self.class.replace_associations, - bulk_import_id_generator: self.class.bulk_import_id_generator) + bulk_import_id_generator: self.class.bulk_import_id_generator, + save_associations_first: self.class.save_associations_first, + bulk_import_id_column: self.class.bulk_import_id_column) ActiveSupport::Notifications.instrument('batch_consumption.valid_records', { records: updater.mass_update(record_list), consumer: self.class diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 3ba95b10..cc21bc12 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -20,10 +20,13 @@ def default_cols(klass) # @param col_proc [Proc] # @param replace_associations [Boolean] def initialize(klass, key_col_proc: nil, 
col_proc: nil, - replace_associations: true, bulk_import_id_generator: nil) + replace_associations: true, bulk_import_id_generator: nil, save_associations_first: false, + bulk_import_id_column: nil) @klass = klass @replace_associations = replace_associations @bulk_import_id_generator = bulk_import_id_generator + @save_associations_first = save_associations_first + @bulk_import_id_column = bulk_import_id_column&.to_s @key_cols = {} @key_col_proc = key_col_proc @@ -84,15 +87,67 @@ def import_associations(record_list) end end + # Assign associated records to corresponding primary records + # @param record_list [BatchRecordList] RecordList of primary records for this consumer + # @return [Hash] + def assign_associations(record_list) + associations_info = {} + record_list.associations.each do |assoc| + col = @bulk_import_id_column if assoc.klass.column_names.include?(@bulk_import_id_column) + associations_info[[assoc, col]] = [] + end + record_list.batch_records.each do |primary_batch_record| + associations_info.each_key do |assoc, col| + batch_record = BatchRecord.new(klass: assoc.klass, + attributes: primary_batch_record.associations[assoc.name], + bulk_import_column: col, + bulk_import_id_generator: @bulk_import_id_generator) + # Associate this associated batch record's record with the primary record to + # retrieve foreign_key after associated records have been saved and primary + # keys have been filled + primary_batch_record.record.assign_attributes({ assoc.name => batch_record.record }) + associations_info[[assoc, col]] << batch_record + end + end + associations_info + end + + # Save associated records and fill foreign keys on RecordList records + # @param record_list [BatchRecordList] RecordList of primary records for this consumer + # @param associations_info [Hash] Contains association info + def save_associations_first(record_list, associations_info) + associations_info.each_value do |records| + assoc_record_list = BatchRecordList.new(records) + 
Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + save_records_to_database(assoc_record_list) + end + import_associations(assoc_record_list) + end + record_list.records.each do |record| + associations_info.each_key do |assoc, _| + record.assign_attributes({ assoc.foreign_key => record.send(assoc.name).id }) + end + end + end + # @param record_list [BatchRecordList] # @return [Array] def mass_update(record_list) # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock - Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do - save_records_to_database(record_list) - import_associations(record_list) if record_list.associations.any? + + if @save_associations_first + associations_info = assign_associations(record_list) + save_associations_first(record_list, associations_info) + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + save_records_to_database(record_list) + end + else + Deimos::Utils::DeadlockRetry.wrap(Deimos.config.tracer.active_span.get_tag('topic')) do + save_records_to_database(record_list) + import_associations(record_list) if record_list.associations.any? + end end record_list.records end diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index 8fd7bc61..b45e2d99 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -45,6 +45,11 @@ def replace_associations config[:replace_associations] end + # @return [Boolean] + def save_associations_first + config[:save_associations_first] + end + # @param val [Boolean] Turn pre-compaction of the batch on or off. If true, # only the last message for each unique key in a batch is processed. 
# @return [void] diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 1f1fad0e..9a0abaf2 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -97,7 +97,8 @@ def self.configure_producer_or_consumer(kafka_config) kafka_config.replace_associations end, bulk_import_id_generator: kafka_config.bulk_import_id_generator || - Deimos.config.consumers.bulk_import_id_generator + Deimos.config.consumers.bulk_import_id_generator, + save_associations_first: kafka_config.save_associations_first ) end end @@ -476,6 +477,11 @@ def self.configure_producer_or_consumer(kafka_config) # @return [Block] setting :bulk_import_id_generator, nil + # If enabled save associated records prior to saving the main record class + # This will also set foreign keys for associated records + # @return [Boolean] + setting :save_associations_first, false + # These are the phobos "listener" configs. See CONFIGURATION.md for more # info. setting :group_id diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index 7fcb961c..de04841f 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -114,5 +114,142 @@ end + context 'with save_associations_first' do + before(:all) do + ActiveRecord::Base.connection.create_table(:fidgets, force: true) do |t| + t.string(:test_id) + t.integer(:some_int) + t.string(:bulk_import_id) + t.timestamps + end + + ActiveRecord::Base.connection.create_table(:fidget_details, force: true) do |t| + t.string(:title) + t.string(:bulk_import_id) + t.belongs_to(:fidget) + + t.index(%i(title), unique: true) + end + + ActiveRecord::Base.connection.create_table(:widget_fidgets, force: true, id: false) do |t| + t.belongs_to(:fidget) + t.belongs_to(:widget) + t.string(:bulk_import_id) + t.string(:note) + t.index(%i(widget_id fidget_id), unique: true) + end + end + + after(:all) do + 
ActiveRecord::Base.connection.drop_table(:fidgets) + ActiveRecord::Base.connection.drop_table(:fidget_details) + ActiveRecord::Base.connection.drop_table(:widget_fidgets) + end + + let(:fidget_detail_class) do + Class.new(ActiveRecord::Base) do + self.table_name = 'fidget_details' + belongs_to :fidget + end + end + + let(:fidget_class) do + Class.new(ActiveRecord::Base) do + self.table_name = 'fidgets' + has_one :fidget_detail + end + end + + let(:widget_fidget_class) do + Class.new(ActiveRecord::Base) do + self.table_name = 'widget_fidgets' + belongs_to :fidget + belongs_to :widget + end + end + + let(:bulk_id_generator) { proc { SecureRandom.uuid } } + + let(:key_proc) do + lambda do |klass| + case klass.to_s + when 'Widget', 'Fidget' + %w(id) + when 'WidgetFidget' + %w(widget_id fidget_id) + when 'FidgetDetail', 'Detail' + %w(title) + else + raise "Key Columns for #{klass} not defined" + end + + end + end + + before(:each) do + stub_const('Fidget', fidget_class) + stub_const('FidgetDetail', fidget_detail_class) + stub_const('WidgetFidget', widget_fidget_class) + Widget.reset_column_information + Fidget.reset_column_information + WidgetFidget.reset_column_information + end + + # rubocop:disable RSpec/MultipleExpectations, RSpec/ExampleLength + it 'should backfill the associations when upserting primary records' do + batch = Deimos::ActiveRecordConsume::BatchRecordList.new( + [ + Deimos::ActiveRecordConsume::BatchRecord.new( + klass: WidgetFidget, + attributes: { + widget: { test_id: 'id1', some_int: 10, detail: { title: 'Widget Title 1' } }, + fidget: { test_id: 'id1', some_int: 10, fidget_detail: { title: 'Fidget Title 1' } }, + note: 'Stuff 1' + }, + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator + ), + Deimos::ActiveRecordConsume::BatchRecord.new( + klass: WidgetFidget, + attributes: { + widget: { test_id: 'id2', some_int: 20, detail: { title: 'Widget Title 2' } }, + fidget: { test_id: 'id2', some_int: 20, fidget_detail: { 
title: 'Fidget Title 2' } }, + note: 'Stuff 2' + }, + bulk_import_column: 'bulk_import_id', + bulk_import_id_generator: bulk_id_generator + ) + ] + ) + + results = described_class.new(WidgetFidget, + bulk_import_id_generator: bulk_id_generator, + bulk_import_id_column: 'bulk_import_id', + key_col_proc: key_proc, + save_associations_first: true).mass_update(batch) + expect(results.count).to eq(2) + expect(Widget.count).to eq(2) + expect(Detail.count).to eq(2) + expect(Fidget.count).to eq(2) + expect(FidgetDetail.count).to eq(2) + + WidgetFidget.all.each_with_index do |widget_fidget, ind| + widget = Widget.find_by(id: widget_fidget.widget_id) + expect(widget.test_id).to eq("id#{ind + 1}") + expect(widget.some_int).to eq((ind + 1) * 10) + detail = Detail.find_by(widget_id: widget_fidget.widget_id) + expect(detail.title).to eq("Widget Title #{ind + 1}") + fidget = Fidget.find_by(id: widget_fidget.fidget_id) + expect(fidget.test_id).to eq("id#{ind + 1}") + expect(fidget.some_int).to eq((ind + 1) * 10) + fidget_detail = FidgetDetail.find_by(fidget_id: widget_fidget.fidget_id) + expect(fidget_detail.title).to eq("Fidget Title #{ind + 1}") + expect(widget_fidget.note).to eq("Stuff #{ind + 1}") + end + end + # rubocop:enable RSpec/MultipleExpectations, RSpec/ExampleLength + + end + end end diff --git a/spec/config/configuration_spec.rb b/spec/config/configuration_spec.rb index 981961ef..0eab6522 100644 --- a/spec/config/configuration_spec.rb +++ b/spec/config/configuration_spec.rb @@ -92,7 +92,8 @@ def consume handler: 'ConsumerTest::MyConsumer', use_schema_classes: nil, max_db_batch_size: nil, - bulk_import_id_generator: nil + bulk_import_id_generator: nil, + save_associations_first: false }, { topic: 'my_batch_consume_topic', group_id: 'my_batch_group_id', @@ -111,7 +112,8 @@ def consume handler: 'ConsumerTest::MyBatchConsumer', use_schema_classes: nil, max_db_batch_size: nil, - bulk_import_id_generator: nil + bulk_import_id_generator: nil, + save_associations_first: 
false } ], producer: { @@ -265,7 +267,8 @@ def consume handler: 'MyConfigConsumer', use_schema_classes: false, max_db_batch_size: nil, - bulk_import_id_generator: nil + bulk_import_id_generator: nil, + save_associations_first: false } ], producer: { @@ -297,6 +300,7 @@ def consume group_id 'myconsumerid' bulk_import_id_generator(-> { 'consumer' }) replace_associations false + save_associations_first true end consumer do @@ -314,10 +318,12 @@ def consume custom = MyConfigConsumer.config expect(custom[:replace_associations]).to eq(false) expect(custom[:bulk_import_id_generator].call).to eq('consumer') + expect(custom[:save_associations_first]).to eq(true) default = MyConfigConsumer2.config expect(default[:replace_associations]).to eq(true) expect(default[:bulk_import_id_generator].call).to eq('global') + expect(default[:save_associations_first]).to eq(false) end end