From 46da388dd2b0b4f9d43472cc61368038f7762516 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 4 Oct 2023 12:04:41 -0400 Subject: [PATCH 1/5] CCOL-2039: Initial features for item platform migration --- .../batch_consumption.rb | 49 +++++++++++++------ .../active_record_consume/mass_updater.rb | 1 + sig/defs.rbs | 4 ++ 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index dee466f8..0fcf9c52 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -34,13 +34,15 @@ def consume_batch(payloads, metadata) # The entire batch should be treated as one transaction so that if # any message fails, the whole thing is rolled back or retried # if there is deadlock + valid_upserts = [] Deimos::Utils::DeadlockRetry.wrap(tags) do - if @compacted || self.class.config[:no_keys] - update_database(compact_messages(messages)) - else - uncompacted_update(messages) - end + valid_upserts = if @compacted || self.class.config[:no_keys] + update_database(compact_messages(messages)) + else + uncompacted_update(messages) + end end + post_process(valid_upserts) end end @@ -132,21 +134,23 @@ def update_database(messages) removed, upserted = messages.partition(&:tombstone?) max_db_batch_size = self.class.config[:max_db_batch_size] + valid_upserts = [] if upserted.any? + valid_upserts = if max_db_batch_size + upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } + else + upsert_records(upserted) + end + end + + unless removed.empty? if max_db_batch_size - upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } + removed.each_slice(max_db_batch_size) { |group| remove_records(group) } else - upsert_records(upserted) + remove_records(removed) end end - - return if removed.empty? - - if max_db_batch_size - removed.each_slice(max_db_batch_size) { |group| remove_records(group) } - else - remove_records(removed) - end + valid_upserts end # Upsert any non-deleted records @@ -155,7 +159,7 @@ def update_database(messages) # @return [void] def upsert_records(messages) record_list = build_records(messages) - record_list.filter!(self.method(:should_consume?).to_proc) + filter_records(record_list) return if record_list.empty? @@ -169,6 +173,12 @@ def upsert_records(messages) updater.mass_update(record_list) end + # @param record_list [BatchRecordList] + # @return [BatchRecordList] + def filter_records(record_list) + record_list.filter!(self.method(:should_consume?).to_proc) + end + # @param messages [Array] # @return [BatchRecordList] def build_records(messages) @@ -203,6 +213,13 @@ def remove_records(messages) clause.delete_all end + + # Additional processing after records have been successfully upserted + # @param _records [Array] Records to be post processed + # @return [void] + def post_process(_records) + nil + end end end end diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index 5694dbb7..ac61ab85 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -85,6 +85,7 @@ def import_associations(record_list) def mass_update(record_list) save_records_to_database(record_list) import_associations(record_list) if record_list.associations.any? + record_list.records end end diff --git a/sig/defs.rbs b/sig/defs.rbs index 443daac5..7d785ec2 100644 --- a/sig/defs.rbs +++ b/sig/defs.rbs @@ -1840,6 +1840,10 @@ module Deimos # # _@return_ — Compacted batch. def compact_messages: (::Array[Message] batch) -> ::Array[Message] + + private + + def post_process: -> untyped end # Methods for consuming individual messages and saving them to the database From 709576e8cdfad3996a15d78cff9b269c20d6f1d5 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Wed, 22 Nov 2023 13:19:37 -0500 Subject: [PATCH 2/5] CCOL-2039: Initial support for item platform monkey patches --- lib/deimos/active_record_consume/batch_consumption.rb | 9 +++++---- lib/deimos/active_record_consume/batch_record.rb | 4 +++- lib/deimos/active_record_consume/batch_record_list.rb | 6 ++++++ lib/deimos/active_record_consumer.rb | 3 ++- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 0fcf9c52..c0940e1a 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -180,13 +180,14 @@ def filter_records(record_list) end # @param messages [Array] + # @param lookup [Set | ActiveRecord::Relation | Hash] # @return [BatchRecordList] - def build_records(messages) + def build_records(messages, lookup = nil) records = messages.map do |m| - attrs = if self.method(:record_attributes).parameters.size == 2 - record_attributes(m.payload, m.key) + attrs = if self.method(:record_attributes).parameters.size == 3 + record_attributes(m.payload, m.key, lookup) else - record_attributes(m.payload) + record_attributes(m.payload, lookup) end next nil if attrs.nil? diff --git a/lib/deimos/active_record_consume/batch_record.rb b/lib/deimos/active_record_consume/batch_record.rb index 19611a99..f376c0af 100644 --- a/lib/deimos/active_record_consume/batch_record.rb +++ b/lib/deimos/active_record_consume/batch_record.rb @@ -26,7 +26,9 @@ def initialize(klass:, attributes:, bulk_import_column: nil) @klass = klass if bulk_import_column self.bulk_import_column = bulk_import_column - self.bulk_import_id = SecureRandom.uuid + # TODO: Figure out how to use the setting + self.bulk_import_id = @klass.respond_to?(:generate_bulk_id) ? + @klass.generate_bulk_id : SecureRandom.uuid attributes[bulk_import_column] = bulk_import_id end attributes = attributes.with_indifferent_access diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 30f99d23..74f1b3a3 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -23,6 +23,12 @@ def filter!(method) self.batch_records.delete_if { |record| !method.call(record.record) } end + # Partition batch records by the given block + # @param block [Proc] + def partition(&block) + self.batch_records.partition(&block) + end + # Get the original ActiveRecord objects. # @return [Array] def records diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index 56911ab1..8585a16c 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -65,8 +65,9 @@ def initialize # attributes set to the new/existing record. # @param payload [Hash,Deimos::SchemaClass::Record] # @param _key [String] + # @param _lookup [Set | ActiveRecord::Relation | Hash] # @return [Hash] - def record_attributes(payload, _key=nil) + def record_attributes(payload, _key=nil, _lookup=nil) @converter.convert(payload) end From 87dedacf6074119af1a194a51b95e5978ea37e05 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Thu, 23 Nov 2023 17:50:37 -0500 Subject: [PATCH 3/5] CCOL-2039: Pass invalid records to post_process --- .../batch_consumption.rb | 47 +++++++++++++------ .../active_record_consume/batch_record.rb | 9 ++-- .../batch_record_list.rb | 19 ++++++-- lib/deimos/active_record_consumer.rb | 10 ++++ lib/deimos/config/configuration.rb | 23 ++++++++- 5 files changed, 82 insertions(+), 26 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index c0940e1a..8051a550 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -35,14 +35,15 @@ def consume_batch(payloads, metadata) # any message fails, the whole thing is rolled back or retried # if there is deadlock valid_upserts = [] + invalid_upserts = [] Deimos::Utils::DeadlockRetry.wrap(tags) do - valid_upserts = if @compacted || self.class.config[:no_keys] + valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys] update_database(compact_messages(messages)) else uncompacted_update(messages) end end - post_process(valid_upserts) + post_process(valid_upserts, invalid_upserts) end end @@ -135,8 +136,10 @@ def update_database(messages) max_db_batch_size = self.class.config[:max_db_batch_size] valid_upserts = [] + invalid_upserts = [] if upserted.any? - valid_upserts = if max_db_batch_size + valid_upserts, invalid_upserts = if max_db_batch_size + # TODO: This makes a 2d array, need to compact upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } else upsert_records(upserted) @@ -150,7 +153,7 @@ def update_database(messages) remove_records(removed) end end - valid_upserts + [valid_upserts, invalid_upserts] end # Upsert any non-deleted records @@ -159,7 +162,7 @@ def update_database(messages) # @return [void] def upsert_records(messages) record_list = build_records(messages) - filter_records(record_list) + invalid = filter_records(record_list) return if record_list.empty? @@ -169,26 +172,37 @@ def upsert_records(messages) updater = MassUpdater.new(@klass, key_col_proc: key_col_proc, col_proc: col_proc, - replace_associations: self.class.config[:replace_associations]) - updater.mass_update(record_list) + replace_associations: self.class.replace_associations) + [updater.mass_update(record_list), invalid] end # @param record_list [BatchRecordList] # @return [BatchRecordList] def filter_records(record_list) - record_list.filter!(self.method(:should_consume?).to_proc) + record_list.reject!(self.method(:should_consume?).to_proc) + end + + # Returns a lookup entity to be used during record attributes + # @param _messages [Array] + # @return [Hash, Set, ActiveRecord::Relation] + def record_lookup(_messages) + {} end # @param messages [Array] - # @param lookup [Set | ActiveRecord::Relation | Hash] # @return [BatchRecordList] - def build_records(messages, lookup = nil) + def build_records(messages) + lookup = record_lookup(messages) records = messages.map do |m| - attrs = if self.method(:record_attributes).parameters.size == 3 + attrs = case self.method(:record_attributes).parameters.size + when 3 record_attributes(m.payload, m.key, lookup) + when 2 + record_attributes(m.payload, m.key) else - record_attributes(m.payload, lookup) + record_attributes(m.payload) end + next nil if attrs.nil? attrs = attrs.merge(record_key(m.key)) @@ -200,7 +214,9 @@ def build_records(messages, lookup = nil) BatchRecord.new(klass: @klass, attributes: attrs, - bulk_import_column: col) + bulk_import_column: col, + bulk_id_generator: self.class.bulk_import_id_generator + ) end BatchRecordList.new(records.compact) end @@ -216,9 +232,10 @@ def remove_records(messages) end # Additional processing after records have been successfully upserted - # @param _records [Array] Records to be post processed + # @param _valid_records [Array] Records to be post processed + # @param _invalid_records [Array] Invalid records to be processed # @return [void] - def post_process(_records) + def post_process(_valid_records, _invalid_records) nil end end diff --git a/lib/deimos/active_record_consume/batch_record.rb b/lib/deimos/active_record_consume/batch_record.rb index f376c0af..194ef0e3 100644 --- a/lib/deimos/active_record_consume/batch_record.rb +++ b/lib/deimos/active_record_consume/batch_record.rb @@ -17,18 +17,17 @@ class BatchRecord # @return [String] The column name to use for bulk IDs - defaults to `bulk_import_id`. attr_accessor :bulk_import_column - delegate :valid?, to: :record + delegate :valid?, :errors, :send, :attributes, to: :record # @param klass [Class < ActiveRecord::Base] # @param attributes [Hash] the full attribute list, including associations. # @param bulk_import_column [String] - def initialize(klass:, attributes:, bulk_import_column: nil) + # @param bulk_id_generator [Proc] + def initialize(klass:, attributes:, bulk_import_column: nil, bulk_id_generator: nil) @klass = klass if bulk_import_column self.bulk_import_column = bulk_import_column - # TODO: Figure out how to use the setting - self.bulk_import_id = @klass.respond_to?(:generate_bulk_id) ? - @klass.generate_bulk_id : SecureRandom.uuid + self.bulk_import_id = bulk_id_generator&.call attributes[bulk_import_column] = bulk_import_id end attributes = attributes.with_indifferent_access diff --git a/lib/deimos/active_record_consume/batch_record_list.rb b/lib/deimos/active_record_consume/batch_record_list.rb index 74f1b3a3..2735b60c 100644 --- a/lib/deimos/active_record_consume/batch_record_list.rb +++ b/lib/deimos/active_record_consume/batch_record_list.rb @@ -23,10 +23,21 @@ def filter!(method) self.batch_records.delete_if { |record| !method.call(record.record) } end - # Partition batch records by the given block - # @param block [Proc] - def partition(&block) - self.batch_records.partition(&block) + # @param method [Proc] + # @param block [Block] + def reject!(method = nil, &block) + if method.nil? + self.batch_records.reject!(&block) + else + case method.parameters.size + when 2 + self.batch_records.reject! do |record| + !method.call(record.record, record.associations) + end + else + self.batch_records.reject! { |record| !method.call(record.record)} + end + end end # Get the original ActiveRecord objects. diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index 8585a16c..c8433d6e 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -35,6 +35,16 @@ def bulk_import_id_column config[:bulk_import_id_column] end + # @return [Proc] + def bulk_import_id_generator + config[:bulk_import_id_generator] + end + + # @return [Boolean] + def replace_associations + config[:replace_associations] + end + # @param val [Boolean] Turn pre-compaction of the batch on or off. If true, # only the last message for each unique key in a batch is processed. # @return [void] diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index 7677d43c..7659c8d7 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -90,7 +90,11 @@ def self.configure_producer_or_consumer(kafka_config) if kafka_config.respond_to?(:bulk_import_id_column) # consumer klass.config.merge!( bulk_import_id_column: kafka_config.bulk_import_id_column, - replace_associations: kafka_config.replace_associations + replace_associations: kafka_config.replace_associations.nil? ? + Deimos.config.consumers.replace_associations : + kafka_config.replace_associations, + bulk_import_id_generator: kafka_config.bulk_import_id_generator || + Deimos.config.consumers.bulk_import_id_generator ) end end @@ -242,6 +246,15 @@ def self.configure_producer_or_consumer(kafka_config) # Not needed if reraise_errors is set to true. # @return [Block] setting(:fatal_error, proc { false }) + + # The default function to generate a bulk ID for bulk consumers + # @return [Block] + setting :bulk_import_id_generator, proc { SecureRandom.uuid } + + # If true, multi-table consumers will blow away associations rather than appending to them. + # Applies to all consumers unless specified otherwise + # @return [Boolean] + setting :replace_associations, true end setting :producers do @@ -445,7 +458,13 @@ def self.configure_producer_or_consumer(kafka_config) setting :bulk_import_id_column, :bulk_import_id # If true, multi-table consumers will blow away associations rather than appending to them. # @return [Boolean] - setting :replace_associations, true + setting :replace_associations, nil + + # The default function to generate a bulk ID for this consumer + # Uses the consumers proc defined in the consumers config by default unless + # specified for individual consumers + # @return [void] + setting :bulk_import_id_generator, nil # These are the phobos "listener" configs. See CONFIGURATION.md for more # info. From 121ee7285ff50b8953e6699bf17e279ff86a82b4 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 24 Nov 2023 07:01:37 -0500 Subject: [PATCH 4/5] CCOL-2039: Initial pass at specs --- .../batch_consumption.rb | 21 ++++++++++++------- .../active_record_consume/mass_updater.rb | 5 +++-- spec/config/configuration_spec.rb | 9 +++++--- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 8051a550..4f8a17b9 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -39,7 +39,7 @@ def consume_batch(payloads, metadata) Deimos::Utils::DeadlockRetry.wrap(tags) do valid_upserts, invalid_upserts = if @compacted || self.class.config[:no_keys] update_database(compact_messages(messages)) - else + else uncompacted_update(messages) end end @@ -139,9 +139,16 @@ def update_database(messages) invalid_upserts = [] if upserted.any? valid_upserts, invalid_upserts = if max_db_batch_size - # TODO: This makes a 2d array, need to compact - upserted.each_slice(max_db_batch_size) { |group| upsert_records(group) } - else + upserted.each_slice(max_db_batch_size) { |group| + upsert_records(group) + }. + reduce([[], []]) do |results, result| + results[0].push(result[0]) + results[1].push(result[1]) + end + valid_upserts.compact! + invalid_upserts.compact! + else upsert_records(upserted) end end @@ -172,7 +179,8 @@ def upsert_records(messages) updater = MassUpdater.new(@klass, key_col_proc: key_col_proc, col_proc: col_proc, - replace_associations: self.class.replace_associations) + replace_associations: self.class.replace_associations, + batch_id_generator: self.class.batch_id_generator) [updater.mass_update(record_list), invalid] end @@ -215,8 +223,7 @@ def build_records(messages) BatchRecord.new(klass: @klass, attributes: attrs, bulk_import_column: col, - bulk_id_generator: self.class.bulk_import_id_generator - ) + bulk_id_generator: self.class.bulk_import_id_generator) end BatchRecordList.new(records.compact) end diff --git a/lib/deimos/active_record_consume/mass_updater.rb b/lib/deimos/active_record_consume/mass_updater.rb index ac61ab85..0dc294e2 100644 --- a/lib/deimos/active_record_consume/mass_updater.rb +++ b/lib/deimos/active_record_consume/mass_updater.rb @@ -19,9 +19,10 @@ def default_cols(klass) # @param key_col_proc [Proc] # @param col_proc [Proc] # @param replace_associations [Boolean] - def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true) + def initialize(klass, key_col_proc: nil, col_proc: nil, replace_associations: true, batch_id_generator: nil) @klass = klass @replace_associations = replace_associations + @batch_id_generator = batch_id_generator || proc { SecureRandom.uuid } @key_cols = {} @key_col_proc = key_col_proc @@ -69,7 +70,7 @@ def save_records_to_database(record_list) def import_associations(record_list) record_list.fill_primary_keys! - import_id = @replace_associations ? SecureRandom.uuid : nil + import_id = @replace_associations ? @batch_id_generator.call : nil record_list.associations.each do |assoc| sub_records = record_list.map { |r| r.sub_records(assoc.name, import_id) }.flatten next unless sub_records.any? diff --git a/spec/config/configuration_spec.rb b/spec/config/configuration_spec.rb index ee7c04b9..0f78dfcd 100644 --- a/spec/config/configuration_spec.rb +++ b/spec/config/configuration_spec.rb @@ -91,7 +91,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil }, { topic: 'my_batch_consume_topic', group_id: 'my_batch_group_id', @@ -109,7 +110,8 @@ def consume heartbeat_interval: 10, handler: 'ConsumerTest::MyBatchConsumer', use_schema_classes: nil, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { @@ -261,7 +263,8 @@ def consume heartbeat_interval: 13, handler: 'MyConfigConsumer', use_schema_classes: false, - max_db_batch_size: nil + max_db_batch_size: nil, + bulk_import_id_generator: nil } ], producer: { From 9fddb239c39613fc1c9e8ae3b65e3a492da82191 Mon Sep 17 00:00:00 2001 From: Lionel Pereira Date: Fri, 24 Nov 2023 08:22:44 -0500 Subject: [PATCH 5/5] CCOL-2039: Update existing specs --- .../active_record_consume/batch_consumption.rb | 18 ++++++++---------- .../active_record_consume/mass_updater_spec.rb | 11 ++++++++--- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/lib/deimos/active_record_consume/batch_consumption.rb b/lib/deimos/active_record_consume/batch_consumption.rb index 4f8a17b9..f6836989 100644 --- a/lib/deimos/active_record_consume/batch_consumption.rb +++ b/lib/deimos/active_record_consume/batch_consumption.rb @@ -139,15 +139,13 @@ def update_database(messages) invalid_upserts = [] if upserted.any? valid_upserts, invalid_upserts = if max_db_batch_size - upserted.each_slice(max_db_batch_size) { |group| - upsert_records(group) - }. - reduce([[], []]) do |results, result| - results[0].push(result[0]) - results[1].push(result[1]) - end - valid_upserts.compact! - invalid_upserts.compact! + upserted.each_slice(max_db_batch_size) do |group| + valid, invalid = upsert_records(group) + valid_upserts.push(valid) + invalid_upserts.push(invalid) + end + valid_upserts.compact! + invalid_upserts.compact! else upsert_records(upserted) end @@ -180,7 +178,7 @@ def upsert_records(messages) key_col_proc: key_col_proc, col_proc: col_proc, replace_associations: self.class.replace_associations, - batch_id_generator: self.class.batch_id_generator) + batch_id_generator: self.class.bulk_import_id_generator) [updater.mass_update(record_list), invalid] end diff --git a/spec/active_record_consume/mass_updater_spec.rb b/spec/active_record_consume/mass_updater_spec.rb index a3e82428..503cf1bf 100644 --- a/spec/active_record_consume/mass_updater_spec.rb +++ b/spec/active_record_consume/mass_updater_spec.rb @@ -39,6 +39,8 @@ end end + let(:bulk_id_generator) { proc { SecureRandom.uuid } } + before(:each) do stub_const('Widget', widget_class) stub_const('Detail', detail_class) @@ -52,19 +54,22 @@ Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id1', some_int: 5, detail: { title: 'Title 1' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_id_generator: bulk_id_generator ), Deimos::ActiveRecordConsume::BatchRecord.new( klass: Widget, attributes: { test_id: 'id2', some_int: 10, detail: { title: 'Title 2' } }, - bulk_import_column: 'bulk_import_id' + bulk_import_column: 'bulk_import_id', + bulk_id_generator: bulk_id_generator ) ] ) end it 'should mass update the batch' do - described_class.new(Widget).mass_update(batch) + results = described_class.new(Widget).mass_update(batch) + expect(results.count).to eq(2) expect(Widget.count).to eq(2) expect(Detail.count).to eq(2) expect(Widget.first.detail).not_to be_nil