Skip to content

Commit

Permalink
Use Concurrent::Promises based TimerTask
Browse files Browse the repository at this point in the history
Promises are the recommended infrastructure, replacing
several OG APIs, including TimerTasks.

SQ only uses TimerTasks in 3 places (currently) and
a very small subset of their overall functionality.
SolidQueue::TimerTask is a drop-in replacement.

This PR uses AtomicBoolean instead of the recommended
Concurrent::Cancellation to avoid a dependency on,
and the potential API stability issues of, edge features.

This completes the move from the old APIs to Promises and
makes all of the new concurrency features (Actors,
Channels, etc.) available for future SQ features and
enahancements.
  • Loading branch information
hms committed Dec 2, 2024
1 parent 6026d0b commit 9dc7d30
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 23 deletions.
2 changes: 0 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ GEM
rake (13.2.1)
rdoc (6.8.1)
psych (>= 4.0.0)
rdoc (6.6.3.1)
psych (>= 4.0.0)
regexp_parser (2.9.2)
reline (0.5.12)
io-console (~> 0.5)
Expand Down
8 changes: 1 addition & 7 deletions lib/solid_queue/dispatcher/concurrency_maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,10 @@ def initialize(interval, batch_size)
end

def start
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do
@concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do
expire_semaphores
unblock_blocked_executions
end

@concurrency_maintenance_task.add_observer do |_, _, error|
handle_thread_error(error) if error
end

@concurrency_maintenance_task.execute
end

def stop
Expand Down
8 changes: 1 addition & 7 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,9 @@ def registered?
end

def launch_heartbeat
@heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
@heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
wrap_in_app_executor { heartbeat }
end

@heartbeat_task.add_observer do |_, _, error|
handle_thread_error(error) if error
end

@heartbeat_task.execute
end

def stop_heartbeat
Expand Down
9 changes: 2 additions & 7 deletions lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@ module Supervisor::Maintenance
end

private

def launch_maintenance_task
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
@maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
prune_dead_processes
end

@maintenance_task.add_observer do |_, _, error|
handle_thread_error(error) if error
end

@maintenance_task.execute
end

def stop_maintenance_task
Expand Down
42 changes: 42 additions & 0 deletions lib/solid_queue/timer_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module SolidQueue
class TimerTask
include AppExecutor

def initialize(execution_interval:, run_now: false, &block)
raise ArgumentError, "A block is required" unless block_given?
@shutdown = Concurrent::AtomicBoolean.new

run(run_now, execution_interval, &block)
end

def shutdown
@shutdown.make_true
end

private

def run(run_now, execution_interval, &block)
execute_task(&block) if run_now

Concurrent::Promises.future(execution_interval) do |interval|
repeating_task(interval, &block)
end.run
end

def execute_task(&block)
block.call unless @shutdown.true?
rescue Exception => e
handle_thread_error(e)
end

def repeating_task(interval, &block)
Concurrent::Promises.schedule(interval) do
execute_task(&block)
end.then do
repeating_task(interval, &block) unless @shutdown.true?
end
end
end
end
79 changes: 79 additions & 0 deletions test/unit/timer_task_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

require "test_helper"
require "mocha/minitest"

class TimerTaskTest < ActiveSupport::TestCase
test "initialization requires a block" do
assert_raises(ArgumentError) do
SolidQueue::TimerTask.new(execution_interval: 1)
end
end

test "task runs immediate when run now true" do
executed = false

task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do
executed = true
end

sleep 0.1

assert executed, "Task should have executed immediately"
task.shutdown
end

test "task does not run immediately when run with run_now false" do
executed = false

task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do
executed = true
end

sleep 0.1

assert_not executed, "Task should have executed immediately"
task.shutdown
end

test "task repeats" do
executions = 0

task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do
executions += 1
end

sleep(0.5) # Wait to accumulate some executions

assert executions > 3, "The block should be executed repeatedly"

task.shutdown
end

test "task stops on shutdown" do
executions = 0

task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 }

sleep(0.3) # Let the task run a few times

task.shutdown

current_executions = executions

sleep(0.5) # Ensure no more executions after shutdown

assert_equal current_executions, executions, "The task should stop executing after shutdown"
end

test "calls handle_thread_error if task raises" do
task = SolidQueue::TimerTask.new(execution_interval: 0.1) do
raise ExpectedTestError.new
end
task.expects(:handle_thread_error).with(instance_of(ExpectedTestError))

sleep(0.2) # Give some time for the task to run and handle the error

task.shutdown
end
end

0 comments on commit 9dc7d30

Please sign in to comment.