From e6ac5feedb62bafd17315c5667d139c2dfff68e8 Mon Sep 17 00:00:00 2001
From: Gabriel Baraldi
Date: Tue, 20 Aug 2024 17:32:56 -0300
Subject: [PATCH] Revert changes except the switch to a Memory

---
 base/task.jl | 36 +++++++++++++++++-------------------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git a/base/task.jl b/base/task.jl
index f3c7ad90219b5..721d8b929dc91 100644
--- a/base/task.jl
+++ b/base/task.jl
@@ -922,31 +922,29 @@ end
 
 const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task}
 global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()])
-const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable
+const Workqueues_lock = Threads.SpinLock()
+const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable
 function workqueue_for(tid::Int)
-    qs = @atomic :acquire Base.Workqueues
+    qs = Workqueues
     if length(qs) >= tid && isassigned(qs, tid)
-        return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index is set
-        # all following Workqueue Memorys have the same queues at those indices
+        return @inbounds qs[tid]
     end
-    while length(qs) < tid
-        # slow path to allocate it
-        nt = Threads.maxthreadid()
-        @assert tid <= nt
-        new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) # This overallocates by nt-1
-        xchg = @atomicreplace :acquire_release :acquire Base.Workqueues qs => new_qs
-        if xchg.success
-            qs = new_qs
-            break
-        else
-            qs = xchg.old
+    # slow path to allocate it
+    @assert tid > 0
+    l = Workqueues_lock
+    @lock l begin
+        qs = Workqueues
+        if length(qs) < tid
+            nt = Threads.maxthreadid()
+            @assert tid <= nt
+            global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
         end
+        if !isassigned(qs, tid)
+            @inbounds qs[tid] = StickyWorkqueue()
+        end
+        return @inbounds qs[tid]
     end
-    if !isassigned(qs, tid)
-        @inbounds qs[tid] = StickyWorkqueue()
-    end
-    return @inbounds qs[tid]
 end
 
 function enq_work(t::Task)
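
Not part of the patch: below is a standalone sketch of the lock-guarded lazy-growth pattern the hunk adopts (grow the shared table while holding a lock instead of CAS-ing in a new copy, then initialize any missing slot). The names REGISTRY, registry_lock, and slot_for are hypothetical, and a plain Vector plus ReentrantLock stand in for the Memory{StickyWorkqueue} and Threads.SpinLock used in Base.

# Minimal sketch of the locked lazy-growth pattern; stand-in names, not Base code.
const registry_lock = ReentrantLock()
global REGISTRY = [Int[]]   # one slot per thread id; slot 1 pre-created, mirroring the Base initializer

function slot_for(tid::Int)
    qs = REGISTRY
    # fast path: the slot already exists and has been initialized
    if length(qs) >= tid && isassigned(qs, tid)
        return @inbounds qs[tid]
    end
    # slow path: grow the table and/or initialize the slot while holding the lock
    @assert tid > 0
    lock(registry_lock)
    try
        qs = REGISTRY
        if length(qs) < tid
            nt = Threads.maxthreadid()
            @assert tid <= nt
            # over-allocate up to the current thread count and copy the old slots over
            qs = copyto!(similar(qs, length(qs) + nt - 1), qs)
            global REGISTRY = qs
        end
        if !isassigned(qs, tid)
            @inbounds qs[tid] = Int[]
        end
        return @inbounds qs[tid]
    finally
        unlock(registry_lock)
    end
end

# Usage: repeated calls for the same tid return the same slot.
q1 = slot_for(1)
push!(q1, 42)
@assert slot_for(1) === q1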