Skip to content

Commit

Permalink
Better pruning for RubyThreadPoolExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuay03 committed Feb 9, 2025
1 parent 6e2bd8a commit b6e5656
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 107 deletions.
44 changes: 44 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Concurrent
module Collection
# @!visibility private
# @!macro ruby_timeout_queue
class RubyTimeoutQueue < ::Queue
def initialize(*args)
if RUBY_VERSION >= '3.2'
raise 'RubyTimeoutQueue is not needed on Ruby 3.2 or later, use ::Queue instead'
end

super(*args)

@mutex = Mutex.new
@cond_var = ConditionVariable.new
end

def push(obj)
super(obj).tap { @mutex.synchronize { @cond_var.signal } }
end
alias_method :enq, :push
alias_method :<<, :push

def pop(non_block = false, timeout: nil)
if non_block && timeout
raise ArgumentError, "can't set a timeout if non_block is enabled"
end

if non_block
super(true)
elsif timeout && empty?
if @mutex.synchronize { @cond_var.wait(@mutex, timeout) }
super(false)
else
nil
end
else
super(false)
end
end
alias_method :deq, :pop
alias_method :shift, :pop
end
end
end
18 changes: 18 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Concurrent
module Collection
# @!visibility private
# @!macro internal_implementation_note
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
::Queue
else
require 'concurrent/collection/ruby_timeout_queue'
RubyTimeoutQueue
end
private_constant :TimeoutQueueImplementation

# @!visibility private
# @!macro timeout_queue
class TimeoutQueue < TimeoutQueueImplementation
end
end
end
14 changes: 0 additions & 14 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,6 @@ module Concurrent
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
# new tasks. A value of -1 indicates that the queue may grow without bound.

# @!macro thread_pool_executor_method_prune_pool
# Prune the thread pool of unneeded threads
#
# What is being pruned is controlled by the min_threads and idletime
# parameters passed at pool creation time
#
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
# pool will auto-prune each time a new job is posted. You will need to call
# this method explicitly in case your application post jobs in bursts (a
# lot of jobs and then nothing for long periods)

# @!macro thread_pool_executor_public_api
#
# @!macro abstract_executor_service_public_api
Expand Down Expand Up @@ -122,9 +111,6 @@ module Concurrent
#
# @!method can_overflow?
# @!macro executor_service_method_can_overflow_question
#
# @!method prune_pool
# @!macro thread_pool_executor_method_prune_pool



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ def running?
super && !@executor.isTerminating
end

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
end

private

def ns_initialize(opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'concurrent/concern/logging'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/utility/monotonic_time'
require 'concurrent/collection/timeout_queue'

module Concurrent

Expand Down Expand Up @@ -104,6 +105,11 @@ def ready_worker(worker, last_message)
synchronize { ns_ready_worker worker, last_message }
end

# @!visibility private
def remove_ready_worker(worker)
synchronize { ns_remove_ready_worker worker }
end

# @!visibility private
def worker_died(worker)
synchronize { ns_worker_died worker }
Expand All @@ -114,9 +120,9 @@ def worker_task_completed
synchronize { @completed_task_count += 1 }
end

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
synchronize { ns_prune_pool }
# @!visibility private
def prunable_capacity
synchronize { ns_prunable_capacity }
end

private
Expand Down Expand Up @@ -146,9 +152,6 @@ def ns_initialize(opts)
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$ # detects if Ruby has forked

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end

# @!visibility private
Expand All @@ -162,12 +165,10 @@ def ns_execute(*args, &task)

if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
nil
else
return fallback_action(*args, &task)
fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

# @!visibility private
Expand Down Expand Up @@ -218,7 +219,7 @@ def ns_assign_worker(*args, &task)
# @!visibility private
def ns_enqueue(*args, &task)
return false if @synchronous

if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
Expand Down Expand Up @@ -265,7 +266,7 @@ def ns_ready_worker(worker, last_message, success = true)
end
end

# removes a worker which is not in not tracked in @ready
# removes a worker which is not tracked in @ready
#
# @!visibility private
def ns_remove_busy_worker(worker)
Expand All @@ -274,23 +275,13 @@ def ns_remove_busy_worker(worker)
true
end

# try oldest worker if it is idle for enough time, it's returned back at the start
#
# @!visibility private
def ns_prune_pool
now = Concurrent.monotonic_time
stopped_workers = 0
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
worker, last_message = @ready.first
if now - last_message > self.idletime
stopped_workers += 1
@ready.shift
worker << :stop
else break
end
end
def ns_remove_ready_worker(worker)
@ready.delete_if { |rw, _| rw == worker }
true
end

@next_gc_time = Concurrent.monotonic_time + @gc_interval
def ns_prunable_capacity
[@pool.size - @min_length, @ready.size].min
end

def ns_reset_if_forked
Expand All @@ -312,7 +303,7 @@ class Worker

def initialize(pool, id)
# instance variables accessed only under pool's lock so no need to sync here again
@queue = Queue.new
@queue = Collection::TimeoutQueue.new
@pool = pool
@thread = create_worker @queue, pool, pool.idletime

Expand All @@ -338,17 +329,29 @@ def kill
def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
catch(:stop) do
loop do
not_prunable = false

case message = my_queue.pop
loop do
timeout = not_prunable ? nil : my_idletime
case message = my_queue.pop(timeout: timeout)
when nil
unless my_pool.prunable_capacity.positive?
not_prunable = true
next
end

my_pool.remove_ready_worker(self)
my_pool.remove_busy_worker(self)
throw :stop
when :stop
my_pool.remove_ready_worker(self)
my_pool.remove_busy_worker(self)
throw :stop

else
task, args = message
run_task my_pool, task, args
my_pool.ready_worker(self, Concurrent.monotonic_time)
not_prunable = false
end
end
end
Expand Down
29 changes: 14 additions & 15 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,12 @@ module Concurrent

context 'garbage collection' do

subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
subject { described_class.new(idletime: 0.1, max_threads: 2) }

it 'removes from pool any thread that has been idle too long' do
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
expect(subject.length).to be < 4
end

Expand Down Expand Up @@ -197,25 +194,27 @@ module Concurrent
expect(subject.length).to be >= 5
3.times { subject << proc { sleep(1) } }
sleep(0.1)
expect(subject.length).to be >= 5
expect(subject.length).to be >= 3
end
end
end

context 'stress' do
configurations = [
{ min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
{
min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 },
{ min_threads: 2,
max_threads: 4,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
},
{
min_threads: 2,
max_threads: 4,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 }
}
]

configurations.each do |config|
Expand Down
7 changes: 0 additions & 7 deletions spec/concurrent/executor/java_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ module Concurrent

it_should_behave_like :thread_pool_executor

context :prune do
it "is a no-op, pruning is handled by the JVM" do
executor = JavaThreadPoolExecutor.new
executor.prune_pool
end
end

context '#overload_policy' do

specify ':abort maps to AbortPolicy' do
Expand Down
Loading

0 comments on commit b6e5656

Please sign in to comment.