diff --git a/lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb b/lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb new file mode 100644 index 000000000..bebc00fb9 --- /dev/null +++ b/lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb @@ -0,0 +1,53 @@ +module Concurrent + module Collection + # @!visibility private + # @!macro ruby_timeout_queue + class RubyTimeoutQueue < ::Queue + def initialize(*args) + if RUBY_VERSION >= '3.2' + raise 'RubyTimeoutQueue is not needed on Ruby 3.2 or later, use ::Queue instead' + end + + super(*args) + + @mutex = Mutex.new + @cond_var = ConditionVariable.new + end + + def push(obj) + super(obj).tap { @mutex.synchronize { @cond_var.signal } } + end + alias_method :enq, :push + alias_method :<<, :push + + def pop(non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + + if non_block + super(true) + elsif timeout && empty? && timed_out?(timeout) { @mutex.synchronize { @cond_var.wait(@mutex, timeout) } } + nil + else + super(false) + end + end + alias_method :deq, :pop + alias_method :shift, :pop + + private + + def timed_out?(timeout) + # https://github.com/ruby/ruby/pull/4256 + if RUBY_VERSION >= '3.1' + yield.nil? + else + deadline = Time.now + timeout + yield + Time.now >= deadline + end + end + end + end +end diff --git a/lib/concurrent-ruby/concurrent/collection/timeout_queue.rb b/lib/concurrent-ruby/concurrent/collection/timeout_queue.rb new file mode 100644 index 000000000..7048c4b68 --- /dev/null +++ b/lib/concurrent-ruby/concurrent/collection/timeout_queue.rb @@ -0,0 +1,18 @@ +module Concurrent + module Collection + # @!visibility private + # @!macro internal_implementation_note + TimeoutQueueImplementation = if RUBY_VERSION >= '3.2' + ::Queue + else + require 'concurrent/collection/ruby_timeout_queue' + RubyTimeoutQueue + end + private_constant :TimeoutQueueImplementation + + # @!visibility private + # @!macro timeout_queue + class TimeoutQueue < TimeoutQueueImplementation + end + end +end diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 8324c0673..eb9690208 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -75,17 +75,6 @@ module Concurrent # @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting # new tasks. A value of -1 indicates that the queue may grow without bound. - # @!macro thread_pool_executor_method_prune_pool - # Prune the thread pool of unneeded threads - # - # What is being pruned is controlled by the min_threads and idletime - # parameters passed at pool creation time - # - # This is a no-op on some pool implementation (e.g. the Java one). The Ruby - # pool will auto-prune each time a new job is posted. You will need to call - # this method explicitly in case your application post jobs in bursts (a - # lot of jobs and then nothing for long periods) - # @!macro thread_pool_executor_public_api # # @!macro abstract_executor_service_public_api @@ -122,9 +111,6 @@ module Concurrent # # @!method can_overflow? 
# @!macro executor_service_method_can_overflow_question - # - # @!method prune_pool - # @!macro thread_pool_executor_method_prune_pool diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index 598a5f91f..4fcf3ace1 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -98,10 +98,6 @@ def running? super && !@executor.isTerminating end - # @!macro thread_pool_executor_method_prune_pool - def prune_pool - end - private def ns_initialize(opts) diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 9375acf38..cea7a0dd0 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -3,6 +3,7 @@ require 'concurrent/concern/logging' require 'concurrent/executor/ruby_executor_service' require 'concurrent/utility/monotonic_time' +require 'concurrent/collection/timeout_queue' module Concurrent @@ -104,6 +105,11 @@ def ready_worker(worker, last_message) synchronize { ns_ready_worker worker, last_message } end + # @!visibility private + def remove_ready_worker(worker) + synchronize { ns_remove_ready_worker worker } + end + # @!visibility private def worker_died(worker) synchronize { ns_worker_died worker } @@ -114,9 +120,9 @@ def worker_task_completed synchronize { @completed_task_count += 1 } end - # @!macro thread_pool_executor_method_prune_pool - def prune_pool - synchronize { ns_prune_pool } + # @!visibility private + def prunable_capacity + synchronize { ns_prunable_capacity } end private @@ -146,9 +152,6 @@ def ns_initialize(opts) @largest_length = 0 @workers_counter = 0 @ruby_pid = $$ # detects if Ruby has forked - - @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented - @next_gc_time = Concurrent.monotonic_time + @gc_interval end # @!visibility private @@ -162,12 +165,10 @@ def ns_execute(*args, &task) if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) @scheduled_task_count += 1 + nil else - return fallback_action(*args, &task) + fallback_action(*args, &task) end - - ns_prune_pool if @next_gc_time < Concurrent.monotonic_time - nil end # @!visibility private @@ -218,7 +219,7 @@ def ns_assign_worker(*args, &task) # @!visibility private def ns_enqueue(*args, &task) return false if @synchronous - + if !ns_limited_queue? || @queue.size < @max_queue @queue << [task, args] true @@ -265,7 +266,7 @@ def ns_ready_worker(worker, last_message, success = true) end end - # removes a worker which is not in not tracked in @ready + # removes a worker which is not tracked in @ready # # @!visibility private def ns_remove_busy_worker(worker) @@ -274,23 +275,13 @@ def ns_remove_busy_worker(worker) true end - # try oldest worker if it is idle for enough time, it's returned back at the start - # - # @!visibility private - def ns_prune_pool - now = Concurrent.monotonic_time - stopped_workers = 0 - while !@ready.empty? 
&& (@pool.size - stopped_workers > @min_length) - worker, last_message = @ready.first - if now - last_message > self.idletime - stopped_workers += 1 - @ready.shift - worker << :stop - else break - end - end + def ns_remove_ready_worker(worker) + @ready.delete_if { |rw, _| rw == worker } + true + end - @next_gc_time = Concurrent.monotonic_time + @gc_interval + def ns_prunable_capacity + [@pool.size - @min_length, @ready.size].min end def ns_reset_if_forked @@ -312,7 +303,7 @@ class Worker def initialize(pool, id) # instance variables accessed only under pool's lock so no need to sync here again - @queue = Queue.new + @queue = Collection::TimeoutQueue.new @pool = pool @thread = create_worker @queue, pool, pool.idletime @@ -338,17 +329,29 @@ def kill def create_worker(queue, pool, idletime) Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime| catch(:stop) do - loop do + not_prunable = false - case message = my_queue.pop + loop do + timeout = not_prunable ? nil : my_idletime + case message = my_queue.pop(timeout: timeout) + when nil + unless my_pool.prunable_capacity.positive? + not_prunable = true + next + end + + my_pool.remove_ready_worker(self) + my_pool.remove_busy_worker(self) + throw :stop when :stop + my_pool.remove_ready_worker(self) my_pool.remove_busy_worker(self) throw :stop - else task, args = message run_task my_pool, task, args my_pool.ready_worker(self, Concurrent.monotonic_time) + not_prunable = false end end end diff --git a/spec/concurrent/executor/cached_thread_pool_spec.rb b/spec/concurrent/executor/cached_thread_pool_spec.rb index 513873b92..dbc3e2f78 100644 --- a/spec/concurrent/executor/cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/cached_thread_pool_spec.rb @@ -152,15 +152,13 @@ module Concurrent context 'garbage collection' do - subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) } + subject { described_class.new(idletime: 0.1, max_threads: 2) } it 'removes from pool any thread that has been idle too long' do latch = Concurrent::CountDownLatch.new(4) 4.times { subject.post { sleep 0.1; latch.count_down } } + sleep 10 expect(latch.wait(1)).to be true - sleep 0.2 - subject.post {} - sleep 0.2 expect(subject.length).to be < 4 end @@ -197,25 +195,27 @@ module Concurrent expect(subject.length).to be >= 5 3.times { subject << proc { sleep(1) } } sleep(0.1) - expect(subject.length).to be >= 5 + expect(subject.length).to be >= 3 end end end context 'stress' do configurations = [ - { min_threads: 2, - max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, - idletime: 0.1, # 1 minute - max_queue: 0, # unlimited + { + min_threads: 2, + max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, + idletime: 60, # 1 minute + max_queue: 0, # unlimited fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue - gc_interval: 0.1 }, - { min_threads: 2, - max_threads: 4, - idletime: 0.1, # 1 minute - max_queue: 0, # unlimited + }, + { + min_threads: 2, + max_threads: 4, + idletime: 60, # 1 minute + max_queue: 0, # unlimited fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue - gc_interval: 0.1 } + } ] configurations.each do |config| diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index 64d445441..4d447317b 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -26,13 +26,6 @@ module Concurrent it_should_behave_like :thread_pool_executor - context 
:prune do - it "is a no-op, pruning is handled by the JVM" do - executor = JavaThreadPoolExecutor.new - executor.prune_pool - end - end - context '#overload_policy' do specify ':abort maps to AbortPolicy' do diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index bf328a7c6..54f736415 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -56,68 +56,49 @@ def wakeup_thread_group(group) end before(:each) do - @now = Concurrent.monotonic_time - allow(Concurrent).to receive(:monotonic_time) { @now } - @group1 = prepare_thread_group(5) @group2 = prepare_thread_group(5) end def eventually(mutex: nil, timeout: 5, &block) - start = Time.now - while Time.now - start < timeout - begin - if mutex - mutex.synchronize do - return yield - end - else + start = Time.now + while Time.now - start < timeout + begin + if mutex + mutex.synchronize do return yield end - rescue Exception => last_failure + else + return yield end - Thread.pass + rescue Exception => last_failure end - raise last_failure - end - - it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do - wakeup_thread_group(@group1) - @now += 6 - wakeup_thread_group(@group2) - subject.post { } - - eventually { expect(@group1.threads).to all(have_attributes(status: false)) } - eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) } + Thread.pass + end + raise last_failure end - it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do + it "triggers pruning if the thread idletimes have elapsed" do wakeup_thread_group(@group1) - @now += 3 - subject.prune_pool - @now += 3 + sleep RUBY_VERSION >= '3.2' ? 6 : 8 # RubyTimeoutQueue overhead wakeup_thread_group(@group2) - subject.post { } eventually { expect(@group1.threads).to all(have_attributes(status: false)) } eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) } end - it "reclaims threads that have been idle for more than idletime seconds" do + it "does not trigger pruning if the thread idletimes have not elapsed " do wakeup_thread_group(@group1) - @now += 6 wakeup_thread_group(@group2) - subject.prune_pool - eventually { expect(@group1.threads).to all(have_attributes(status: false)) } + eventually { expect(@group1.threads).to all(have_attributes(status: 'sleep')) } eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) } end it "keeps at least min_length workers" do wakeup_thread_group(@group1) wakeup_thread_group(@group2) - @now += 12 - subject.prune_pool + sleep RUBY_VERSION >= '3.2' ? 6 : 8 # RubyTimeoutQueue overhead all_threads = @group1.threads + @group2.threads eventually do finished_threads = all_threads.find_all { |t| !t.status }
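
The sketch below is not part of the patch; it is an illustration, with made-up timings and names, of the pop-with-timeout semantics the new worker loop depends on. On Ruby 3.2+ Concurrent::Collection::TimeoutQueue is simply ::Queue, while on older Rubies it falls back to the RubyTimeoutQueue added above. The class is marked @!visibility private, so this only demonstrates behaviour and is not a suggestion to use it as public API.

    # Illustrative only -- not part of the patch.
    require 'concurrent/collection/timeout_queue'

    queue = Concurrent::Collection::TimeoutQueue.new

    # A pop that outlives its timeout returns nil instead of blocking forever.
    p queue.pop(timeout: 0.1)        #=> nil, after roughly 0.1 s

    # A push made before the timeout expires is delivered as usual.
    producer = Thread.new { sleep 0.05; queue << :job }
    p queue.pop(timeout: 1)          #=> :job
    producer.join

    # non_block and timeout are mutually exclusive on both implementations.
    begin
      queue.pop(true, timeout: 1)
    rescue ArgumentError => e
      puts e.message
    end

A rough end-to-end check of the executor change itself, again illustrative and assuming a pool built from this branch (the sleeps are arbitrary): idle workers now hit the pop timeout, consult prunable_capacity and remove themselves, so the pool shrinks back toward min_threads without anyone calling the removed prune_pool or tuning the removed gc_interval.

    # Illustrative only -- timings chosen loosely around idletime.
    require 'concurrent'

    pool = Concurrent::CachedThreadPool.new(idletime: 0.5, max_threads: 4)
    8.times { pool.post { sleep 0.1 } }
    sleep 0.2
    before = pool.length   # workers spawned for the burst
    sleep 1.0              # longer than idletime: idle workers time out and prune themselves
    after = pool.length    # shrinks back toward min_threads, no explicit prune step
    puts "#{before} -> #{after}"
    pool.shutdown
    pool.wait_for_termination(5)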