Skip to content

Commit

Permalink
Add Dataloader fiber_limit option
Browse files Browse the repository at this point in the history
  • Loading branch information
rmosolgo committed Oct 22, 2024
1 parent 062d7b7 commit 7d4bb6e
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 11 deletions.
41 changes: 31 additions & 10 deletions lib/graphql/dataloader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@ module GraphQL
#
class Dataloader
class << self
attr_accessor :default_nonblocking
attr_accessor :default_nonblocking, :default_fiber_limit
end

NonblockingDataloader = Class.new(self) { self.default_nonblocking = true }

def self.use(schema, nonblocking: nil)
schema.dataloader_class = if nonblocking
def self.use(schema, nonblocking: nil, fiber_limit: nil)
dataloader_class = if nonblocking
warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
NonblockingDataloader
Class.new(self) { self.default_nonblocking = true }
else
self
end

if fiber_limit
dataloader_class = Class.new(dataloader_class)
dataloader_class.default_fiber_limit = fiber_limit
end

schema.dataloader_class = dataloader_class
end

# Call the block with a Dataloader instance,
Expand All @@ -50,14 +55,18 @@ def self.with_dataloading(&block)
result
end

def initialize(nonblocking: self.class.default_nonblocking)
def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit)
@source_cache = Hash.new { |h, k| h[k] = {} }
@pending_jobs = []
if !nonblocking.nil?
@nonblocking = nonblocking
end
@fiber_limit = fiber_limit
end

# @return [Integer, nil]
attr_reader :fiber_limit

def nonblocking?
@nonblocking
end
Expand Down Expand Up @@ -178,6 +187,7 @@ def run_isolated
end

def run
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
next_job_fibers = []
source_fibers = []
Expand All @@ -187,7 +197,7 @@ def run
while first_pass || job_fibers.any?
first_pass = false

while (f = (job_fibers.shift || spawn_job_fiber))
while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber)))
if f.alive?
finished = run_fiber(f)
if !finished
Expand All @@ -197,8 +207,8 @@ def run
end
join_queues(job_fibers, next_job_fibers)

while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
while (f = source_fibers.shift || spawn_source_fiber)
while (source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber))
if f.alive?
finished = run_fiber(f)
if !finished
Expand Down Expand Up @@ -242,6 +252,17 @@ def spawn_fiber

private

# Work out the two fiber budgets from the configured `@fiber_limit`.
#
# When no limit is configured, both budgets are `Float::INFINITY`.
#
# @return [Array(Numeric, Numeric)] `[jobs_fiber_limit, total_fiber_limit]`
# @raise [ArgumentError] when the configured limit is lower than 4
def calculate_fiber_limit
  configured_limit = @fiber_limit || Float::INFINITY
  raise ArgumentError, "Dataloader fiber limit is too low (#{configured_limit}), it must be at least 4" if configured_limit < 4

  # One fiber is set aside for `manager`.
  usable_fibers = configured_limit - 1
  # Jobs get two fewer than that, so at least one fiber remains for sources.
  [usable_fibers - 2, usable_fibers]
end

def join_queues(prev_queue, new_queue)
@nonblocking && Fiber.scheduler.run
prev_queue.concat(new_queue)
Expand Down
2 changes: 1 addition & 1 deletion lib/graphql/dataloader/source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def sync(pending_result_keys)
while pending_result_keys.any? { |key| !@results.key?(key) }
iterations += 1
if iterations > MAX_ITERATIONS
raise "#{self.class}#sync tried #{MAX_ITERATIONS} times to load pending keys (#{pending_result_keys}), but they still weren't loaded. There is likely a circular dependency."
raise "#{self.class}#sync tried #{MAX_ITERATIONS} times to load pending keys (#{pending_result_keys}), but they still weren't loaded. There is likely a circular dependency#{@dataloader.fiber_limit ? " or `fiber_limit: #{@dataloader.fiber_limit}` is set too low" : ""}."
end
@dataloader.yield
end
Expand Down
8 changes: 8 additions & 0 deletions spec/graphql/dataloader/source_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ def fetch(keys)
end
expected_message = "FailsToLoadSource#sync tried 1000 times to load pending keys ([1]), but they still weren't loaded. There is likely a circular dependency."
assert_equal expected_message, err.message

dl = GraphQL::Dataloader.new(fiber_limit: 10000)
dl.append_job { dl.with(FailsToLoadSource).load(1) }
err = assert_raises RuntimeError do
dl.run
end
expected_message = "FailsToLoadSource#sync tried 1000 times to load pending keys ([1]), but they still weren't loaded. There is likely a circular dependency or `fiber_limit: 10000` is set too low."
assert_equal expected_message, err.message
end

it "is pending when waiting for false and nil" do
Expand Down
122 changes: 122 additions & 0 deletions spec/graphql/dataloader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,51 @@ class Query < GraphQL::Schema::Object
end

