From 9536090534412d43047b3ca77409d2868e003a43 Mon Sep 17 00:00:00 2001 From: Mohamed El-Sawy Date: Sun, 31 Mar 2024 19:35:13 +0200 Subject: [PATCH] CV2-4350: improve load claims and bulk-update clusters (#1836) * CV2-4350: improve load claims and bulk-update clusters * CV2-4350: use bulk-update and read cluster from memory not PG * CV2-4350: remove downcase and compare with exact text * CV2-4350: fix tests * CV2-4350: fix CC --------- Co-authored-by: Caio <117518+caiosba@users.noreply.github.com> --- app/models/claim.rb | 2 +- lib/check_cluster_center.rb | 3 +- lib/tasks/check_khousheh.rake | 82 +++++++++++++++++++-------- test/lib/check_cluster_center_test.rb | 16 +++--- test/models/media_test.rb | 2 + 5 files changed, 72 insertions(+), 33 deletions(-) diff --git a/app/models/claim.rb b/app/models/claim.rb index 69d7f47171..5ae022dfe3 100644 --- a/app/models/claim.rb +++ b/app/models/claim.rb @@ -13,7 +13,7 @@ def media_type end def uuid - Media.where(type: 'Claim', quote: self.quote.to_s.strip).first&.id || self.id + Media.where(type: 'Claim', quote: self.quote.to_s.strip).joins("INNER JOIN project_medias pm ON pm.media_id = medias.id").first&.id || self.id end private diff --git a/lib/check_cluster_center.rb b/lib/check_cluster_center.rb index fb7c405a1e..94f2ec7f22 100644 --- a/lib/check_cluster_center.rb +++ b/lib/check_cluster_center.rb @@ -1,5 +1,6 @@ class CheckClusterCenter - def self.replace_or_keep_cluster_center(current_center, new_pm) + def self.replace_or_keep_cluster_center(current_center_id, new_pm) + current_center = ProjectMedia.find_by_id current_center_id unless current_center_id.nil? return new_pm.id if current_center.nil? 
# Define passed variable to contain items that passed check # Based on `passed` values we should return the id or go to next check diff --git a/lib/tasks/check_khousheh.rake b/lib/tasks/check_khousheh.rake index f0507bb4ba..7762095c0a 100644 --- a/lib/tasks/check_khousheh.rake +++ b/lib/tasks/check_khousheh.rake @@ -13,15 +13,34 @@ namespace :check do puts end - # FIXME: Load only the claims we need - def claim_uuid_for_duplicate_quote - puts 'Collecting claim media UUIDs for duplicate quotes...' - claim_uuid = {} - Media.select('quote, MIN(id) as first').where(type: 'Claim').group(:quote).having('COUNT(id) > 1') - .each do |raw| - claim_uuid[Digest::MD5.hexdigest(raw['quote'])] = raw['first'].to_s + def get_claim_uuid(quote) + quote_es = quote[0..1023] + # Remove last word as may be the splitter cut the last word and we are hitting ES with `AND` + quote_es = quote_es[0...quote_es.rindex(' ')] + # Quote stored in title or description(for tipline items) so I used both fields in search + query = { + bool: { + must: [ + { term: { associated_type: { value: 'Claim' } } }, + { + simple_query_string: { + fields: ["title", "description"], + query: quote_es, + default_operator: "AND" + } + } + ] + } + } + result = $repository.search(query: query, size: 10000) + pm_ids = [] + result.each do |r| + if r['title'] == quote || r['description'] == quote + pm_ids << r['annotated_id'] + end end - claim_uuid + uuid = ProjectMedia.where(id: pm_ids.uniq.compact).map(&:media_id).sort.first + uuid.blank? ? 
uuid : uuid.to_s end # docker-compose exec -e elasticsearch_log=0 api bundle exec rake check:khousheh:generate_input @@ -30,8 +49,6 @@ namespace :check do print_task_title 'Generating input files' FileUtils.mkdir_p(File.join(Rails.root, 'tmp', 'feed-clusters-input')) started = Time.now.to_i - # Collect claim media UUIDs for duplicate quote - claim_uuid = claim_uuid_for_duplicate_quote sort = [{ annotated_id: { order: :asc } }] Feed.find_each do |feed| # Only feeds that are sharing media @@ -62,7 +79,7 @@ namespace :check do m_ids = pms.map(&:media_id) Media.where(id: m_ids, type: 'Claim').find_each do |m| print '.' - uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s + uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s end end pm_ids.each do |pm_id| @@ -86,7 +103,7 @@ namespace :check do end Media.where(id: tpm_m_mapping.values, type: 'Claim').find_each do |m| print '.' - t_uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s + t_uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s end relations.each do |r| print '.' @@ -191,7 +208,6 @@ namespace :check do task parse_output: [:environment, :download] do print_task_title 'Parsing output files' started = Time.now.to_i - claim_uuid = claim_uuid_for_duplicate_quote sort = [{ annotated_id: { order: :asc } }] error_logs = [] Feed.find_each do |feed| @@ -230,13 +246,14 @@ namespace :check do search_after = [0] page = 0 while true - page += 1 - puts "\nIterating on page #{page}/#{pages}\n" result = $repository.search(_source: 'annotated_id', query: es_query, sort: sort, search_after: search_after, size: PER_PAGE).results pm_ids = result.collect{ |i| i['annotated_id'] }.uniq break if pm_ids.empty? 
+ page += 1 + puts "\nIterating on page #{page}/#{pages}\n" pm_media_mapping = {} # Project Media ID => Media ID uuid = {} + cluster_items = {} cpm_items = [] ProjectMedia.where(id: pm_ids).find_in_batches(:batch_size => PER_PAGE) do |pms| # Collect claim media UUIDs @@ -246,7 +263,7 @@ namespace :check do end Media.where(id: pms.map(&:media_id), type: 'Claim').find_each do |m| print '.' - uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s + uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s end # Fact-checks pm_fc_mapping = {} # Project Media ID => Fact-Check Updated At @@ -263,7 +280,12 @@ namespace :check do print '.' cluster_id = mapping[uuid[pm_media_mapping[pm.id]].to_i] next if cluster_id.nil? - cluster = Cluster.find_by_id(cluster_id) + cluster = nil + if cluster_items[cluster_id] + cluster = OpenStruct.new(cluster_items[cluster_id]) + else + cluster = Cluster.find_by_id(cluster_id) + end next if cluster.nil? updated_cluster_attributes = { id: cluster.id, created_at: cluster.created_at, updated_at: Time.now } updated_cluster_attributes[:first_item_at] = cluster.first_item_at || pm.created_at @@ -280,13 +302,21 @@ namespace :check do updated_cluster_attributes[:last_fact_check_date] = pm_fc_mapping[pm.id] if pm_fc_mapping[pm.id].to_i > cluster.last_fact_check_date.to_i end cpm_items << { project_media_id: pm.id, cluster_id: cluster.id } - cluster_center = CheckClusterCenter.replace_or_keep_cluster_center(cluster.project_media, pm) + cluster_center = CheckClusterCenter.replace_or_keep_cluster_center(cluster.project_media_id, pm) updated_cluster_attributes[:project_media_id] = cluster_center cluster_title = cluster_center == pm.id ? pm.title : cluster.title updated_cluster_attributes[:title] = cluster_title # Update cluster - # FIXME: Update clusters in batches - cluster.update_columns(updated_cluster_attributes) + cluster_items[cluster.id] = updated_cluster_attributes + end + end + # Bulk-update Cluster + unless cluster_items.blank? 
+ begin + cluster_items_values = cluster_items.values.to_a + Cluster.upsert_all(cluster_items_values, unique_by: :id) + rescue + error_logs << {feed: "Failed to import Cluster for feed #{feed.id} page #{page}"} end end # Bulk-insert ClusterProjectMedia @@ -301,13 +331,19 @@ namespace :check do end Team.current = nil end - puts "Rebuilding clusters for feed #{feed.name} took #{Time.now.to_f - started_at} seconds." + puts "\nRebuilding clusters for feed #{feed.name} took #{Time.now.to_f - started_at} seconds." rescue Errno::ENOENT - puts "Output file not found for feed #{feed.name}." + puts "\nOutput file not found for feed #{feed.name}." end end # Delete old clusters - Cluster.where(feed_id: feed.id).where('id <= ?', last_old_cluster_id).delete_all unless last_old_cluster_id.nil? + unless last_old_cluster_id.nil? + deleted_clusters = Cluster.where(feed_id: feed.id).where('id <= ?', last_old_cluster_id).map(&:id) + unless deleted_clusters.blank? + ClusterProjectMedia.where(cluster_id: deleted_clusters).delete_all + Cluster.where(id: deleted_clusters).delete_all + end + end feed.update_column(:last_clusterized_at, Time.now) end puts "Logs: #{error_logs.inspect}." unless error_logs.blank? diff --git a/test/lib/check_cluster_center_test.rb b/test/lib/check_cluster_center_test.rb index 1641749335..20e9882c53 100644 --- a/test/lib/check_cluster_center_test.rb +++ b/test/lib/check_cluster_center_test.rb @@ -10,30 +10,30 @@ class CheckClusterCenterTest < ActiveSupport::TestCase new_pm = create_project_media team: team_b ProjectMedia.where(id: [center.id, new_pm.id]).update_all(updated_at: Time.now) # Nothing match so should sort by Alphabetical team name - assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Match based on updated_at condition center.updated_at = Time.now - 1.month center.save! 
- assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Still should match based on updated_at 2.times { create_tipline_request(team_id: team_a.id, associated: center) } 2.times { create_tipline_request(team_id: team_b.id, associated: new_pm) } - assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Should match by requests_count create_tipline_request team_id: team_a.id, associated: center - assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Still should match based on requests_count cd_1 = create_claim_description project_media: center cd_2 = create_claim_description project_media: new_pm - assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Should match based on Claim cd_1.destroy! 
- assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Should match based on Fact-Check publish_report(center) - assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) # Should match based on Claim publish_report(new_pm) - assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload) + assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload) end end diff --git a/test/models/media_test.rb b/test/models/media_test.rb index b3b415511b..a6169b8dc0 100644 --- a/test/models/media_test.rb +++ b/test/models/media_test.rb @@ -608,8 +608,10 @@ def setup m = create_media assert_equal m.id, m.uuid c1 = create_claim_media quote: 'Foo' + create_project_media media: c1 assert_equal c1.id, c1.uuid c2 = create_claim_media quote: 'Foo' + create_project_media media: c2 assert_equal c1.id, c2.uuid end end