Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to rerun relationship create job #970

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions app/jobs/bulkrax/create_relationships_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class CreateRelationshipsJob < ApplicationJob

queue_as Bulkrax.config.ingest_queue_name

attr_accessor :user, :importer_run, :errors
##
# @param parent_identifier [String] Work/Collection ID or Bulkrax::Entry source_identifiers
# @param importer_run [Bulkrax::ImporterRun] current importer run (needed to properly update counters)
Expand All @@ -54,9 +55,10 @@ class CreateRelationshipsJob < ApplicationJob
# is the child in the relationship, and vice versa if a child_identifier is passed.
#
# rubocop:disable Metrics/MethodLength
def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcSize
@importer_run = Bulkrax::ImporterRun.find(importer_run_id)
ability = Ability.new(importer_run.user)
def perform(parent_identifier:, importer_run_id: nil, run_user: nil) # rubocop:disable Metrics/AbcSize
importer_run = Bulkrax::ImporterRun.find(importer_run_id) if importer_run_id
user = run_user || importer_run&.user
ability = Ability.new(user)

parent_entry, parent_record = find_record(parent_identifier, importer_run_id)

Expand All @@ -69,10 +71,11 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS
if parent_record
conditionally_acquire_lock_for(parent_record.id) do
ActiveRecord::Base.uncached do
Bulkrax::PendingRelationship.where(parent_id: parent_identifier, importer_run_id: importer_run_id)
Bulkrax::PendingRelationship.where(parent_id: parent_identifier)
.ordered.find_each do |rel|
process(relationship: rel, importer_run_id: importer_run_id, parent_record: parent_record, ability: ability)
number_of_successes += 1
@parent_record_members_added = true
rescue => e
number_of_failures += 1
errors << e
Expand All @@ -81,7 +84,7 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS

# save record if members were added
if @parent_record_members_added
Bulkrax.object_factory.save!(resource: parent_record, user: importer_run.user)
Bulkrax.object_factory.save!(resource: parent_record, user: user)
Bulkrax.object_factory.publish(event: 'object.membership.updated', object: parent_record)
Bulkrax.object_factory.update_index(resources: @child_members_added)
end
Expand All @@ -107,7 +110,7 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS

# TODO: This can create an infinite job cycle, consider a time to live tracker.
reschedule(parent_identifier: parent_identifier, importer_run_id: importer_run_id)
return false # stop current job from continuing to run after rescheduling
return errors # stop current job from continuing to run after rescheduling
else
# rubocop:disable Rails/SkipsModelValidations
ImporterRun.update_counters(importer_run_id, processed_relationships: number_of_successes)
Expand All @@ -116,8 +119,6 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS
end
# rubocop:enable Metrics/MethodLength

attr_reader :importer_run

private

##
Expand Down Expand Up @@ -170,7 +171,7 @@ def add_to_collection(child_record, parent_record)
Bulkrax.object_factory.add_resource_to_collection(
collection: parent_record,
resource: child_record,
user: importer_run.user
user: user
)
end

Expand Down
Loading