From 58494a337ef061007a3f31485349cb6221726877 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 17 Dec 2024 17:50:05 +0100 Subject: [PATCH] Validate DB connection pool before starting supervisor Also, refactor a bit configuration validation to use ActiveModel's validations. --- lib/solid_queue/configuration.rb | 44 ++++++++++++++++---- lib/solid_queue/supervisor.rb | 2 +- test/dummy/config/recurring_with_invalid.yml | 13 +++--- test/unit/configuration_test.rb | 33 ++++++++------- test/unit/supervisor_test.rb | 10 ++--- 5 files changed, 67 insertions(+), 35 deletions(-) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index c317e71e..bd238dcc 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -2,6 +2,12 @@ module SolidQueue class Configuration + include ActiveModel::Model + + validate :ensure_configured_processes + validate :ensure_valid_recurring_tasks + validate :ensure_correctly_sized_thread_pool + class Process < Struct.new(:kind, :attributes) def instantiate "SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes) @@ -36,10 +42,6 @@ def configured_processes end end - def valid? - configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?) - end - def error_messages if configured_processes.none? "No workers or processed configured. Exiting..." @@ -54,14 +56,32 @@ def error_messages end end - def max_number_of_threads - # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task - workers_options.map { |options| options[:threads] }.max + 2 - end - private attr_reader :options + def ensure_configured_processes + unless configured_processes.any? + errors.add(:base, "No processes configured") + end + end + + def ensure_valid_recurring_tasks + unless skip_recurring_tasks? || invalid_tasks.none? + error_messages = invalid_tasks.map do |task| + "- #{task.key}: #{task.errors.full_messages.join(", ")}" + end + + errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}") + end + end + + def ensure_correctly_sized_thread_pool + if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads + errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " + + "database connection pool is #{db_pool_size}. Increase it in `config/database.yml`") + end + end + def default_options { config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), @@ -169,5 +189,11 @@ def load_config_from_file(file) {} end end + + def estimated_number_of_threads + # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task + thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max + (thread_count || 1) + 2 + end end end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index d2b11f22..e8f075eb 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -13,7 +13,7 @@ def start(**options) if configuration.valid? new(configuration).tap(&:start) else - abort configuration.error_messages + abort configuration.errors.full_messages.join("\n") + "\nExiting..." end end end diff --git a/test/dummy/config/recurring_with_invalid.yml b/test/dummy/config/recurring_with_invalid.yml index f7889981..69dacf6f 100644 --- a/test/dummy/config/recurring_with_invalid.yml +++ b/test/dummy/config/recurring_with_invalid.yml @@ -1,5 +1,8 @@ -periodic_store_result: - class: StoreResultJorrrrrrb - queue: default - args: [42, { status: "custom_status" }] - schedule: every second +periodic_invalid_class: + class: StoreResultJorrrrrrb + queue: default + args: [42, { status: "custom_status" }] + schedule: every second +periodic_incorrect_schedule: + class: StoreResultJob + schedule: every 1.minute diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 73442d6e..017e9e9b 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :worker, 2 end - test "max number of threads" do - configuration = SolidQueue::Configuration.new - assert 7, configuration.max_number_of_threads - end - test "mulitple workers with the same configuration" do background_worker = { queues: "background", polling_interval: 10, processes: 3 } configuration = SolidQueue::Configuration.new(workers: [ background_worker ]) @@ -90,22 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil end - test "detects when there are invalid recurring tasks" do + test "validate configuration" do + # Valid and invalid recurring tasks configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid)) - assert_not configuration.valid? - end + assert configuration.errors.full_messages.one? + error = configuration.errors.full_messages.first - test "is valid when there are no recurring tasks" do - configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)) + assert error.include?("Invalid recurring tasks") + assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class") + assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule") - assert configuration.valid? - end + assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid? + assert SolidQueue::Configuration.new(skip_recurring: true).valid? - test "is valid when recurring tasks are skipped" do - configuration = SolidQueue::Configuration.new(skip_recurring: true) + # No processes + configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: []) + assert_not configuration.valid? + assert_equal [ "No processes configured" ], configuration.errors.full_messages - assert configuration.valid? + # Not enough DB connections + configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ]) + assert_not configuration.valid? + assert_equal [ "Solid Queue is configured to use 52 threads but the database connection pool is 20. Increase it in `config/database.yml`" ], + configuration.errors.full_messages end private diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 4be772e0..c430544a 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -41,22 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase end test "start with empty configuration" do - pid, _out, err = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: []) + pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: []) sleep(0.5) assert_no_registered_processes assert_not process_exists?(pid) - assert_match %r{No workers or processed configured. Exiting...}, err + assert_match %r{No processes configured}, error end - test "start with invalid configuration" do - pid, _out, err = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false) + test "start with invalid recurring tasks" do + pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false) sleep(0.5) assert_no_registered_processes assert_not process_exists?(pid) - assert_match %r{Invalid processes configured}, err + assert_match %r{Invalid recurring tasks}, error end test "create and delete pidfile" do