module DataloaderAssertions
# Test helper mixin: include it into a dataloader subclass to record how many
# times a fiber/task was spawned and the highest number of fibers seen at once.
#
# The counters live on the module itself, so they are shared by every
# including class and are reset each time an including object is initialized.
module FiberCounting
  class << self
    # starting_fiber_count:   fibers that existed when the last including object was built
    # last_spawn_fiber_count: spawns recorded since the last reset
    # last_max_fiber_count:   highest `current_fiber_count` observed since the last reset
    attr_accessor :starting_fiber_count, :last_spawn_fiber_count, :last_max_fiber_count

    # @return [Integer] number of Fiber objects found in ObjectSpace after a GC run
    def count_all_fibers
      # Presumably GC runs first so that finished fibers are not counted.
      GC.start
      ObjectSpace.each_object(Fiber).count
    end

    # @return [Integer] fibers found now, minus the baseline taken at initialization
    def current_fiber_count
      count_all_fibers - starting_fiber_count
    end
  end

  # Run the wrapped initializer, then take a fresh baseline and zero the counters.
  def initialize(*args, **kwargs, &block)
    super
    FiberCounting.starting_fiber_count = FiberCounting.count_all_fibers
    FiberCounting.last_max_fiber_count = 0
    FiberCounting.last_spawn_fiber_count = 0
  end

  # Delegate to the wrapped `spawn_fiber`, record the spawn, return its result.
  def spawn_fiber
    super.tap { update_fiber_counts }
  end

  # Delegate to the wrapped `spawn_source_task`, record the spawn, return its result.
  def spawn_source_task(parent_task, condition)
    super.tap { update_fiber_counts }
  end

  private

  # Count one more spawn and raise the recorded maximum if the current count exceeds it.
  def update_fiber_counts
    FiberCounting.last_spawn_fiber_count += 1
    live_fibers = FiberCounting.current_fiber_count
    FiberCounting.last_max_fiber_count = [FiberCounting.last_max_fiber_count, live_fibers].max
  end
end


def self.included(child_class)
child_class.class_eval do
let(:schema) { make_schema_from(FiberSchema) }
Expand Down Expand Up @@ -1038,6 +1083,83 @@ def self.included(child_class)
response = parts_schema.execute(query).to_h
assert_equal [4, 4, 4, 4], response["data"]["manufacturers"].map { |parts_obj| parts_obj["parts"].size }
end

# Specs for the Dataloader `fiber_limit` option: per-instance configuration,
# schema-level default configuration, and rejection of limits that are too low.
describe "fiber_limit" do
  # Query shared by the examples below; it selects several independently-loaded fields.
  let(:fiber_limit_query_str) do
    <<-GRAPHQL
      {
        recipes {
          ingredients {
            name
          }
        }
        nestedIngredient(id: 2) {
          name
        }
        keyIngredient(id: 4) {
          name
        }
        commonIngredientsWithLoad(recipe1Id: 5, recipe2Id: 6) {
          name
        }
      }
    GRAPHQL
  end

  # NOTE: no `focus` marker here -- a committed `focus` would restrict the run to this example.
  it "respects a configured fiber_limit" do
    fiber_counting_dataloader_class = Class.new(schema.dataloader_class)
    fiber_counting_dataloader_class.include(FiberCounting)

    # TODO: figure out if these counts are doing their jobs
    # Baseline: no fiber_limit configured.
    _res = schema.execute(fiber_limit_query_str, context: { dataloader: fiber_counting_dataloader_class.new })
    assert_equal 12, FiberCounting.last_spawn_fiber_count
    assert_equal 9, FiberCounting.last_max_fiber_count

    # The expected counts show a tight limit capping concurrent fibers at the cost of more spawns.
    res = schema.execute(fiber_limit_query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 4) })
    assert_equal 4, res.context.dataloader.fiber_limit
    assert_equal 14, FiberCounting.last_spawn_fiber_count
    assert_equal 4, FiberCounting.last_max_fiber_count

    res = schema.execute(fiber_limit_query_str, context: { dataloader: fiber_counting_dataloader_class.new(fiber_limit: 6) })
    assert_equal 6, res.context.dataloader.fiber_limit
    assert_equal 10, FiberCounting.last_spawn_fiber_count
    assert_equal 6, FiberCounting.last_max_fiber_count
  end

  it "accepts a default fiber_limit config" do
    # Local schema (shadows the `schema` helper) configured through `use`.
    schema = Class.new(FiberSchema) do
      use GraphQL::Dataloader, fiber_limit: 4
    end

    res = schema.execute(fiber_limit_query_str)
    assert_equal 4, res.context.dataloader.fiber_limit
    assert_nil res["errors"]
  end

  # The limit is validated when `run` is called, not when the dataloader is built.
  it "requires at least four fibers" do
    dl = GraphQL::Dataloader.new(fiber_limit: 2)
    err = assert_raises ArgumentError do
      dl.run
    end
    assert_equal "Dataloader fiber limit is too low (2), it must be at least 4", err.message
  end
end
end
end
end
Expand Down

0 comments on commit 7d4bb6e

Please sign in to comment.