Skip to content

Commit

Permalink
CV2-4350: improve load claims and bulk-update clusters (#1836)
Browse files Browse the repository at this point in the history
* CV2-4350: improve load claims and bulk-update clusters

* CV2-4350: use bulk-update and read cluster from memory not PG

* CV2-4350: remove downcase and compare with exact text

* CV2-4350: fix tests

* CV2-4350: fix CC

---------

Co-authored-by: Caio <[email protected]>
  • Loading branch information
melsawy and caiosba authored Mar 31, 2024
1 parent 202e801 commit 9536090
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 33 deletions.
2 changes: 1 addition & 1 deletion app/models/claim.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def media_type
end

def uuid
Media.where(type: 'Claim', quote: self.quote.to_s.strip).first&.id || self.id
Media.where(type: 'Claim', quote: self.quote.to_s.strip).joins("INNER JOIN project_medias pm ON pm.media_id = medias.id").first&.id || self.id
end

private
Expand Down
3 changes: 2 additions & 1 deletion lib/check_cluster_center.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
class CheckClusterCenter
def self.replace_or_keep_cluster_center(current_center, new_pm)
def self.replace_or_keep_cluster_center(current_center_id, new_pm)
current_center = ProjectMedia.find_by_id current_center_id unless current_center_id.nil?
return new_pm.id if current_center.nil?
# Define passed variable to contain items that passed check
# Based on `passed` values we should return the id or go to next check
Expand Down
82 changes: 59 additions & 23 deletions lib/tasks/check_khousheh.rake
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,34 @@ namespace :check do
puts
end

# FIXME: Load only the claims we need
def claim_uuid_for_duplicate_quote
puts 'Collecting claim media UUIDs for duplicate quotes...'
claim_uuid = {}
Media.select('quote, MIN(id) as first').where(type: 'Claim').group(:quote).having('COUNT(id) > 1')
.each do |raw|
claim_uuid[Digest::MD5.hexdigest(raw['quote'])] = raw['first'].to_s
def get_claim_uuid(quote)
quote_es = quote[0..1023]
# Remove the last word, since the 1024-character truncation may have cut it off mid-word, and we query ES with the `AND` operator
quote_es = quote_es[0...quote_es.rindex(' ')]
# The quote is stored in the title or in the description (for tipline items), so search both fields
query = {
bool: {
must: [
{ term: { associated_type: { value: 'Claim' } } },
{
simple_query_string: {
fields: ["title", "description"],
query: quote_es,
default_operator: "AND"
}
}
]
}
}
result = $repository.search(query: query, size: 10000)
pm_ids = []
result.each do |r|
if r['title'] == quote || r['description'] == quote
pm_ids << r['annotated_id']
end
end
claim_uuid
uuid = ProjectMedia.where(id: pm_ids.uniq.compact).map(&:media_id).sort.first
uuid.blank? ? uuid : uuid.to_s
end

# docker-compose exec -e elasticsearch_log=0 api bundle exec rake check:khousheh:generate_input
Expand All @@ -30,8 +49,6 @@ namespace :check do
print_task_title 'Generating input files'
FileUtils.mkdir_p(File.join(Rails.root, 'tmp', 'feed-clusters-input'))
started = Time.now.to_i
# Collect claim media UUIDs for duplicate quote
claim_uuid = claim_uuid_for_duplicate_quote
sort = [{ annotated_id: { order: :asc } }]
Feed.find_each do |feed|
# Only feeds that are sharing media
Expand Down Expand Up @@ -62,7 +79,7 @@ namespace :check do
m_ids = pms.map(&:media_id)
Media.where(id: m_ids, type: 'Claim').find_each do |m|
print '.'
uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s
uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s
end
end
pm_ids.each do |pm_id|
Expand All @@ -86,7 +103,7 @@ namespace :check do
end
Media.where(id: tpm_m_mapping.values, type: 'Claim').find_each do |m|
print '.'
t_uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s
t_uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s
end
relations.each do |r|
print '.'
Expand Down Expand Up @@ -191,7 +208,6 @@ namespace :check do
task parse_output: [:environment, :download] do
print_task_title 'Parsing output files'
started = Time.now.to_i
claim_uuid = claim_uuid_for_duplicate_quote
sort = [{ annotated_id: { order: :asc } }]
error_logs = []
Feed.find_each do |feed|
Expand Down Expand Up @@ -230,13 +246,14 @@ namespace :check do
search_after = [0]
page = 0
while true
page += 1
puts "\nIterating on page #{page}/#{pages}\n"
result = $repository.search(_source: 'annotated_id', query: es_query, sort: sort, search_after: search_after, size: PER_PAGE).results
pm_ids = result.collect{ |i| i['annotated_id'] }.uniq
break if pm_ids.empty?
page += 1
puts "\nIterating on page #{page}/#{pages}\n"
pm_media_mapping = {} # Project Media ID => Media ID
uuid = {}
cluster_items = {}
cpm_items = []
ProjectMedia.where(id: pm_ids).find_in_batches(:batch_size => PER_PAGE) do |pms|
# Collect claim media UUIDs
Expand All @@ -246,7 +263,7 @@ namespace :check do
end
Media.where(id: pms.map(&:media_id), type: 'Claim').find_each do |m|
print '.'
uuid[m.id] = claim_uuid[Digest::MD5.hexdigest(m.quote)] || m.id.to_s
uuid[m.id] = get_claim_uuid(m.quote) || m.id.to_s
end
# Fact-checks
pm_fc_mapping = {} # Project Media ID => Fact-Check Updated At
Expand All @@ -263,7 +280,12 @@ namespace :check do
print '.'
cluster_id = mapping[uuid[pm_media_mapping[pm.id]].to_i]
next if cluster_id.nil?
cluster = Cluster.find_by_id(cluster_id)
cluster = nil
if cluster_items[cluster_id]
cluster = OpenStruct.new(cluster_items[cluster_id])
else
cluster = Cluster.find_by_id(cluster_id)
end
next if cluster.nil?
updated_cluster_attributes = { id: cluster.id, created_at: cluster.created_at, updated_at: Time.now }
updated_cluster_attributes[:first_item_at] = cluster.first_item_at || pm.created_at
Expand All @@ -280,13 +302,21 @@ namespace :check do
updated_cluster_attributes[:last_fact_check_date] = pm_fc_mapping[pm.id] if pm_fc_mapping[pm.id].to_i > cluster.last_fact_check_date.to_i
end
cpm_items << { project_media_id: pm.id, cluster_id: cluster.id }
cluster_center = CheckClusterCenter.replace_or_keep_cluster_center(cluster.project_media, pm)
cluster_center = CheckClusterCenter.replace_or_keep_cluster_center(cluster.project_media_id, pm)
updated_cluster_attributes[:project_media_id] = cluster_center
cluster_title = cluster_center == pm.id ? pm.title : cluster.title
updated_cluster_attributes[:title] = cluster_title
# Update cluster
# FIXME: Update clusters in batches
cluster.update_columns(updated_cluster_attributes)
cluster_items[cluster.id] = updated_cluster_attributes
end
end
# Bulk-update Cluster
unless cluster_items.blank?
begin
cluster_items_values = cluster_items.values.to_a
Cluster.upsert_all(cluster_items_values, unique_by: :id)
rescue
error_logs << {feed: "Failed to import Cluster for feed #{feed.id} page #{page}"}
end
end
# Bulk-insert ClusterProjectMedia
Expand All @@ -301,13 +331,19 @@ namespace :check do
end
Team.current = nil
end
puts "Rebuilding clusters for feed #{feed.name} took #{Time.now.to_f - started_at} seconds."
puts "\nRebuilding clusters for feed #{feed.name} took #{Time.now.to_f - started_at} seconds."
rescue Errno::ENOENT
puts "Output file not found for feed #{feed.name}."
puts "\nOutput file not found for feed #{feed.name}."
end
end
# Delete old clusters
Cluster.where(feed_id: feed.id).where('id <= ?', last_old_cluster_id).delete_all unless last_old_cluster_id.nil?
unless last_old_cluster_id.nil?
deleted_clusters = Cluster.where(feed_id: feed.id).where('id <= ?', last_old_cluster_id).map(&:id)
unless deleted_clusters.blank?
ClusterProjectMedia.where(cluster_id: deleted_clusters).delete_all
Cluster.where(id: deleted_clusters).delete_all
end
end
feed.update_column(:last_clusterized_at, Time.now)
end
puts "Logs: #{error_logs.inspect}." unless error_logs.blank?
Expand Down
16 changes: 8 additions & 8 deletions test/lib/check_cluster_center_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,30 @@ class CheckClusterCenterTest < ActiveSupport::TestCase
new_pm = create_project_media team: team_b
ProjectMedia.where(id: [center.id, new_pm.id]).update_all(updated_at: Time.now)
# Nothing matches, so it should sort by alphabetical team name
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Match based on updated_at condition
center.updated_at = Time.now - 1.month
center.save!
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Still should match based on updated_at
2.times { create_tipline_request(team_id: team_a.id, associated: center) }
2.times { create_tipline_request(team_id: team_b.id, associated: new_pm) }
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Should match by requests_count
create_tipline_request team_id: team_a.id, associated: center
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Still should match based on requests_count
cd_1 = create_claim_description project_media: center
cd_2 = create_claim_description project_media: new_pm
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Should match based on Claim
cd_1.destroy!
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Should match based on Fact-Check
publish_report(center)
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal center.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
# Should match based on Claim
publish_report(new_pm)
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.reload, new_pm.reload)
assert_equal new_pm.id, CheckClusterCenter.replace_or_keep_cluster_center(center.id, new_pm.reload)
end
end
2 changes: 2 additions & 0 deletions test/models/media_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,10 @@ def setup
m = create_media
assert_equal m.id, m.uuid
c1 = create_claim_media quote: 'Foo'
create_project_media media: c1
assert_equal c1.id, c1.uuid
c2 = create_claim_media quote: 'Foo'
create_project_media media: c2
assert_equal c1.id, c2.uuid
end
end

0 comments on commit 9536090

Please sign in to comment.