Skip to content

Commit

Permalink
CV2-5717: update es parent_id after pin another item (#2141)
Browse files Browse the repository at this point in the history
* CV2-5717: update es parent_id after pin another item

* CV2-5717: fix tests

* CV2-5717: add rake task to fix parent_id for suggested list

* CV2-5717: use save to run callbacks and add extra rake task to fix existing items
  • Loading branch information
melsawy authored Nov 29, 2024
1 parent 1a87e4d commit e5018c4
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 16 deletions.
21 changes: 12 additions & 9 deletions app/models/relationship.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,20 @@ def self.create_unless_exists(source_id, target_id, relationship_type, options =

def update_elasticsearch_parent(action = 'create_or_update')
return if self.is_default? || self.disable_es_callbacks || RequestStore.store[:disable_es_callbacks]
# touch target to update `updated_at` date
target = self.target
unless target.nil?
[self.source, self.target].compact.each do |pm|
updated_at = Time.now
target.update_columns(updated_at: updated_at)
# touch item to update `updated_at` date
pm.update_columns(updated_at: updated_at)
data = { updated_at: updated_at.utc }
data['parent_id'] = {
method: "#{action}_parent_id",
klass: self.class.name,
id: self.id,
default: target_id,
default: pm.id,
type: 'int'
} if self.is_confirmed?
target.update_elasticsearch_doc(data.keys, data, target.id, true)
end
}
pm.update_elasticsearch_doc(data.keys, data, pm.id, true)
end if self.is_confirmed?
end

def set_unmatched_field(value)
Expand Down Expand Up @@ -270,7 +269,11 @@ def propagate_inversion
claim.project_media_id = self.source_id
claim.save
end
Relationship.where(source_id: self.target_id).update_all({ source_id: self.source_id })
Relationship.where(source_id: self.target_id).find_each do |r|
r.source_id = self.source_id
r.skip_check_ability = true
r.save!
end
self.source&.clear_cached_fields
self.target&.clear_cached_fields
Relationship.delay_for(1.second).propagate_inversion(ids, self.source_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
namespace :check do
namespace :migrate do
# Parses the extra arguments given to a rake task into a Hash of strings.
#
# Each argument is a string of `key:value` pairs joined by `&`, e.g.
# `'slug:team_slug&ids:1-2-3'` => `{ 'slug' => 'team_slug', 'ids' => '1-2-3' }`.
#
# @param args [Array<String>, nil] raw extra arguments (e.g. `args.extras`)
# @return [Hash{String => String, nil}] parsed pairs; empty when `args` is nil/empty.
#   A key given without a `:value` part maps to nil. Later pairs override earlier
#   ones that share the same key.
def parse_args(args)
  Array(args).each_with_object({}) do |raw, output|
    raw.to_s.split('&').each do |pair|
      # Limit the split to 2 so a value that itself contains ':' (e.g. a URL) is kept whole.
      key, value = pair.split(':', 2)
      # Skip empty pairs (e.g. produced by '&&') instead of storing a nil/empty key.
      next if key.nil? || key.empty?
      output[key] = value
    end
  end
end

task fix_parent_id_and_sources_count_for_suggested_items: :environment do
started = Time.now.to_i
index_alias = CheckElasticSearchModel.get_index_alias
Expand Down Expand Up @@ -56,5 +69,59 @@ namespace :check do
minutes = (Time.now.to_i - started) / 60
puts "[#{Time.now}] Done in #{minutes} minutes."
end

# Rewrites the Elasticsearch `parent_id` field for items involved in relationships
# whose source currently has at least one suggestion.
#
# Usage:
#   bundle exec rails check:migrate:fix_parent_id_for_suggested_list['slug:team_slug&ids:1-2-3']
#
# Optional arguments (`key:value` pairs joined by `&`, parsed by `parse_args`):
#   slug - restrict the run to the team with this slug. When omitted, all teams
#          with an id greater than the last processed one (kept in Rails.cache)
#          are processed and the cache key is advanced after each team.
#   ids  - dash-separated ProjectMedia ids that are added to the search results
#          as extra relationship sources to fix.
task fix_parent_id_for_suggested_list: :environment do |_t, args|
  data_args = parse_args args.extras
  started = Time.now.to_i
  # Read the ids from the parsed arguments. The previous code referenced an
  # undefined local `ids` and rescued the resulting NameError, so the `ids`
  # argument was always ignored. Non-numeric chunks (which `to_i` turns into 0)
  # are dropped.
  pm_ids = data_args['ids'].to_s.split('-').map(&:to_i).select(&:positive?)
  # Add Team condition
  team_condition = {}
  if data_args['slug'].blank?
    last_team_id = Rails.cache.read('check:migrate:fix_parent_id_for_suggested_list:team_id') || 0
  else
    last_team_id = 0
    team_condition = { slug: data_args['slug'] }
  end
  index_alias = CheckElasticSearchModel.get_index_alias
  # Builds one bulk `update` action that sets `parent_id` on a ProjectMedia document.
  parent_id_action = lambda do |pm_id, parent_id|
    doc_id = Base64.encode64("ProjectMedia/#{pm_id}")
    fields = { "parent_id" => parent_id }
    { update: { _index: index_alias, _id: doc_id, retry_on_conflict: 3, data: { doc: fields } } }
  end
  Team.where('id > ?', last_team_id).where(team_condition).find_each do |team|
    # NOTE(review): confirm that `medias` returns every matching item rather than a single page.
    result_ids = CheckSearch.new({"suggestions_count"=>{"min"=>1}}.to_json, nil, team.id).medias.map(&:id)
    result_ids.concat(pm_ids) unless pm_ids.blank?
    # Confirmed items
    Relationship.where(source_id: result_ids, relationship_type: Relationship.confirmed_type).find_in_batches(:batch_size => 1000) do |relations|
      es_body = []
      # Update parent_id for sources: a source is its own parent
      relations.map(&:source_id).uniq.each do |source_id|
        print '.'
        es_body << parent_id_action.call(source_id, source_id)
      end
      # Confirmed targets point to their source
      relations.each do |r|
        print '.'
        es_body << parent_id_action.call(r.target_id, r.source_id)
      end
      $repository.client.bulk body: es_body unless es_body.blank?
    end
    # Suggested items: a suggested target keeps itself as parent
    Relationship.where(source_id: result_ids, relationship_type: Relationship.suggested_type).find_in_batches(:batch_size => 1000) do |relations|
      es_body = []
      relations.each do |r|
        print '.'
        es_body << parent_id_action.call(r.target_id, r.target_id)
      end
      $repository.client.bulk body: es_body unless es_body.blank?
    end
    # Remember progress only for full runs so a slug-scoped run does not move the checkpoint.
    Rails.cache.write('check:migrate:fix_parent_id_for_suggested_list:team_id', team.id) if data_args['slug'].blank?
  end
  minutes = ((Time.now.to_i - started) / 60).to_i
  puts "[#{Time.now}] Done in #{minutes} minutes."
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,52 @@ namespace :check do
$repository.client.update_by_query options
end
end
Rails.cache.write('check:migrate:add_unmatched_to_project_media:team_id', team.id)
end
minutes = ((Time.now.to_i - started) / 60).to_i
puts "[#{Time.now}] Done in #{minutes} minutes."
end

# bundle exec rails check:migrate:fix_unmatched_list
task fix_unmatched_list: :environment do |_t, args|
started = Time.now.to_i
slug = args.extras.last
team_condition = {}
if slug.blank?
last_team_id = Rails.cache.read('check:migrate:fix_unmatched_list:team_id') || 0
else
last_team_id = 0
team_condition = { slug: slug }
end
Team.where('id > ?', last_team_id).where(team_condition).find_each do |team|
puts "Processing team #{team.slug} .... \n"
unmatched_ids = team.project_medias.where(unmatched: 1).map(&:id)
# Get re-matched items (suggested or confirmed)
relationships = Relationship.where('source_id IN (?) OR target_id IN (?)', unmatched_ids, unmatched_ids)
.where('relationship_type = ? OR relationship_type = ?', Relationship.suggested_type.to_yaml, Relationship.confirmed_type.to_yaml)
s_ids = relationships.map(&:source_id).uniq
t_ids = relationships.map(&:target_id).uniq
matched_ids = s_ids.concat(t_ids).uniq
unless matched_ids.blank?
index_alias = CheckElasticSearchModel.get_index_alias
ProjectMedia.where(id: matched_ids).find_in_batches(:batch_size => 500) do |pms|
print '.'
pm_ids = pms.map(&:id)
# Update PG
ProjectMedia.where(id: pm_ids).update_all(unmatched: 0)
# Update ES
options = {
index: index_alias,
conflicts: 'proceed',
body: {
script: { source: "ctx._source.unmatched = params.unmatched", params: { unmatched: 0 } },
query: { terms: { annotated_id: pm_ids } }
}
}
$repository.client.update_by_query options
end
end
Rails.cache.write('check:migrate:fix_unmatched_list:team_id', team.id)
end
minutes = ((Time.now.to_i - started) / 60).to_i
puts "[#{Time.now}] Done in #{minutes} minutes."
Expand Down
41 changes: 34 additions & 7 deletions test/models/relationship_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ def setup
test "should update sources_count and parent_id for confirmed item" do
setup_elasticsearch
t = create_team
pm_s = create_project_media team: t
pm_t = create_project_media team: t
pm_s = create_project_media team: t, disable_es_callbacks: false
pm_t = create_project_media team: t, disable_es_callbacks: false
pm_t2 = create_project_media team: t, disable_es_callbacks: false
pm_t3 = create_project_media team: t, disable_es_callbacks: false
r = create_relationship source_id: pm_s.id, target_id: pm_t.id, relationship_type: Relationship.suggested_type
sleep 2
es_t = $repository.find(get_es_id(pm_t))
Expand All @@ -47,11 +49,36 @@ def setup
# Confirm item
r.relationship_type = Relationship.confirmed_type
r.save!
sleep 2
es_t = $repository.find(get_es_id(pm_t))
assert_equal r.source_id, es_t['parent_id']
assert_equal pm_t.reload.sources_count, es_t['sources_count']
assert_equal 1, pm_t.reload.sources_count
r2 = create_relationship source_id: pm_s.id, target_id: pm_t2.id, relationship_type: Relationship.confirmed_type
r3 = create_relationship source_id: pm_s.id, target_id: pm_t3.id, relationship_type: Relationship.suggested_type
sleep 1
es_s = $repository.find(get_es_id(pm_s))
assert_equal pm_s.id, es_s['parent_id']
assert_equal pm_s.reload.sources_count, es_s['sources_count']
assert_equal 0, pm_s.reload.sources_count
[pm_t, pm_t2].each do |pm|
es = $repository.find(get_es_id(pm))
assert_equal pm_s.id, es['parent_id']
assert_equal pm.reload.sources_count, es['sources_count']
assert_equal 1, pm.reload.sources_count
end
# Verify parent_id after pin another item
r2.source_id = pm_t2.id
r2.target_id = pm_s.id
r2.disable_es_callbacks = false
r2.save!
sleep 1
es_t2 = $repository.find(get_es_id(pm_t2))
assert_equal pm_t2.id, es_t2['parent_id']
assert_equal pm_t2.reload.sources_count, es_t2['sources_count']
assert_equal 0, pm_t2.reload.sources_count
[pm_s, pm_t].each do |pm|
es = $repository.find(get_es_id(pm))
assert_equal pm_t2.id, es['parent_id']
assert_equal pm.reload.sources_count, es['sources_count']
assert_equal 1, pm.reload.sources_count
end
# Verify destroy
r.destroy!
es_t = $repository.find(get_es_id(pm_t))
assert_equal pm_t.id, es_t['parent_id']
Expand Down

0 comments on commit e5018c4

Please sign in to comment.