Skip to content

Commit

Permalink
Upgrade Worker.pool to Promises.future
Browse files Browse the repository at this point in the history
* Upgrade Worker.pool from Future to Promises.future

  Promises.future leverages an improved, non-blocking, and lock-free
  implementation of Concurrent Ruby's Async runtime, enhancing performance
  and future feature compatibility.  Promises.future was moved from
  edge to main in V1.1 (2018).

* Replace ClaimedExecution::Results with Concurrent::Maybe

  `Results` was underutilized and essentially mirrored `Maybe`'s
  functionality. This change simplifies code and reduces redundancy
  by leveraging the `Concurrent::Maybe` class.

* Centralize error reporting in AppExecutor.handle_thread_error

  This change was necessitated by the change to Promises.future.
  Concurrent Ruby has some very strong ideas about exceptions
  within a future with a little code rearranging, this change
  pulls the error / exception reporting responsibilities out
  of the Future execution code path and pushes it to
  AppExecutor#handle_thread_error.

  This change ensures that `Rails.error` is called exactly once
  per `handle_thread_error` invocation regardless of
  on_thread_error calling `Rails.error` or not.

* Update tests to accommodate these changes
  • Loading branch information
hms committed Dec 2, 2024
1 parent 7c09954 commit 6026d0b
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 39 deletions.
19 changes: 5 additions & 14 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ class SolidQueue::ClaimedExecution < SolidQueue::Execution

scope :orphaned, -> { where.missing(:process) }

class Result < Struct.new(:success, :error)
def success?
success
end
end

class << self
def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
Expand Down Expand Up @@ -60,12 +54,9 @@ def discard_all_from_jobs(*)
def perform
result = execute

if result.success?
finished
else
failed_with(result.error)
raise result.error
end
result.just? ? finished : failed_with(result.reason)

result
ensure
job.unblock_next_blocked_job
end
Expand Down Expand Up @@ -93,9 +84,9 @@ def failed_with(error)
private
def execute
ActiveJob::Base.execute(job.arguments)
Result.new(true, nil)
Concurrent::Maybe.just(true)
rescue Exception => e
Result.new(false, e)
Concurrent::Maybe.nothing(e)
end

def finished
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module SolidQueue
mattr_accessor :preserve_finished_jobs, default: true
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes
mattr_accessor :reporting_label, default: "SolidQueue-#{SolidQueue::VERSION}"

delegate :on_start, :on_stop, to: Supervisor

Expand Down
35 changes: 31 additions & 4 deletions lib/solid_queue/app_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,38 @@ def wrap_in_app_executor(&block)
end

def handle_thread_error(error)
SolidQueue.instrument(:thread_error, error: error)
CallErrorReporters.new(error).call
end

private

# Handles error reporting and guarantees that Rails.error will be called if configured.
#
# This method performs the following actions:
# 1. Invokes `SolidQueue.instrument` for `:thread_error`.
# 2. Invokes `SolidQueue.on_thread_error` if it is configured.
# 3. Invokes `Rails.error.report` if it wasn't invoked by one of the above calls.
class CallErrorReporters
# @param [Exception] error The error to be reported.
def initialize(error)
@error = error
@reported = false
end

def call
SolidQueue.instrument(:thread_error, error: @error)
Rails.error.subscribe(self) if Rails.error&.respond_to?(:subscribe)

if SolidQueue.on_thread_error
SolidQueue.on_thread_error.call(error)
SolidQueue.on_thread_error&.call(@error)

Rails.error.report(@error, handled: false, source: SolidQueue.reporting_label) unless @reported
ensure
Rails.error.unsubscribe(self) if Rails.error&.respond_to?(:unsubscribe)
end

def report(*, **)
@reported = true
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Engine < ::Rails::Engine

initializer "solid_queue.app_executor", before: :run_prepare_callbacks do |app|
config.solid_queue.app_executor ||= app.executor
config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false) }
config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false, source: SolidQueue.reporting_label) }

SolidQueue.app_executor = config.solid_queue.app_executor
SolidQueue.on_thread_error = config.solid_queue.on_thread_error
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def replace_fork(event)

private
def formatted_event(event, action:, **attributes)
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
"#{SolidQueue.reporting_label} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
end

def formatted_attributes(**attributes)
Expand Down
12 changes: 4 additions & 8 deletions lib/solid_queue/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@ def initialize(size, on_idle: nil)
def post(execution)
available_threads.decrement

future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
Concurrent::Promises.future_on(executor, execution) do |thread_execution|
wrap_in_app_executor do
thread_execution.perform
result = thread_execution.perform

handle_thread_error(result.reason) if result.rejected?
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
end
end

future.add_observer do |_, _, error|
handle_thread_error(error) if error
end

future.execute
end

def idle_threads
Expand Down
5 changes: 3 additions & 2 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ class InstrumentationTest < ActiveSupport::TestCase

test "thread errors emit thread_error events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

error = ExpectedTestError.new("everything is broken")
SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once

# Allows the job to process normally, but trigger the error path in ClaimedExecution.execute
Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(error))

AddToBufferJob.perform_later "hey!"

Expand Down
12 changes: 4 additions & 8 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
assert_raises RuntimeError do
claimed_execution.perform
end
claimed_execution.perform
end

assert_not job.reload.finished?
Expand All @@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
test "job failures are reported via Rails error subscriber" do
subscriber = ErrorBuffer.new

assert_raises RuntimeError do
with_error_subscriber(subscriber) do
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")
with_error_subscriber(subscriber) do
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")

claimed_execution.perform
end
claimed_execution.perform
end

assert_equal 1, subscriber.errors.count
Expand Down
2 changes: 1 addition & 1 deletion test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once
Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(ExpectedTestError.new("everything is broken")))

AddToBufferJob.perform_later "hey!"

Expand Down

0 comments on commit 6026d0b

Please sign in to comment.