Skip to content

Commit

Permalink
Use Crystal::PointerLinkedList instead of Deque in Mutex (#15330)
Browse files Browse the repository at this point in the history
Extracts the undocumented `Fiber::Waiting` struct from `WaitGroup` that acts as
the node in a linked list, replacing a `Deque` to store the waiting fibers.

The flat array doesn't have much impact on performance: we only reach the head
or the tail once to enqueue/dequeue one fiber at a time. This however spares a
number of GC allocations since the Deque has to be allocated plus its buffer
that will have to be reallocated sometimes (and will only ever grow, never shrink).
  • Loading branch information
ysbaddaden authored Jan 15, 2025
1 parent 2458e35 commit 5b20837
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
15 changes: 15 additions & 0 deletions src/fiber/pointer_linked_list_node.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
require "crystal/pointer_linked_list"

class Fiber
  # :nodoc:
  #
  # Intrusive linked-list node wrapping a `Fiber`, for use with
  # `Crystal::PointerLinkedList`. Callers keep the node on their own stack
  # and push `pointerof(node)` into the list, which avoids any GC allocation
  # for the queue of waiting fibers.
  struct PointerLinkedListNode
    include Crystal::PointerLinkedList::Node

    @fiber : Fiber

    def initialize(fiber : Fiber)
      @fiber = fiber
    end

    # Reschedules the wrapped fiber for execution.
    def enqueue : Nil
      @fiber.enqueue
    end
  end
end
14 changes: 9 additions & 5 deletions src/mutex.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "fiber/pointer_linked_list_node"
require "crystal/spin_lock"

# A fiber-safe mutex.
Expand All @@ -22,7 +23,7 @@ class Mutex
@state = Atomic(Int32).new(UNLOCKED)
@mutex_fiber : Fiber?
@lock_count = 0
@queue = Deque(Fiber).new
@queue = Crystal::PointerLinkedList(Fiber::PointerLinkedListNode).new
@queue_count = Atomic(Int32).new(0)
@lock = Crystal::SpinLock.new

Expand Down Expand Up @@ -59,6 +60,8 @@ class Mutex
loop do
break if try_lock

waiting = Fiber::PointerLinkedListNode.new(Fiber.current)

@lock.sync do
@queue_count.add(1)

Expand All @@ -71,7 +74,7 @@ class Mutex
end
end

@queue.push Fiber.current
@queue.push pointerof(waiting)
end

Fiber.suspend
Expand Down Expand Up @@ -116,17 +119,18 @@ class Mutex
return
end

fiber = nil
waiting = nil
@lock.sync do
if @queue_count.get == 0
return
end

if fiber = @queue.shift?
if waiting = @queue.shift?
@queue_count.add(-1)
end
end
fiber.enqueue if fiber

waiting.try(&.value.enqueue)
end

def synchronize(&)
Expand Down
17 changes: 3 additions & 14 deletions src/wait_group.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "fiber"
require "fiber/pointer_linked_list_node"
require "crystal/spin_lock"
require "crystal/pointer_linked_list"

# Suspend execution until a collection of fibers are finished.
#
Expand Down Expand Up @@ -31,17 +31,6 @@ require "crystal/pointer_linked_list"
# wg.wait
# ```
class WaitGroup
private struct Waiting
include Crystal::PointerLinkedList::Node

def initialize(@fiber : Fiber)
end

def enqueue : Nil
@fiber.enqueue
end
end

# Yields a `WaitGroup` instance and waits at the end of the block for all of
# the work enqueued inside it to complete.
#
Expand All @@ -59,7 +48,7 @@ class WaitGroup
end

def initialize(n : Int32 = 0)
@waiting = Crystal::PointerLinkedList(Waiting).new
@waiting = Crystal::PointerLinkedList(Fiber::PointerLinkedListNode).new
@lock = Crystal::SpinLock.new
@counter = Atomic(Int32).new(n)
end
Expand Down Expand Up @@ -128,7 +117,7 @@ class WaitGroup
def wait : Nil
return if done?

waiting = Waiting.new(Fiber.current)
waiting = Fiber::PointerLinkedListNode.new(Fiber.current)

@lock.sync do
# must check again to avoid a race condition where #done may have
Expand Down

0 comments on commit 5b20837

Please sign in to comment.