Skip to content

Commit

Permalink
Fix queue order when combining multiple prefixes or prefixes and names
Browse files Browse the repository at this point in the history
We were altering the original order to be exact names and then prefixes
in the order returned by the DB, which doesn't need to be the order
specified for the worker. This change ensures the order is respected.
  • Loading branch information
rosa committed Nov 14, 2024
1 parent b515af2 commit 8132c7e
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 36 deletions.
40 changes: 35 additions & 5 deletions app/models/solid_queue/queue_selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ def queue_names
def eligible_queues
if include_all_queues? then all_queues
else
exact_names + prefixed_names
in_raw_order(exact_names + prefixed_names)
end
end

def include_all_queues?
"*".in? raw_queues
end

def all_queues
relation.distinct(:queue_name).pluck(:queue_name)
end

def exact_names
raw_queues.select { |queue| !queue.include?("*") }
raw_queues.select { |queue| exact_name?(queue) }
end

def prefixed_names
Expand All @@ -54,15 +58,41 @@ def prefixed_names
end

def prefixes
@prefixes ||= raw_queues.select { |queue| queue.ends_with?("*") }.map { |queue| queue.tr("*", "%") }
@prefixes ||= raw_queues.select { |queue| prefixed_name?(queue) }.map { |queue| queue.tr("*", "%") }
end

def all_queues
relation.distinct(:queue_name).pluck(:queue_name)
def exact_name?(queue)
!queue.include?("*")
end

def prefixed_name?(queue)
queue.ends_with?("*")
end

def paused_queues
@paused_queues ||= Pause.all.pluck(:queue_name)
end

def in_raw_order(queues)
# Only need to sort if we have prefixes and more than one queue name.
# Exact names are selected in the same order as they're found
if queues.one? || prefixes.empty?
queues
else
queues = queues.dup
raw_queues.flat_map { |raw_queue| delete_in_order(raw_queue, queues) }.compact
end
end

def delete_in_order(raw_queue, queues)
if exact_name?(raw_queue)
queues.delete(raw_queue)
elsif prefixed_name?(raw_queue)
prefix = raw_queue.tr("*", "")
queues.select { |queue| queue.start_with?(prefix) }.tap do |matches|
queues -= matches
end
end
end
end
end
103 changes: 72 additions & 31 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,51 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end
end

test "queue order and then priority is respected when using a list of queues" do
test "claim jobs using a wildcard" do
AddToBufferJob.perform_later("hey")
job = SolidQueue::Job.last
assert_equal "background", job.queue_name

assert_claimed_jobs(3) do
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end
end

assert job.reload.claimed?
@jobs.first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
test "claim jobs using queue prefixes" do
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
end

assert @jobs.none?(&:claimed?)
end

test "claim jobs using a wildcard" do
test "claim jobs using a wildcard and having paused queues" do
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(6) do
SolidQueue::Queue.find_by_name("backend").pause

assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end

@jobs.each(&:reload)
assert @jobs.none?(&:claimed?)
end

test "claim jobs using both exact names and a prefix" do
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs for queue without jobs at the moment using prefixes" do
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(0) do
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42)
end
end

test "priority order is used when claiming jobs using a wildcard" do
Expand All @@ -88,43 +111,61 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end
end

test "claim jobs using queue prefixes" do
test "queue order and then priority is respected when using a list of queues" do
AddToBufferJob.perform_later("hey")
job = SolidQueue::Job.last
assert_equal "background", job.queue_name

assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
assert_claimed_jobs(3) do
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
end

assert @jobs.none?(&:claimed?)
assert job.reload.claimed?
@jobs.first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
end
end

test "claim jobs using a wildcard and having paused queues" do
AddToBufferJob.perform_later("hey")
test "queue order is respected when using prefixes" do
%w[ queue_b1 queue_b2 queue_a2 queue_a1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name|
AddToBufferJob.set(queue: queue_name).perform_later(1)
end

SolidQueue::Queue.find_by_name("backend").pause
# Claim 8 jobs
claimed_jobs = []
4.times do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42)
end

assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)
end

@jobs.each(&:reload)
assert @jobs.none?(&:claimed?)
# Check claim order
assert_equal %w[ queue_b1 queue_b1 queue_b2 queue_b2 queue_a1 queue_a1 queue_a2 queue_a2 ],
claimed_jobs.map(&:queue_name)
end

test "claim jobs using both exact names and a prefixes" do
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
test "queue order is respected when mixing exact names with prefixes" do
%w[ queue_b1 queue_b2 queue_a2 queue_c2 queue_a1 queue_c1 queue_b1 queue_a2 queue_b2 queue_a1 ].each do |queue_name|
AddToBufferJob.set(queue: queue_name).perform_later(1)
end
end

test "claim jobs for queue without jobs at the moment using prefixes" do
AddToBufferJob.perform_later("hey")
# Claim 10 jobs
claimed_jobs = []
5.times do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42)
end

assert_claimed_jobs(0) do
SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42)
claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job)
end

# Check claim order
assert_equal %w[ queue_a2 queue_a2 queue_c1 queue_b1 queue_b1 queue_b2 queue_b2 queue_c2 queue_a1 queue_a1 ],
claimed_jobs.map(&:queue_name)
end

test "discard all" do
Expand Down

0 comments on commit 8132c7e

Please sign in to comment.