-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP workstealing scheduler + refactor for easier experimentation #55542
Draft
gbaraldi
wants to merge
12
commits into
master
Choose a base branch
from
gb/workstealing-fun
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 6 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
e80fe48
WIP workstealing scheduler + refactor for easier experimentation
gbaraldi 61d9700
Add convert
gbaraldi 272c5af
Change queue implementation
gbaraldi 4e1fbb2
Fix scheduler
gbaraldi 1d56bde
Fix workstealing implementation and only wake up one thread per enque…
gbaraldi 7ff2c3c
Add padding in the CDLL queue struct + add round robin to the workste…
gbaraldi 9174cc4
Automatically enable JITPROFILING with ITTAPI
vchuravy ed1c0c6
annotate queue implementation after Le
vchuravy e717451
Implement Continuation Stealing mode
gbaraldi a958a97
Implement threadpool support in the workstealing scheduler
gbaraldi bc81e62
fixup! annotate queue implementation after Le
vchuravy 0727c0a
fix copyto in WS
vchuravy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,266 @@ | ||
|
||
module ConcurrentList #Concurrent Doubly Linked List | ||
|
||
mutable struct Node{T} | ||
const value::Union{T, Nothing} | ||
@atomic next::Union{Node{T}, Nothing} | ||
@atomic prev::Union{Node{T}, Nothing} | ||
|
||
Node{T}(value, next, prev) where T = new{T}(value, next, prev) | ||
function Node(next::Node{T}) where T # Marker | ||
this = new{T}(nothing, next, nothing) | ||
@atomic :release this.prev = this | ||
return this | ||
end | ||
end | ||
|
||
Node(value::T, next, prev) where T = Node{T}(value, next, prev) | ||
|
||
get_next(node::Node) = @atomic :acquire node.next | ||
set_next(node::Node, next) = @atomic :release node.next = next | ||
get_prev(node::Node) = @atomic :acquire node.prev | ||
set_prev(node::Node, prev) = @atomic :release node.prev = prev | ||
function cas_next(node::Node, exp::Node, desired::Node) | ||
_,success = @atomicreplace :acquire_release :monotonic node.next exp => desired | ||
return success | ||
end | ||
is_special(node::Node) = node.value === nothing | ||
is_trailer(node::Node) = get_next(node) === nothing | ||
is_header(node::Node) = get_prev(node) === nothing | ||
is_marker(node::Node) = get_prev(node) === node | ||
|
||
function is_deleted(node::Node) | ||
f = get_next(node) | ||
return f !== nothing && is_marker(f) | ||
end | ||
|
||
function next_nonmarker(node::Node) | ||
f = get_next(node) | ||
return (f === nothing || !is_marker(f)) ? f : get_next(f) | ||
end | ||
|
||
function Base.show(io::IO, node::Node) | ||
if is_special(node) | ||
if is_marker(node) | ||
print(io, "MarkerNode") | ||
return | ||
elseif is_header(node) | ||
next = get_next(node) | ||
if next === nothing | ||
print(io, "BrokenNode()") | ||
return | ||
elseif is_marker(node) | ||
print(io, "HeaderNode(next: MarkerNode)") | ||
return | ||
elseif is_trailer(next) | ||
print(io, "HeaderNode(next: TrailerNode)") | ||
return | ||
end | ||
print(io, "HeaderNode(next: ", next,")") | ||
return | ||
elseif is_trailer(node) | ||
prev = get_prev(node) | ||
if prev === nothing | ||
print(io, "BrokenNode()") | ||
return | ||
elseif is_marker(node) | ||
print(io, "TrailerNode(prev: MarkerNode)") | ||
return | ||
elseif is_header(prev) | ||
print(io, "TrailerNode(prev: HeaderNode)") | ||
return | ||
end | ||
print(io, "TrailerNode(prev: ", prev,")") | ||
return | ||
end | ||
end | ||
print(io, "Node(", node.value,")") | ||
end | ||
|
||
function successor(node::Node) | ||
f = next_nonmarker(node) | ||
while true | ||
if f === nothing | ||
return nothing | ||
end | ||
if !is_deleted(f) | ||
if get_prev(f) !== node && !is_deleted(node) | ||
set_prev(f, node) # relink f to node | ||
end | ||
return f | ||
end | ||
s = next_nonmarker(f) | ||
if f === get_next(node) | ||
cas_next(node, f, s) | ||
end | ||
f = s | ||
end | ||
end | ||
|
||
function find_predecessor_of(node::Node{T}, target::Node{T}) where {T} | ||
n = node | ||
while true | ||
f = successor(n) | ||
if (f === target) | ||
return n | ||
end | ||
if (f === nothing) | ||
return nothing | ||
end | ||
n = f | ||
end | ||
end | ||
|
||
function predecessor(node::Node) | ||
n = node | ||
while true | ||
b = get_prev(n) | ||
if (b === nothing) | ||
return find_predecessor_of(n, node) | ||
end | ||
s = get_next(b) | ||
if (s === node) | ||
return b | ||
end | ||
if (s === nothing || !is_marker(s)) | ||
p = find_predecessor_of(b, node) | ||
if (p !== nothing) | ||
return p | ||
end | ||
end | ||
n = b | ||
end | ||
end | ||
|
||
function forward(node::Node) | ||
f = successor(node) | ||
return (f === nothing || is_special(f)) ? nothing : f | ||
end | ||
|
||
function back(node::Node) | ||
f = predecessor(node) | ||
return (f === nothing || is_special(f)) ? nothing : f | ||
end | ||
|
||
function append!(node::Node{T}, val::T) where {T} | ||
while true | ||
f = get_next(node) | ||
if (f === nothing || is_marker(f)) | ||
return nothing | ||
end | ||
x = Node(val, f, node) | ||
if cas_next(node, f, x) | ||
set_prev(f, x) | ||
return x | ||
end | ||
end | ||
end | ||
|
||
function prepend!(node::Node{T}, val::T) where {T} | ||
while true | ||
b = predecessor(node) | ||
if b === nothing | ||
return nothing | ||
end | ||
x = Node(val, node, b) | ||
if cas_next(b, node, x) | ||
set_prev(node, x) | ||
return x | ||
end | ||
end | ||
end | ||
|
||
function delete!(node::Node) | ||
b = get_prev(node) | ||
f = get_next(node) | ||
if (b !== nothing && f !== nothing && !is_marker(f) && cas_next(node, f, Node(f))) | ||
if (cas_next(b, node, f)) | ||
set_prev(f, b) | ||
end | ||
return true | ||
end | ||
return false | ||
end | ||
|
||
function replace!(node::Node{T}, val::T) where {T} | ||
while true | ||
b = get_prev(node) | ||
f = get_next(node) | ||
if (b === nothing || f === nothing || is_marker(f)) | ||
return nothing | ||
end | ||
x = Node(val, f, b) | ||
if cas_next(node, f, Node(x)) | ||
successor(b) | ||
successor(x) | ||
return x | ||
end | ||
end | ||
end | ||
|
||
function usable(node::Node) | ||
return node !== nothing && !is_special(node) | ||
end | ||
|
||
const _PADDING_TUPLE = ntuple(zero, 15) | ||
mutable struct ConcurrentDoublyLinkedList{T} | ||
@atomic header::Union{Node{T}, Nothing} # 8 bytes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 4 byte on 32 bit, right? |
||
padding::NTuple{15,UInt64} # 120 bytes | ||
@atomic trailer::Union{Node{T}, Nothing} | ||
padding2::NTuple{15,UInt64} | ||
function ConcurrentDoublyLinkedList{T}(header::Union{Node{T}, Nothing}, trailer::Union{Node{T}, Nothing}) where {T} | ||
new{T}(header, _PADDING_TUPLE, trailer, _PADDING_TUPLE) | ||
end | ||
end | ||
|
||
function ConcurrentDoublyLinkedList{T}() where {T} | ||
h = Node{T}(nothing, nothing, nothing) | ||
t = Node{T}(nothing, nothing, h) | ||
set_next(h, t) | ||
ConcurrentDoublyLinkedList{T}(h, t) | ||
end | ||
|
||
const CDLL = ConcurrentDoublyLinkedList | ||
|
||
function Base.pushfirst!(cdll::CDLL{T}, val::T) where {T} | ||
while (append!((@atomic :acquire cdll.header), val) === nothing) | ||
end | ||
end | ||
|
||
function pushlast!(cdll::CDLL{T}, val::T) where {T} | ||
while (prepend!((@atomic :acquire cdll.trailer), val) === nothing) | ||
end | ||
end | ||
|
||
function Base.popfirst!(cdll::CDLL) | ||
while true | ||
n = successor((@atomic :acquire cdll.header)) | ||
if !usable(n) | ||
return nothing | ||
end | ||
if delete!(n) | ||
return n.value | ||
end | ||
end | ||
end | ||
|
||
function poplast!(cdll::CDLL) | ||
while true | ||
n = predecessor((@atomic :acquire cdll.trailer)) | ||
if !usable(n) | ||
return nothing | ||
end | ||
if delete!(n) | ||
return n.value | ||
end | ||
end | ||
end | ||
|
||
Base.push!(cdll::CDLL{T}, val::T) where {T} = pushfirst!(cdll, val) | ||
Base.pop!(cdll::CDLL) = poplast!(cdll) | ||
steal!(cdll::CDLL) = popfirst!(cdll) | ||
Base.isempty(cdll::CDLL) = !usable(successor(@atomic :acquire cdll.header)) | ||
|
||
const Queue = CDLL | ||
|
||
end | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make this type stable by having
next
andprev
point to the current node in case there is no next/previous node.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh cool. This will all need some fine comb optimizations for sure.