From 83c7b11b2e8ae80e3e184d8d8cee5c3671009a1d Mon Sep 17 00:00:00 2001 From: Wendel Fabian Chinsamy Date: Thu, 25 Jul 2024 14:31:01 +0200 Subject: [PATCH 1/4] intial commit --- app/jobs/delete_gbif_events_job.rb | 21 +++++++++++++++---- app/models/event.rb | 33 ++++++++++++++++++++++++++++++ lib/tasks/event.rake | 7 ++++--- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/app/jobs/delete_gbif_events_job.rb b/app/jobs/delete_gbif_events_job.rb index 2a75fd443..25c5bad33 100644 --- a/app/jobs/delete_gbif_events_job.rb +++ b/app/jobs/delete_gbif_events_job.rb @@ -3,11 +3,24 @@ class DeleteGbifEventsJob < ApplicationJob queue_as :lupo_background - def perform(id, options = {}) - event = Event.find_by(uuid: id) + def perform(ids, options = {}) + label = options[:label] + index = ENV["INDEX"] - event.destroy! if event.present? + if index.blank? + Rails.logger.error("#{label}: ENV['INDEX'] must be provided") + return + end + + # delete event records from mysql + result = Events.where(id: ids).delete_all + Rails.logger.info("#{label}: #{result} event records deleted") + + # delete event documents from elasticsearch + bulk_payload = ids.map { |id| { delete: { _index: index, _id: id } } } + response = Event.__elasticsearch__.client.bulk(body: bulk_payload) + Rails.logger.info("#{label}: #{response}") rescue => err - Rails.logger.info("#{options[:label]}: event delete error: #{err.message}") + Rails.logger.error("#{label}: #{are.message}") end end diff --git a/app/models/event.rb b/app/models/event.rb index b8fe8745c..4fec93971 100644 --- a/app/models/event.rb +++ b/app/models/event.rb @@ -880,6 +880,39 @@ def self.loop_through_events(options) Rails.logger.info("#{label}: task completed") end + def self.loop_through_gbif_events(options) + size = (options[:size] || 1_000).to_i + cursor = options[:cursor] || [] + filter = options[:filter] || {} + label = options[:label] || "" + job_name = options[:job_name] || "" + query = options[:query].presence + + response = Event.query(query, filter.merge(page: { size: 1, cursor: [] })) + + if response.size.positive? + while response.size.positive? + response = Event.query(query, filter.merge(page: { size: size, cursor: cursor })) + + break unless response.size.positive? + + Rails.logger.info("#{label}: #{response.size} events starting with _id #{response.results.to_a.first[:_id]}") + + cursor = response.results.to_a.last[:sort] + + Rails.logger.info "#{label}: cursor: #{cursor}" + + ids = response.results.map(&:_id).uniq + + DeleteGbifEventsJob.perform_later(ids, options) + + # Object.const_get(job_name).perform_later(ids, options) + end + end + + Rails.logger.info("#{label}: task completed") + end + def metric_type if /(requests|investigations)/.match?(relation_type_id.to_s) arr = relation_type_id.split("-", 4) diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake index 2e6fd5323..d20925ea1 100644 --- a/lib/tasks/event.rake +++ b/lib/tasks/event.rake @@ -88,16 +88,17 @@ namespace :gbif_events do desc "delete gbif events" task delete_gbif_events: :environment do options = { - size: 100, + size: 5, from_id: (ENV["FROM_ID"] || Event.minimum(:id)).to_i, until_id: (ENV["UNTIL_ID"] || Event.maximum(:id)).to_i, filter: {}, - query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", + # query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", + query: "+relation_type_id:is-authored-by", label: "gbif_event_cleanup_#{Time.now.utc.strftime("%d%m%Y%H%M%S")}", job_name: "DeleteGbifEventsJob" } - Event.loop_through_events(options) + Event.loop_through_gbif_events(options) end desc "delete orphaned gbif_events" From 532c4e0bda93cb590658e258306f404ad4d4a772 Mon Sep 17 00:00:00 2001 From: Wendel Fabian Chinsamy Date: Thu, 25 Jul 2024 15:48:55 +0200 Subject: [PATCH 2/4] add a separate gbif loop function --- app/jobs/delete_gbif_events_job.rb | 9 ++++----- app/models/event.rb | 4 +--- lib/tasks/event.rake | 5 ++--- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/app/jobs/delete_gbif_events_job.rb b/app/jobs/delete_gbif_events_job.rb index 25c5bad33..1c7777a84 100644 --- a/app/jobs/delete_gbif_events_job.rb +++ b/app/jobs/delete_gbif_events_job.rb @@ -13,14 +13,13 @@ def perform(ids, options = {}) end # delete event records from mysql - result = Events.where(id: ids).delete_all - Rails.logger.info("#{label}: #{result} event records deleted") + sql = ActiveRecord::Base.sanitize_sql_array(["DELETE FROM events WHERE id IN (?)", ids]) + ActiveRecord::Base.connection.execute(sql) # delete event documents from elasticsearch bulk_payload = ids.map { |id| { delete: { _index: index, _id: id } } } - response = Event.__elasticsearch__.client.bulk(body: bulk_payload) - Rails.logger.info("#{label}: #{response}") + Event.__elasticsearch__.client.bulk(body: bulk_payload) rescue => err - Rails.logger.error("#{label}: #{are.message}") + Rails.logger.error("#{label}: #{err.message}") end end diff --git a/app/models/event.rb b/app/models/event.rb index 4fec93971..a8cae10b6 100644 --- a/app/models/event.rb +++ b/app/models/event.rb @@ -904,9 +904,7 @@ def self.loop_through_gbif_events(options) ids = response.results.map(&:_id).uniq - DeleteGbifEventsJob.perform_later(ids, options) - - # Object.const_get(job_name).perform_later(ids, options) + Object.const_get(job_name).perform_later(ids, options) end end diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake index d20925ea1..dd459ca21 100644 --- a/lib/tasks/event.rake +++ b/lib/tasks/event.rake @@ -88,12 +88,11 @@ namespace :gbif_events do desc "delete gbif events" task delete_gbif_events: :environment do options = { - size: 5, + size: 1000, from_id: (ENV["FROM_ID"] || Event.minimum(:id)).to_i, until_id: (ENV["UNTIL_ID"] || Event.maximum(:id)).to_i, filter: {}, - # query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", - query: "+relation_type_id:is-authored-by", + query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", label: "gbif_event_cleanup_#{Time.now.utc.strftime("%d%m%Y%H%M%S")}", job_name: "DeleteGbifEventsJob" } From 56fb4dae75e1da74ddfa82d88b8bf048be7c6df8 Mon Sep 17 00:00:00 2001 From: Wendel Fabian Chinsamy Date: Thu, 25 Jul 2024 17:16:48 +0200 Subject: [PATCH 3/4] rework gbif cleanup to bulk delete --- app/jobs/delete_gbif_events_job.rb | 7 +------ app/models/event.rb | 6 +++++- lib/tasks/event.rake | 20 ++++++++++++++++++-- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/app/jobs/delete_gbif_events_job.rb b/app/jobs/delete_gbif_events_job.rb index 1c7777a84..d40fa913b 100644 --- a/app/jobs/delete_gbif_events_job.rb +++ b/app/jobs/delete_gbif_events_job.rb @@ -5,12 +5,7 @@ class DeleteGbifEventsJob < ApplicationJob def perform(ids, options = {}) label = options[:label] - index = ENV["INDEX"] - - if index.blank? - Rails.logger.error("#{label}: ENV['INDEX'] must be provided") - return - end + index = options[:index] # delete event records from mysql sql = ActiveRecord::Base.sanitize_sql_array(["DELETE FROM events WHERE id IN (?)", ids]) diff --git a/app/models/event.rb b/app/models/event.rb index a8cae10b6..de47a0ef3 100644 --- a/app/models/event.rb +++ b/app/models/event.rb @@ -887,11 +887,13 @@ def self.loop_through_gbif_events(options) label = options[:label] || "" job_name = options[:job_name] || "" query = options[:query].presence + delete_count = 0 + max_delete_count = options[:max_delete_count] response = Event.query(query, filter.merge(page: { size: 1, cursor: [] })) if response.size.positive? - while response.size.positive? + while response.size.positive? && delete_count < max_delete_count response = Event.query(query, filter.merge(page: { size: size, cursor: cursor })) break unless response.size.positive? @@ -905,6 +907,8 @@ def self.loop_through_gbif_events(options) ids = response.results.map(&:_id).uniq Object.const_get(job_name).perform_later(ids, options) + + delete_count += response.size end end diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake index dd459ca21..65430422b 100644 --- a/lib/tasks/event.rake +++ b/lib/tasks/event.rake @@ -87,14 +87,30 @@ end namespace :gbif_events do desc "delete gbif events" task delete_gbif_events: :environment do + index = ENV["INDEX"] + + if index.blank? + Rails.logger.error("You must provide an INDEX environment variable") + exit + end + + max_delete_count = ENV["MAX_DELETE_COUNT"] + + if max_delete_count.blank? + Rails.logger.error("You must provide an MAX_DELETE_COUNT environment variable") + exit + end + options = { - size: 1000, + size: 2, from_id: (ENV["FROM_ID"] || Event.minimum(:id)).to_i, until_id: (ENV["UNTIL_ID"] || Event.maximum(:id)).to_i, filter: {}, query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", + job_name: "DeleteGbifEventsJob", label: "gbif_event_cleanup_#{Time.now.utc.strftime("%d%m%Y%H%M%S")}", - job_name: "DeleteGbifEventsJob" + max_delete_count: max_delete_count.to_i, + index: index } Event.loop_through_gbif_events(options) From 1f26360a4fc2d9f542e313c5565f7236f1cf1b78 Mon Sep 17 00:00:00 2001 From: Wendel Fabian Chinsamy Date: Fri, 26 Jul 2024 12:08:29 +0200 Subject: [PATCH 4/4] change page size to 1000 --- lib/tasks/event.rake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake index 65430422b..e27db58a1 100644 --- a/lib/tasks/event.rake +++ b/lib/tasks/event.rake @@ -102,7 +102,7 @@ namespace :gbif_events do end options = { - size: 2, + size: 1000, from_id: (ENV["FROM_ID"] || Event.minimum(:id)).to_i, until_id: (ENV["UNTIL_ID"] || Event.maximum(:id)).to_i, filter: {},