diff --git a/Gemfile.lock b/Gemfile.lock index e8bc7762..0ed3e912 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -132,8 +132,6 @@ GEM rake (13.2.1) rdoc (6.8.1) psych (>= 4.0.0) - rdoc (6.6.3.1) - psych (>= 4.0.0) regexp_parser (2.9.2) reline (0.5.12) io-console (~> 0.5) diff --git a/lib/solid_queue/dispatcher/concurrency_maintenance.rb b/lib/solid_queue/dispatcher/concurrency_maintenance.rb index 81cf770c..fd0073d4 100644 --- a/lib/solid_queue/dispatcher/concurrency_maintenance.rb +++ b/lib/solid_queue/dispatcher/concurrency_maintenance.rb @@ -12,16 +12,10 @@ def initialize(interval, batch_size) end def start - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do + @concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do expire_semaphores unblock_blocked_executions end - - @concurrency_maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @concurrency_maintenance_task.execute end def stop diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..9c8e33cf 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -37,15 +37,9 @@ def registered? end def launch_heartbeat - @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do + @heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do wrap_in_app_executor { heartbeat } end - - @heartbeat_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @heartbeat_task.execute end def stop_heartbeat diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..48808d7c 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -7,16 +7,11 @@ module Supervisor::Maintenance end private + def launch_maintenance_task - @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do + @maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do prune_dead_processes end - - @maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @maintenance_task.execute end def stop_maintenance_task diff --git a/lib/solid_queue/timer_task.rb b/lib/solid_queue/timer_task.rb new file mode 100644 index 00000000..9a3fcec4 --- /dev/null +++ b/lib/solid_queue/timer_task.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module SolidQueue + class TimerTask + include AppExecutor + + def initialize(execution_interval:, run_now: false, &block) + raise ArgumentError, "A block is required" unless block_given? + @shutdown = Concurrent::AtomicBoolean.new + + run(run_now, execution_interval, &block) + end + + def shutdown + @shutdown.make_true + end + + private + + def run(run_now, execution_interval, &block) + execute_task(&block) if run_now + + Concurrent::Promises.future(execution_interval) do |interval| + repeating_task(interval, &block) + end.run + end + + def execute_task(&block) + block.call unless @shutdown.true? + rescue Exception => e + handle_thread_error(e) + end + + def repeating_task(interval, &block) + Concurrent::Promises.schedule(interval) do + execute_task(&block) + end.then do + repeating_task(interval, &block) unless @shutdown.true? + end + end + end +end diff --git a/test/unit/timer_task_test.rb b/test/unit/timer_task_test.rb new file mode 100644 index 00000000..b3a9b11b --- /dev/null +++ b/test/unit/timer_task_test.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "test_helper" +require "mocha/minitest" + +class TimerTaskTest < ActiveSupport::TestCase + test "initialization requires a block" do + assert_raises(ArgumentError) do + SolidQueue::TimerTask.new(execution_interval: 1) + end + end + + test "task runs immediate when run now true" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert executed, "Task should have executed immediately" + task.shutdown + end + + test "task does not run immediately when run with run_now false" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert_not executed, "Task should have executed immediately" + task.shutdown + end + + test "task repeats" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do + executions += 1 + end + + sleep(0.5) # Wait to accumulate some executions + + assert executions > 3, "The block should be executed repeatedly" + + task.shutdown + end + + test "task stops on shutdown" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } + + sleep(0.3) # Let the task run a few times + + task.shutdown + + current_executions = executions + + sleep(0.5) # Ensure no more executions after shutdown + + assert_equal current_executions, executions, "The task should stop executing after shutdown" + end + + test "calls handle_thread_error if task raises" do + task = SolidQueue::TimerTask.new(execution_interval: 0.1) do + raise ExpectedTestError.new + end + task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) + + sleep(0.2) # Give some time for the task to run and handle the error + + task.shutdown + end +end