
Commit 3613d26

Add KernelAbstractions-based mapreduce implementation
Parent: 184b36f


src/host/mapreduce.jl

Lines changed: 182 additions & 4 deletions
@@ -2,10 +2,10 @@
 
 const AbstractArrayOrBroadcasted = Union{AbstractArray,Broadcast.Broadcasted}
 
-# GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional
-# argument `init` value to avoid eager initialization of `R` (if set to something).
-mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted;
-              init=nothing) = error("Not implemented") # COV_EXCL_LINE
+# # GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional
+# # argument `init` value to avoid eager initialization of `R` (if set to something).
+# mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted;
+#               init=nothing) = error("Not implemented") # COV_EXCL_LINE
 # resolve ambiguities
 Base.mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArray) = mapreducedim!(f, op, R, A)
 Base.mapreducedim!(f, op, R::AnyGPUArray, A::Broadcast.Broadcasted) = mapreducedim!(f, op, R, A)
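
The stub commented out above documented the `init` contract that the implementation in the next hunk fulfils: passing `init` seeds each output slice, so `R` need not be initialized eagerly, while omitting it folds the existing contents of `R` into the result. A hypothetical call pattern, not taken from the diff, to illustrate:

# with init, R need not be pre-filled: the kernel seeds each slice with `init`
GPUArrays.mapreducedim!(abs, +, R, A; init=zero(eltype(R)))
# without init, whatever is already stored in R participates in the reduction
GPUArrays.mapreducedim!(abs, +, R, A)
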
@@ -132,3 +132,181 @@ function Base.:(==)(A::AnyGPUArray, B::AnyGPUArray)
    res = mapreduce(mapper, reducer, A, B; init=(; is_missing=false, is_equal=true))
    res.is_missing ? missing : res.is_equal
end

@inline function reduce_group(@context, op, val::T, neutral, ::Val{maxitems}) where {T, maxitems}
    items = @groupsize()[1]
    item = @index(Local, Linear)

    # local mem for a complete reduction
    shared = @localmem T (maxitems,)
    @inbounds shared[item] = val

    # perform a reduction
    d = 1
    while d < items
        @synchronize() # legal since cpu=false
        index = 2 * d * (item-1) + 1
        @inbounds if index <= items
            other_val = if index + d <= items
                shared[index+d]
            else
                neutral
            end
            shared[index] = op(shared[index], other_val)
        end
        d *= 2
    end

    # load the final value on the first item
    if item == 1
        val = @inbounds shared[item]
    end

    return val
end
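
The loop above is a standard interleaved-addressing tree reduction in local memory: at step `d`, the entry at `index` absorbs its neighbour `index + d`, halving the number of active entries until `shared[1]` holds the result. A plain-Julia sketch of the same access pattern (illustrative only, not part of the diff; the sequential inner loop stands in for the synchronized parallel work-items):

# hypothetical CPU model of reduce_group's tree reduction
function tree_reduce_sketch(op, vals::Vector, neutral)
    shared = copy(vals)
    items = length(shared)
    d = 1
    while d < items
        for item in 1:items  # on the GPU these iterations are parallel work-items
            index = 2 * d * (item - 1) + 1
            if index <= items
                other = index + d <= items ? shared[index + d] : neutral
                shared[index] = op(shared[index], other)
            end
        end
        d *= 2
    end
    return shared[1]
end

tree_reduce_sketch(+, [1, 2, 3, 4, 5], 0)  # == 15
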
Base.@propagate_inbounds _map_getindex(args::Tuple, I) = ((args[1][I]), _map_getindex(Base.tail(args), I)...)
Base.@propagate_inbounds _map_getindex(args::Tuple{Any}, I) = ((args[1][I]),)
Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
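
`_map_getindex` recursively indexes each container in a tuple at the same index, which is what lets the kernel apply a multi-argument `f` elementwise. For example (illustrative, not in the diff):

a = [1, 2, 3]; b = [10.0, 20.0, 30.0]
_map_getindex((a, b), 2)  # returns (2, 20.0)
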
# Reduce an array across the grid. All elements to be processed can be addressed by the
# product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
# singleton entries for the dimensions that should be reduced (and vice versa).
@kernel cpu=false function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...)
    # decompose the 1D hardware indices into separate ones for reduction (across items
    # and possibly groups if it doesn't fit) and other elements (remaining groups)
    localIdx_reduce = @index(Local, Linear)
    localDim_reduce = @groupsize()[1]
    groupIdx_reduce, groupIdx_other = fldmod1(@index(Group, Linear), length(Rother))
    numGroups = length(KernelAbstractions.blocks(KernelAbstractions.__iterspace(@context())))
    groupDim_reduce = numGroups ÷ length(Rother)

    # group-based indexing into the values outside of the reduction dimension
    # (that means we can safely synchronize items within this group)
    iother = groupIdx_other
    @inbounds if iother <= length(Rother)
        Iother = Rother[iother]

        # load the neutral value
        Iout = CartesianIndex(Tuple(Iother)..., groupIdx_reduce)
        neutral = if neutral === nothing
            R[Iout]
        else
            neutral
        end

        val = op(neutral, neutral)

        # reduce serially across chunks of input vector that don't fit in a group
        ireduce = localIdx_reduce + (groupIdx_reduce - 1) * localDim_reduce
        while ireduce <= length(Rreduce)
            Ireduce = Rreduce[ireduce]
            J = max(Iother, Ireduce)
            val = op(val, f(_map_getindex(As, J)...))
            ireduce += localDim_reduce * groupDim_reduce
        end

        val = reduce_group(@context(), op, val, neutral, maxitems)

        # write back to memory
        if localIdx_reduce == 1
            R[Iout] = val
        end
    end

    return
end

## COV_EXCL_STOP
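
To make the `Rreduce`/`Rother` split concrete before the host code below: for a 3×4 input reduced over its first dimension, the decomposition works out as follows (illustrative values, not part of the commit):

A = rand(3, 4); R = similar(A, 1, 4)   # reduce dims=1
Rother  = CartesianIndices(axes(R))    # 1×4 indices: the dimensions that are kept
Rreduce = CartesianIndices((Base.OneTo(3), Base.OneTo(1)))  # 3×1: the dimensions being reduced
# `max(Iother, Ireduce)` recombines a pair of singleton-padded indices into a
# full index into A, so every element is visited exactly once:
@assert length(A) == length(Rother) * length(Rreduce)
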
function GPUArrays.mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadcasted;
                                 init=nothing) where {F, OP, T}
    Base.check_reducedims(R, A)
    length(A) == 0 && return R # isempty(::Broadcasted) iterates

    # add singleton dimensions to the output container, if needed
    if ndims(R) < ndims(A)
        dims = Base.fill_to_length(size(R), 1, Val(ndims(A)))
        R = reshape(R, dims)
    end

    # iteration domain, split in two: one part covers the dimensions that should
    # be reduced, and the other covers the rest. combining both covers all values.
    Rall = CartesianIndices(axes(A))
    Rother = CartesianIndices(axes(R))
    Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A)))
    # NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
    #       CartesianIndices object with UnitRanges that behave badly on the GPU.
    @assert length(Rall) == length(Rother) * length(Rreduce)

    # allocate an additional, empty dimension to write the reduced value to.
    # this does not affect the actual location in memory of the final values,
    # but allows us to write a generalized kernel supporting partial reductions.
    R′ = reshape(R, (size(R)..., 1))

    # how many items do we want?
    #
    # items in a group work together to reduce values across the reduction dimensions;
    # we want as many as possible to improve algorithm efficiency and execution occupancy.
    wanted_items = length(Rreduce)
    function compute_items(max_items)
        if wanted_items > max_items
            max_items
        else
            wanted_items
        end
    end

    # how many items can we launch?
    #
    # we might not be able to launch all those items to reduce each slice in one go.
    # that's why each item also loops across its inputs, processing multiple values
    # so that we can span the entire reduction dimension using a single item group.

    # group size is restricted by local memory
    # max_lmem_elements = compute_properties(device()).maxSharedLocalMemory ÷ sizeof(T)
    # max_items = min(compute_properties(device()).maxTotalGroupSize,
    #                 compute_items(max_lmem_elements ÷ 2))
    # TODO: dynamic local memory to avoid two compilations

    # let the driver suggest a group size
    # args = (f, op, init, Val(max_items), Rreduce, Rother, R′, A)
    # kernel_args = kernel_convert.(args)
    # kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
    # kernel = zefunction(partial_mapreduce_device, kernel_tt)
    # reduce_items = launch_configuration(kernel)
    reduce_items = 512
    reduce_kernel = partial_mapreduce_device(get_backend(R), (reduce_items,))

    # how many groups should we launch?
    #
    # even though we can always reduce each slice in a single item group, that may not be
    # optimal as it might not saturate the GPU. we already launch some groups to process
    # independent dimensions in parallel; pad that number to ensure full occupancy.
    other_groups = length(Rother)
    reduce_groups = cld(length(Rreduce), reduce_items)

    # determine the launch configuration
    items = reduce_items
    groups = reduce_groups*other_groups

    ndrange = groups*items

    # perform the actual reduction
    if reduce_groups == 1
        # we can cover the dimensions to reduce using a single group
        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, R′, A; ndrange)
    else
        # we need multiple steps to cover all values to reduce
        partial = similar(R, (size(R)..., reduce_groups))
        if init === nothing
            # without an explicit initializer we need to copy from the output container
            partial .= R
        end
        reduce_kernel(f, op, init, Val(items), Rreduce, Rother, partial, A; ndrange)

        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
    end

    return R
end
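
With this in place, ordinary reductions on GPU arrays route through the kernel above via the `Base.mapreducedim!` methods from the first hunk. A quick smoke test, assuming the JLArrays reference backend that ships with GPUArrays is available (usage sketch, not part of the commit):

using JLArrays

A = JLArray(rand(Float32, 1024, 16))
sum(A; dims=1)            # partial reduction: dispatches to GPUArrays.mapreducedim!
maximum(abs, A; dims=2)   # mapped partial reduction through the same path
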
