diff --git a/base/task.jl b/base/task.jl index 5e4af6747f128..c1d3bec8deea9 100644 --- a/base/task.jl +++ b/base/task.jl @@ -921,30 +921,30 @@ function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T end const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task} -global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()] -const Workqueues_lock = Threads.SpinLock() -const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable +global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) # TODO: Is the extra allocation here worth extra code +const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable function workqueue_for(tid::Int) - qs = Workqueues + qs = @atomic :acquire Base.Workqueues if length(qs) >= tid && isassigned(qs, tid) - return @inbounds qs[tid] + return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set + # all following Workqueue Memorys have the same queues at those indices end - # slow path to allocate it - @assert tid > 0 - l = Workqueues_lock - @lock l begin - qs = Workqueues - if length(qs) < tid - nt = Threads.maxthreadid() - @assert tid <= nt - global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) - end - if !isassigned(qs, tid) - @inbounds qs[tid] = StickyWorkqueue() + while length(qs) < tid + nt = Threads.maxthreadid() + @assert tid <= nt + new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) + if (@atomicreplace :acquire_release :monotonic Base.Workqueues qs => new_qs).success + qs = new_qs + break + else + qs = @atomic :acquire Base.Workqueues end - return @inbounds qs[tid] end + if !isassigned(qs, tid) + @inbounds qs[tid] = StickyWorkqueue() + end + return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set end function enq_work(t::Task)