Skip to content

Commit

Permalink
Use atomics for the StickyWorkqueue array instead of a lock
Browse files Browse the repository at this point in the history
  • Loading branch information
gbaraldi committed Aug 16, 2024
1 parent 5a633b7 commit 2e2b833
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -921,30 +921,30 @@ function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T
end

const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()]
const Workqueues_lock = Threads.SpinLock()
const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) # TODO: Is the extra allocation here worth extra code
const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable

function workqueue_for(tid::Int)
qs = Workqueues
qs = @atomic :acquire Base.Workqueues
if length(qs) >= tid && isassigned(qs, tid)
return @inbounds qs[tid]
return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set
# all following Workqueue Memorys have the same queues at those indices
end
# slow path to allocate it
@assert tid > 0
l = Workqueues_lock
@lock l begin
qs = Workqueues
if length(qs) < tid
nt = Threads.maxthreadid()
@assert tid <= nt
global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
end
if !isassigned(qs, tid)
@inbounds qs[tid] = StickyWorkqueue()
while length(qs) < tid
nt = Threads.maxthreadid()
@assert tid <= nt
new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
if (@atomicreplace :acquire_release :monotonic Base.Workqueues qs => new_qs).success
qs = new_qs
break
else
qs = @atomic :acquire Base.Workqueues
end
return @inbounds qs[tid]
end
if !isassigned(qs, tid)
@inbounds qs[tid] = StickyWorkqueue()
end
return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set
end

function enq_work(t::Task)
Expand Down

0 comments on commit 2e2b833

Please sign in to comment.