diff --git a/base/Base.jl b/base/Base.jl index 081426fa94d67..ae621e2e4c55c 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -417,7 +417,7 @@ const liblapack_name = libblas_name # Note that `atomics.jl` here should be deprecated Core.eval(Threads, :(include("atomics.jl"))) include("channels.jl") -include("partr.jl") +include("schedulers/scheduler.jl") include("task.jl") include("threads_overloads.jl") include("weakkeydict.jl") diff --git a/base/schedulers/CDLL.jl b/base/schedulers/CDLL.jl new file mode 100644 index 0000000000000..f084ec521a60d --- /dev/null +++ b/base/schedulers/CDLL.jl @@ -0,0 +1,266 @@ + +module ConcurrentList #Concurrent Doubly Linked List + +mutable struct Node{T} + const value::Union{T, Nothing} + @atomic next::Union{Node{T}, Nothing} + @atomic prev::Union{Node{T}, Nothing} + + Node{T}(value, next, prev) where T = new{T}(value, next, prev) + function Node(next::Node{T}) where T # Marker + this = new{T}(nothing, next, nothing) + @atomic :release this.prev = this + return this + end +end + +Node(value::T, next, prev) where T = Node{T}(value, next, prev) + +get_next(node::Node) = @atomic :acquire node.next +set_next(node::Node, next) = @atomic :release node.next = next +get_prev(node::Node) = @atomic :acquire node.prev +set_prev(node::Node, prev) = @atomic :release node.prev = prev +function cas_next(node::Node, exp::Node, desired::Node) + _,success = @atomicreplace :acquire_release :monotonic node.next exp => desired + return success +end +is_special(node::Node) = node.value === nothing +is_trailer(node::Node) = get_next(node) === nothing +is_header(node::Node) = get_prev(node) === nothing +is_marker(node::Node) = get_prev(node) === node + +function is_deleted(node::Node) + f = get_next(node) + return f !== nothing && is_marker(f) +end + +function next_nonmarker(node::Node) + f = get_next(node) + return (f === nothing || !is_marker(f)) ? 
f : get_next(f) +end + +function Base.show(io::IO, node::Node) + if is_special(node) + if is_marker(node) + print(io, "MarkerNode") + return + elseif is_header(node) + next = get_next(node) + if next === nothing + print(io, "BrokenNode()") + return + elseif is_marker(node) + print(io, "HeaderNode(next: MarkerNode)") + return + elseif is_trailer(next) + print(io, "HeaderNode(next: TrailerNode)") + return + end + print(io, "HeaderNode(next: ", next,")") + return + elseif is_trailer(node) + prev = get_prev(node) + if prev === nothing + print(io, "BrokenNode()") + return + elseif is_marker(node) + print(io, "TrailerNode(prev: MarkerNode)") + return + elseif is_header(prev) + print(io, "TrailerNode(prev: HeaderNode)") + return + end + print(io, "TrailerNode(prev: ", prev,")") + return + end + end + print(io, "Node(", node.value,")") +end + +function successor(node::Node) + f = next_nonmarker(node) + while true + if f === nothing + return nothing + end + if !is_deleted(f) + if get_prev(f) !== node && !is_deleted(node) + set_prev(f, node) # relink f to node + end + return f + end + s = next_nonmarker(f) + if f === get_next(node) + cas_next(node, f, s) + end + f = s + end +end + +function find_predecessor_of(node::Node{T}, target::Node{T}) where {T} + n = node + while true + f = successor(n) + if (f === target) + return n + end + if (f === nothing) + return nothing + end + n = f + end +end + +function predecessor(node::Node) + n = node + while true + b = get_prev(n) + if (b === nothing) + return find_predecessor_of(n, node) + end + s = get_next(b) + if (s === node) + return b + end + if (s === nothing || !is_marker(s)) + p = find_predecessor_of(b, node) + if (p !== nothing) + return p + end + end + n = b + end +end + +function forward(node::Node) + f = successor(node) + return (f === nothing || is_special(f)) ? nothing : f +end + +function back(node::Node) + f = predecessor(node) + return (f === nothing || is_special(f)) ? 
nothing : f +end + +function append!(node::Node{T}, val::T) where {T} + while true + f = get_next(node) + if (f === nothing || is_marker(f)) + return nothing + end + x = Node(val, f, node) + if cas_next(node, f, x) + set_prev(f, x) + return x + end + end +end + +function prepend!(node::Node{T}, val::T) where {T} + while true + b = predecessor(node) + if b === nothing + return nothing + end + x = Node(val, node, b) + if cas_next(b, node, x) + set_prev(node, x) + return x + end + end +end + +function delete!(node::Node) + b = get_prev(node) + f = get_next(node) + if (b !== nothing && f !== nothing && !is_marker(f) && cas_next(node, f, Node(f))) + if (cas_next(b, node, f)) + set_prev(f, b) + end + return true + end + return false +end + +function replace!(node::Node{T}, val::T) where {T} + while true + b = get_prev(node) + f = get_next(node) + if (b === nothing || f === nothing || is_marker(f)) + return nothing + end + x = Node(val, f, b) + if cas_next(node, f, Node(x)) + successor(b) + successor(x) + return x + end + end +end + +function usable(node::Node) + return node !== nothing && !is_special(node) +end + +const _PADDING_TUPLE = ntuple(zero, 15) +mutable struct ConcurrentDoublyLinkedList{T} + @atomic header::Union{Node{T}, Nothing} # 8 bytes + padding::NTuple{15,UInt64} # 120 bytes + @atomic trailer::Union{Node{T}, Nothing} + padding2::NTuple{15,UInt64} + function ConcurrentDoublyLinkedList{T}(header::Union{Node{T}, Nothing}, trailer::Union{Node{T}, Nothing}) where {T} + new{T}(header, _PADDING_TUPLE, trailer, _PADDING_TUPLE) + end +end + +function ConcurrentDoublyLinkedList{T}() where {T} + h = Node{T}(nothing, nothing, nothing) + t = Node{T}(nothing, nothing, h) + set_next(h, t) + ConcurrentDoublyLinkedList{T}(h, t) +end + +const CDLL = ConcurrentDoublyLinkedList + +function Base.pushfirst!(cdll::CDLL{T}, val::T) where {T} + while (append!((@atomic :acquire cdll.header), val) === nothing) + end +end + +function pushlast!(cdll::CDLL{T}, val::T) where {T} + while (prepend!((@atomic :acquire cdll.trailer), val) === nothing) + end +end + +function Base.popfirst!(cdll::CDLL) + while true + n = successor((@atomic :acquire cdll.header)) + if !usable(n) + return nothing + end + if delete!(n) + return n.value + end + end +end + +function poplast!(cdll::CDLL) + while true + n = predecessor((@atomic :acquire cdll.trailer)) + if !usable(n) + return nothing + end + if delete!(n) + return n.value + end + end +end + +Base.push!(cdll::CDLL{T}, val::T) where {T} = pushfirst!(cdll, val) +Base.pop!(cdll::CDLL) = poplast!(cdll) +steal!(cdll::CDLL) = popfirst!(cdll) +Base.isempty(cdll::CDLL) = !usable(successor(@atomic :acquire cdll.header)) + +const Queue = CDLL + +end \ No newline at end of file diff --git a/base/schedulers/CLL.jl b/base/schedulers/CLL.jl new file mode 100644 index 0000000000000..8e013413ddea0 --- /dev/null +++ b/base/schedulers/CLL.jl @@ -0,0 +1,156 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module CLL +# Also see `work-stealing-queue.h` this is a pure Julia re-implementation + +# ======= +# Chase and Lev's work-stealing queue, optimized for +# weak memory models by Le et al. +# +# * Chase D., Lev Y. Dynamic Circular Work-Stealing queue +# * Le N. M. et al. 
Correct and Efficient Work-Stealing for +# Weak Memory Models +# ======= +# mutable so that we don't get a mutex in WSQueue +mutable struct WSBuffer{T} + const buffer::AtomicMemory{T} + const capacity::Int64 + const mask::Int64 + @noinline function WSBuffer{T}(capacity::Int64) where T + if __unlikely(capacity == 0) + throw(ArgumentError("Capacity can't be zero")) + end + if __unlikely(count_ones(capacity) != 1) + throw(ArgumentError("Capacity must be a power of two")) + end + buffer = AtomicMemory{T}(undef, capacity) + mask = capacity - 1 + return new(buffer, capacity, mask) + end +end + +function Base.getindex_atomic(buf::WSBuffer{T}, order::Symbol, idx::Int64) where T + @inbounds Base.getindex_atomic(buf.buffer, order, ((idx - 1) & buf.mask) + 1) +end + +function Base.setindex_atomic!(buf::WSBuffer{T}, order::Symbol, val::T, idx::Int64) where T + @inbounds Base.setindex_atomic!(buf.buffer, order, val,((idx - 1) & buf.mask) + 1) +end + +function Base.modifyindex_atomic!(buf::WSBuffer{T}, order::Symbol, op, val::T, idx::Int64) where T + @inbounds Base.modifyindex_atomic!(buf.buffer, order, op, val, ((idx - 1) & buf.mask) + 1) +end + +function Base.swapindex_atomic!(buf::WSBuffer{T}, order::Symbol, val::T, idx::Int64) where T + @inbounds Base.swapindex_atomic!(buf.buffer, order, val, ((idx - 1) & buf.mask) + 1) +end + +function Base.replaceindex_atomic!(buf::WSBuffer{T}, success_order::Symbol, fail_order::Symbol, expected::T, desired::T, idx::Int64) where T + @inbounds Base.replaceindex_atomic!(buf.buffer, success_order, fail_order, expected, desired, ((idx - 1) & buf.mask) + 1) +end + +function Base.copyto!(dst::WSBuffer{T}, src::WSBuffer{T}, top, bottom) where T + # must use queue indexes. When the queue is in state top=3, bottom=18, capacity=16 + # the real index of element 18 in the queue is 2, after growing in the new buffer it must be 18 + @assert dst.capacity >= src.capacity + @assert top <= bottom + # TODO overflow of bottom? + for i in top:bottom + @atomic :monotonic dst[i] = @atomic :monotonic src[i] + end +end + +""" + WSQueue{T} + +Work-stealing queue after Chase & Le. + +!!! note + popfirst! and push! are only allowed to be called from owner. 
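+
+A rough usage sketch (illustrative only; `popfirst!` and `steal!` return `nothing`
+on an empty queue or a lost race):
+
+    q = WSQueue{Int}(8)        # capacity must be a power of two
+    push!(q, 1); push!(q, 2)   # owner only: pushes at the bottom
+    popfirst!(q)               # owner only: pops the newest element (2)
+    steal!(q)                  # any thread: steals the oldest element (1)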
+""" +mutable struct WSQueue{T} + @atomic top::Int64 + @atomic bottom::Int64 + @atomic buffer::WSBuffer{T} + function WSQueue{T}(capacity = 64) where T + new(1, 1, WSBuffer{T}(capacity)) + end +end + +# pushBottom +function Base.push!(q::WSQueue{T}, v::T) where T + bottom = @atomic :monotonic q.bottom + top = @atomic :acquire q.top + buffer = @atomic :monotonic q.buffer + + size = bottom - top + if __unlikely(size > (buffer.capacity - 1)) # Chase-Lev has size >= (buf.capacity - 1) || Le has size > (buf.capacity - 1) + new_buffer = WSBuffer{T}(2*buffer.capacity) + copyto!(new_buffer, buffer, top, bottom) + @atomic :release q.buffer = new_buffer + buffer = new_buffer + end + @atomic :monotonic buffer[bottom] = v + Core.Intrinsics.atomic_fence(:release) + @atomic :monotonic q.bottom = bottom + 1 + return nothing +end + +# popBottom / take +function Base.popfirst!(q::WSQueue{T}) where T + bottom = (@atomic :monotonic q.bottom) - 1 + buffer = @atomic :monotonic q.buffer + @atomic :monotonic q.bottom = bottom + + Core.Intrinsics.atomic_fence(:sequentially_consistent) # TODO slow on AMD + + top = @atomic :monotonic q.top + + size = bottom - top + 1 + if __likely(size > 0) + # Non-empty queue + v = @atomic :monotonic buffer[bottom] + if size == 1 + # Single last element in queue + _, success = @atomicreplace :sequentially_consistent :monotonic q.top top => top + 1 + @atomic :monotonic q.bottom = bottom + 1 + if !success + # Failed race + return nothing + end + end + return v + else + # Empty queue + @atomic :monotonic q.bottom = bottom + 1 + return nothing + end +end + +function steal!(q::WSQueue{T}) where T + top = @atomic :acquire q.top + Core.Intrinsics.atomic_fence(:sequentially_consistent) + bottom = @atomic :acquire q.bottom + size = bottom - top + if size > 0 + # Non-empty queue + buffer = @atomic :acquire q.buffer # consume in Le + v = @atomic :monotonic buffer[top] + _, success = @atomicreplace :sequentially_consistent :monotonic q.top top => top + 1 + if !success + # Failed race + return nothing + end + return v + end + return nothing # failed +end + +Base.pop!(q::WSQueue{T}) where T = popfirst!(q) +@inline __likely(cond::Bool) = ccall("llvm.expect", llvmcall, Bool, (Bool, Bool), cond, true) +@inline __unlikely(cond::Bool) = ccall("llvm.expect", llvmcall, Bool, (Bool, Bool), cond, false) +Base.isempty(q::WSQueue) = q.top == q.bottom + +const Queue = WSQueue + +end #module diff --git a/base/partr.jl b/base/schedulers/partr.jl similarity index 94% rename from base/partr.jl rename to base/schedulers/partr.jl index 8c95e3668ee74..b39b89a84e7cb 100644 --- a/base/partr.jl +++ b/base/schedulers/partr.jl @@ -19,10 +19,6 @@ const heap_d = UInt32(8) const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)] const heaps_lock = [SpinLock(), SpinLock()] - -cong(max::UInt32) = iszero(max) ? 
UInt32(0) : ccall(:jl_rand_ptls, UInt32, (UInt32,), max) + UInt32(1) - - function multiq_sift_up(heap::taskheap, idx::Int32) while idx > Int32(1) parent = (idx - Int32(2)) รท heap_d + Int32(1) @@ -94,10 +90,10 @@ function multiq_insert(task::Task, priority::UInt16) task.priority = priority - rn = cong(heap_p) + rn = Base.Scheduler.cong(heap_p) tpheaps = heaps[tp] while !trylock(tpheaps[rn].lock) - rn = cong(heap_p) + rn = Base.Scheduler.cong(heap_p) end heap = tpheaps[rn] @@ -137,8 +133,8 @@ function multiq_deletemin() if i == heap_p return nothing end - rn1 = cong(heap_p) - rn2 = cong(heap_p) + rn1 = Base.Scheduler.cong(heap_p) + rn2 = Base.Scheduler.cong(heap_p) prio1 = tpheaps[rn1].priority prio2 = tpheaps[rn2].priority if prio1 > prio2 @@ -190,4 +186,8 @@ function multiq_check_empty() return true end +enqueue!(t::Task) = multiq_insert(t, t.priority) +dequeue!() = multiq_deletemin() +checktaskempty() = multiq_check_empty() + end diff --git a/base/schedulers/scheduler.jl b/base/schedulers/scheduler.jl new file mode 100644 index 0000000000000..679487d9bf78c --- /dev/null +++ b/base/schedulers/scheduler.jl @@ -0,0 +1,22 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module Scheduler + +cong(max::UInt32) = iszero(max) ? UInt32(0) : ccall(:jl_rand_ptls, UInt32, (UInt32,), max) + UInt32(1) +include("schedulers/partr.jl") +include("schedulers/workstealing.jl") + +const ChosenScheduler = Workstealing + + + +# Scheduler interface: + # enqueue! which pushes a runnable Task into it + # dequeue! which pops a runnable Task from it + # checktaskempty which returns true if the scheduler has no available Tasks + +enqueue!(t::Task) = ChosenScheduler.enqueue!(t) +dequeue!() = ChosenScheduler.dequeue!() +checktaskempty() = ChosenScheduler.checktaskempty() + +end \ No newline at end of file diff --git a/base/schedulers/workstealing.jl b/base/schedulers/workstealing.jl new file mode 100644 index 0000000000000..4529c97f68fa1 --- /dev/null +++ b/base/schedulers/workstealing.jl @@ -0,0 +1,158 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +module Workstealing + +# Expected interface for a work-stealing queue: +# push!(queue, task) +# pop!(queue) # Only legal if you are the queues owner. +# steal!(queue) +include("schedulers/CLL.jl") +include("schedulers/CDLL.jl") + +# Threadpool utilities +function cur_threadpoolid() + return ccall(:jl_cur_threadpoolid, Int8, ()) + 1 +end + +function cur_threadpool_tid() + return ccall(:jl_cur_threadpool_tid, Int16, ()) + 1 +end + +function get_task_tpid(task::Task) + return ccall(:jl_get_task_threadpoolid, Int8, (Any,), task) + 1 +end + +# Logic for threadpools: +# Each thread has a global thread id, always called tid and unique per thread. +# Accesed via Threads.threadid() for a thread and Threads.threadid(task) for a task. + +# Each threadpool has an id, called tpid and unique per threadpool. +# Accessed via cur_threadpoolid() for the current thread and task_tpid for a task. + +# Each thread also has a threadpool_tid, called tp_tid, its use is to index into the array of queues for the threadpool. +# Accessed via cur_threadpool_tid() for the current thread. Or by checking if it's in the Threads.threadpooltids array. +# It's calculated by doing Threads.threadid() - Threads.threadpooltids(tpid)[1], though we store in the thread ptls for performance. 
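+# As an illustrative example (assuming one :interactive and two :default threads,
+# and the 1-based numbering described below), the mapping would roughly be:
+#   tid 1 -> tpid 1 (:interactive), tp_tid 1
+#   tid 2 -> tpid 2 (:default),     tp_tid 1
+#   tid 3 -> tpid 2 (:default),     tp_tid 2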
+ +# The calls return 1 based indexed numbers so threadpool 1 is :interactive and 2 is :default +# When a thread has either a tp_tid of 0 or a tpid of 0 it means that they aren't associated with a threadpool and should be inserted in the index 1 of the tasks tpid + + +function release_copyto!(dest::AtomicMemory{T}, src::AbstractArray{T,1}) where T + Base._checkaxs(axes(dest), axes(src)) + for i in eachindex(src) + @atomic :monotonic dest[i] = src[i] + end + Core.Intrinsics.atomic_fence(:release) + return dest +end + +make_atomic(x::AbstractArray{T,1}) where {T} = release_copyto!(AtomicMemory{T}(undef, size(x)), x) + +const QueueModule = ConcurrentList +const Queue = QueueModule.Queue{Task} +const Queues_lock = Threads.SpinLock() +global Queues::AtomicMemory{Memory{Queue}} = make_atomic([Memory{Queue}([Queue()]) for _ in 1:Threads.nthreadpools()]) # One array of queues per threadpool + +function queue_for(tp_tid::Int, tpid::Int) + @assert tp_tid >= 0 + qs = @atomic :monotonic Queues[tpid] + if (tp_tid == 0) + queue_index = 1 # We always have a queue for someone that isn't us to push to + else + queue_index = tp_tid + 1 + end + if length(qs) >= queue_index && isassigned(qs, queue_index) + return qs[queue_index] + end + # slow path to allocate it + # TODO: outline this + l = Queues_lock + @lock l begin + qs = @atomic :monotonic Queues[tpid] + if length(qs) < queue_index + nt = Threads._nthreads_in_pool(Int8(tpid - 1)) + 1 + @assert queue_index <= nt + new_q = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) + qs = new_q + @atomic :monotonic Queues[tpid] = new_q + end + if !isassigned(qs, queue_index) + qs[queue_index] = Queue() + end + return qs[queue_index] + end +end + +function enqueue!(t::Task) + task_tpid = get_task_tpid(t) + thread_tpid = cur_threadpoolid() + if task_tpid == thread_tpid + push!(queue_for(Int(cur_threadpool_tid()), Int(thread_tpid)), t) + else + push!(queue_for(0, Int(task_tpid)), t) + end + return nothing +end + +function dequeue!() + tpid = cur_threadpoolid() + tp_tid = cur_threadpool_tid() + tid = Threads.threadid() + q = queue_for(Int(tp_tid), Int(tpid)) + t = pop!(q) # Check own queue first + if t !== nothing + if ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) == 0 + push!(q, t) # Is there a way to avoid popping the same unrunnable task over and over? 
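+            # jl_set_task_tid failed: the task is already bound to (or being run by)
+            # another thread, so requeue it and wake that thread instead of running it here.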
+ ccall(:jl_wakeup_thread, Cvoid, (Int16,), (Threads.threadid(t) - 1) % Int16) + else + return t + end + end + t = attempt_steal!(Int(tp_tid), Int(tpid)) # Otherwise try to steal from others + return t +end + +function attempt_steal!(tp_tid::Int, tpid::Int) + tid = Threads.threadid() + nt = Threads._nthreads_in_pool(Int8(tpid - 1)) + for _ in 1:(4*nt) # Try to steal 4x nthread times + tp_tid2 = Base.Scheduler.cong(UInt32(nt + 1)) - 1 # From 0 to nt since queue_for uses 0 for the foreign queue + tp_tid == tp_tid2 && continue + t = QueueModule.steal!(queue_for(Int(tp_tid2), Int(tpid))) #TODO: Change types of things to avoid the convert + if t !== nothing + if ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) == 0 + push!(queue_for(0, Int(get_task_tpid(t))), t) + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (Threads.threadid(t) - 1) % Int16) + else + return t + end + end + end + for i in 0:(nt) # Try to steal from other threads round robin + t = QueueModule.steal!(queue_for(Int(i), Int(tpid))) #TODO: Change types of things to avoid the convert + if t !== nothing + if ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) == 0 + push!(queue_for(0, Int(get_task_tpid(t))), t) + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (Threads.threadid(t) - 1) % Int16) + else + return t + end + end + end + return nothing +end + +function checktaskempty() + qs = @atomic :monotonic Queues[cur_threadpoolid()] + for i in eachindex(qs) + if isassigned(qs, i) + q = qs[i] + if !isempty(q) + return false + end + end + end + return true +end + +end diff --git a/base/task.jl b/base/task.jl index 6cb1ff785eeee..8e7ca5aa268cc 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1,5 +1,52 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license +# How does the julia scheduler work?? + +# mutable struct Task +# next::Union{Nothing,Task} #Invasive linked list +# queue::Any #Union{Nothing, StickyWorkqueue } Invasive linked list +# storage::Any # Could be a union +# donenotify::Any #Condition var for task done +# result::Any #Return value for the closure +# scope::Any #Scope for ScopeValue +# code::Any #Closure being called +# rngState0::UInt64 #Random number generator state +# rngState1::UInt64 #Random number generator state +# rngState2::UInt64 #Random number generator state +# rngState3::UInt64 #Random number generator state +# rngState4::UInt64 #Random number generator state +# _state::UInt8 #task state Maybe make enum? #TODO: Should be atomic +# sticky::Bool +# _isexception::Bool #TODO: Should be atomic +# priority::UInt16 +# end +#hidden state of the task begin +# tid::Int16 #thread id +# threadpoolid::Int8 +# reentrant_timing::UInt8 # Counter for inference reentrancy +# gcstack::jl_gcframe_t # GC stack frame +# world_age::Csize_t +# ptls::jl_ptls_t # Pointer to the thread tls +# excstack::Any #Exception stack +# eh::jl_handler_t #Exception handler +# ctx::jl_ucontext_t #Saved thread state #TODO: this is very large and likely unnecessary +# stkbuf::Ptr{Cvoid} #Stack buffer +# bufsz::Csize_t #Size of the stack buffer +# +# end + +#The scheduler has many entrypoints but the simplest one is yield. Which enqueues current_task and looks for another task to run. 
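+# Roughly (a sketch, not the literal implementation):
+#
+#     function yield()
+#         ct = current_task()
+#         enq_work(ct)   # make the current task runnable again
+#         wait()         # poptask -> trypoptask -> Scheduler.dequeue!(), then switch to it
+#     end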
+ +#TODO: Figure out what the error paths are supposed to be doing + +# yield +# enqueue current task and enter scheduler for something to do by calling wait() + +# wait +# gets a workqueue from workqueue_for pops it via poptask and sets it as the next task + +# ensure_rescheduled +# ## basic task functions and TLS Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer()) @@ -971,15 +1018,37 @@ function enq_work(t::Task) push!(workqueue_for(tid), t) else # Otherwise, put the task in the multiqueue. - Partr.multiq_insert(t, t.priority) + Scheduler.enqueue!(t) tid = 0 end end - ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + if (tid == 0) + n_spinning = Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads_spinning, Cint), :acquire) + n_spinning == 0 && ccall(:jl_add_spinner, Cvoid, ()) + else + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) + end + # n_spinning = Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire) + # n_spinning == 0 && ccall(:jl_add_spinner, Cvoid, ()) return t end -schedule(t::Task) = enq_work(t) +const ChildFirst = false + +function schedule(t::Task) + if ChildFirst + ct = current_task() + if ct.sticky || t.sticky + enq_work(t) + else + enq_work(ct) + yieldto(t) + end + else + enq_work(t) + end + return t +end """ schedule(t::Task, [val]; error=false) @@ -1154,10 +1223,10 @@ function trypoptask(W::StickyWorkqueue) end return t end - return Partr.multiq_deletemin() + return Scheduler.dequeue!() end -checktaskempty = Partr.multiq_check_empty +checktaskempty = Scheduler.checktaskempty @noinline function poptask(W::StickyWorkqueue) task = trypoptask(W) diff --git a/src/codegen.cpp b/src/codegen.cpp index e9a58d25e3e94..ae3ea4cf3e1c2 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -80,6 +80,10 @@ #include #include +#ifdef USE_ITTAPI +#include "ittapi/ittnotify.h" +#endif + using namespace llvm; static bool jl_fpo_disabled(const Triple &TT) { @@ -10235,8 +10239,14 @@ extern "C" void jl_init_llvm(void) const char *jit_profiling = getenv("ENABLE_JITPROFILING"); #if defined(JL_USE_INTEL_JITEVENTS) - if (jit_profiling && atoi(jit_profiling)) { - jl_using_intel_jitevents = 1; + if (jit_profiling) { + if (atoi(jit_profiling)) { + jl_using_intel_jitevents = 1; + } + } else { +#ifdef USE_ITTAPI + jl_using_intel_jitevents = __itt_get_collection_state() == __itt_collection_init_successful; +#endif } #endif diff --git a/src/init.c b/src/init.c index 9e6a695c71eb0..984df7f8739fc 100644 --- a/src/init.c +++ b/src/init.c @@ -875,8 +875,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ jl_n_markthreads = 0; jl_n_sweepthreads = 0; jl_n_gcthreads = 0; - jl_n_threads_per_pool[0] = 1; - jl_n_threads_per_pool[1] = 0; + jl_n_threads_per_pool[0] = 0; + jl_n_threads_per_pool[1] = 1; } else { post_image_load_hooks(); } diff --git a/src/jl_exported_data.inc b/src/jl_exported_data.inc index ff79966b2b01b..da2bff02a1298 100644 --- a/src/jl_exported_data.inc +++ b/src/jl_exported_data.inc @@ -155,6 +155,7 @@ #define JL_EXPORTED_DATA_SYMBOLS(XX) \ XX(jl_n_threadpools, int) \ XX(jl_n_threads, _Atomic(int)) \ + XX(jl_n_threads_spinning, _Atomic(int)) \ XX(jl_n_gcthreads, int) \ XX(jl_options, jl_options_t) \ XX(jl_task_gcstack_offset, int) \ diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 1976dbe709733..b03a74bc8f8cd 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -449,6 +449,7 @@ XX(jl_tagged_gensym) \ 
XX(jl_take_buffer) \ XX(jl_task_get_next) \ + XX(jl_add_spinner) \ XX(jl_task_stack_buffer) \ XX(jl_termios_size) \ XX(jl_test_cpu_feature) \ diff --git a/src/julia.h b/src/julia.h index 074c50fd0aa21..346c8b4ab7f50 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1960,6 +1960,7 @@ JL_DLLEXPORT jl_sym_t *jl_get_ARCH(void) JL_NOTSAFEPOINT; JL_DLLIMPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT; extern JL_DLLIMPORT int jl_n_threadpools; extern JL_DLLIMPORT _Atomic(int) jl_n_threads; +extern JL_DLLIMPORT _Atomic(int) jl_n_threads_spinning; // Scheduler internal count extern JL_DLLIMPORT int jl_n_gcthreads; extern int jl_n_markthreads; extern int jl_n_sweepthreads; diff --git a/src/julia_threads.h b/src/julia_threads.h index e56ff5edd6176..60ce5b371dd3f 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -124,6 +124,7 @@ struct _jl_bt_element_t; #define JL_MAX_BT_SIZE 80000 typedef struct _jl_tls_states_t { int16_t tid; + int16_t threadpool_tid; int8_t threadpoolid; uint64_t rngseed; _Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread @@ -197,6 +198,7 @@ typedef struct _jl_tls_states_t { uint64_t uv_run_leave; uint64_t sleep_enter; uint64_t sleep_leave; + uint64_t woken_up; ) // some hidden state (usually just because we don't have the type's size declaration) diff --git a/src/scheduler.c b/src/scheduler.c index 3cf97ba108873..6a3eea37d31f2 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -209,6 +209,7 @@ void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT // introduced some inaccuracy into the count, but that is // unavoidable with any asynchronous interruption jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); } } } @@ -221,6 +222,7 @@ static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT return 1; } } + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, -1); int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, -1); // consume in-flight wakeup assert(wasrunning > 1); (void)wasrunning; return 0; @@ -233,6 +235,8 @@ static int wake_thread(int16_t tid) JL_NOTSAFEPOINT if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) { int8_t state = sleeping; if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) { + JULIA_DEBUG_SLEEPWAKE( ptls2->woken_up = cycleclock() ); + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); // increment in-flight wakeup count assert(wasrunning); (void)wasrunning; JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state); @@ -269,6 +273,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) { if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) { int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); assert(wasrunning); (void)wasrunning; JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls); } @@ -368,17 +373,68 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping; } +// jl_n_threads_spinning is basically the same as n_threads_running with the caveat that it gets decremented when a thread starts doing work +// if a thread leaves to do work and spinning goes to 0 we try to wake up another thread +STATIC_INLINE void add_spinner(jl_task_t *ct) JL_NOTSAFEPOINT +{ + // Find sleeping thread to wake up + 
if (jl_atomic_load_relaxed(&n_threads_running) >= (jl_atomic_load_relaxed(&jl_n_threads) - jl_n_gcthreads)) + return; // TODO: This fast path is not correct with foreign threads in play + int self = jl_atomic_load_relaxed(&ct->tid); + int nthreads = jl_atomic_load_acquire(&jl_n_threads); + jl_fence(); + jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner); + jl_ptls_t ptls = ct->ptls; + if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) { + if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) { + int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); + assert(wasrunning); (void)wasrunning; + JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls); + } + } + int anysleep = 0; + for (int tid = 0; tid < nthreads; tid++) { + if ((tid != self) && wake_thread(tid)) { + anysleep = 1; + break; + } + } + if (uvlock == ct) + uv_stop(jl_global_event_loop()); + else if (uvlock != ct && anysleep) { + jl_fence(); + if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL) + wake_libuv(); + } +} + +JL_DLLEXPORT void jl_add_spinner(void) +{ + jl_task_t *ct = jl_current_task; + add_spinner(ct); +} + +STATIC_INLINE void exit_spinning(jl_task_t* ct) JL_NOTSAFEPOINT +{ + int was_spinning = jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, -1); // decrement spinning count + if (was_spinning == 1) { + // we were the last spinning thread and we found work so wake up another thread to spin + add_spinner(ct); + } +} JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty) { jl_task_t *ct = jl_current_task; uint64_t start_cycles = 0; - + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); // increment spinning count while (1) { jl_task_t *task = get_next_task(trypoptask, q); - if (task) + if (task) { + exit_spinning(ct); return task; - + } // quick, race-y check to see if there seems to be any stuff in there jl_cpu_pause(); if (!check_empty(checkempty)) { @@ -490,6 +546,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // any thread which wants us running again will have to observe // sleep_check_state==sleeping and increment n_threads_running for us + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, -1); int wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, -1); assert(wasrunning); isrunning = 0; @@ -515,6 +572,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, if (ptls->tid == 0) { task = wait_empty; if (task && jl_atomic_load_relaxed(&n_threads_running) == 0) { + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); wasrunning = jl_atomic_fetch_add_relaxed(&n_threads_running, 1); assert(!wasrunning); wasrunning = !set_not_sleeping(ptls); @@ -543,13 +601,17 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, } JL_CATCH { // probably SIGINT, but possibly a user mistake in trypoptask - if (!isrunning) + if (!isrunning) { + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + } set_not_sleeping(ptls); jl_rethrow(); } - if (task) + if (task) { + exit_spinning(ct); return task; + } } else { // maybe check the kernel for new messages too @@ -573,9 +635,11 @@ void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT } } else { + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, 1); jl_atomic_fetch_add_relaxed(&n_threads_running, 1); } jl_wakeup_thread(0); // force thread 0 to see 
that we do not have the IO lock (and am dead) + jl_atomic_fetch_add_relaxed(&jl_n_threads_spinning, -1); jl_atomic_fetch_add_relaxed(&n_threads_running, -1); } diff --git a/src/threading.c b/src/threading.c index 8f37ee814056c..fe0e6940b0ad3 100644 --- a/src/threading.c +++ b/src/threading.c @@ -338,6 +338,36 @@ JL_DLLEXPORT int8_t jl_threadpoolid(int16_t tid) JL_NOTSAFEPOINT return -1; // everything else uses threadpool -1 (does not belong to any threadpool) } +JL_DLLEXPORT int8_t jl_cur_threadpoolid(void) JL_NOTSAFEPOINT +{ + return jl_current_task->ptls->threadpoolid; +} + +JL_DLLEXPORT int16_t jl_cur_threadpool_tid(void) JL_NOTSAFEPOINT +{ + return jl_current_task->ptls->threadpool_tid; +} + +STATIC_INLINE void set_ptls_tpid(jl_ptls_t ptls) JL_NOTSAFEPOINT +{ + int16_t tid = ptls->tid; + int nthreads = jl_atomic_load_acquire(&jl_n_threads); + if (tid < 0 || tid >= nthreads) + jl_error("invalid tid"); + int n = 0; + for (int i = 0; i < jl_n_threadpools; i++) { + int old_n = n; + n += jl_n_threads_per_pool[i]; + if (tid < n) { + ptls->threadpoolid = i; + ptls->threadpool_tid = tid - old_n; + return; + } + } + ptls->threadpoolid = -1; // everything else uses threadpool -1 (does not belong to any threadpool) + ptls->threadpool_tid = -1; +} + jl_ptls_t jl_init_threadtls(int16_t tid) { #ifndef _OS_WINDOWS_ @@ -386,6 +416,7 @@ jl_ptls_t jl_init_threadtls(int16_t tid) if (tid == -1) tid = jl_atomic_load_relaxed(&jl_n_threads); ptls->tid = tid; + set_ptls_tpid(ptls); jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states); if (jl_all_tls_states_size <= tid) { int i, newsize = jl_all_tls_states_size + tid + 2; diff --git a/test/threads.jl b/test/threads.jl index 7b4558091022b..a9feaee32d248 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -329,7 +329,7 @@ end end @testset "rand_ptls underflow" begin - @test Base.Partr.cong(UInt32(0)) == 0 + @test Base.Scheduler.cong(UInt32(0)) == 0 end @testset "num_stack_mappings metric" begin
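
The scheduler interface described in base/schedulers/scheduler.jl is deliberately small: enqueue!(::Task), dequeue!() returning a runnable Task or nothing, and checktaskempty(). As an illustrative sketch only (not part of this diff; the module and its fields are hypothetical), a minimal conforming scheduler could look like:

    module FIFOScheduler
    # Toy, lock-protected FIFO queue that satisfies the same three-function
    # interface Base.Scheduler expects from Partr or Workstealing.
    const tasks = Task[]
    const tasks_lock = Base.Threads.SpinLock()

    function enqueue!(t::Task)
        Base.@lock tasks_lock push!(tasks, t)
        return nothing
    end

    function dequeue!()
        Base.@lock tasks_lock begin
            isempty(tasks) && return nothing
            return popfirst!(tasks)
        end
    end

    checktaskempty() = Base.@lock tasks_lock isempty(tasks)

    end # module

Swapping `const ChosenScheduler = Workstealing` in scheduler.jl for such a module is all that is needed for enq_work and trypoptask in base/task.jl to route through it.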