Skip to content

Commit 1998f8d

Browse files
authored
Array.Parallel.groupBy added (#14895)
* Array.Parallel.groupBy added
1 parent c108f00 commit 1998f8d

7 files changed

+224
-0
lines changed

Diff for: src/FSharp.Core/array.fs

+122
Original file line numberDiff line numberDiff line change
@@ -1932,7 +1932,9 @@ module Array =
19321932
result
19331933

19341934
module Parallel =
1935+
open System.Threading
19351936
open System.Threading.Tasks
1937+
open System.Collections.Concurrent
19361938

19371939
[<CompiledName("TryFindIndex")>]
19381940
let tryFindIndex predicate (array: _[]) =
@@ -2054,6 +2056,126 @@ module Array =
20542056

20552057
result
20562058

2059+
// The following two parameters were benchmarked and found to be optimal.
2060+
// Benchmark was run using: 11th Gen Intel Core i9-11950H 2.60GHz, 1 CPU, 16 logical and 8 physical cores
2061+
let private maxPartitions = Environment.ProcessorCount // The maximum number of partitions to use
2062+
let private minChunkSize = 256 // The minimum size of a chunk to be sorted in parallel
2063+
2064+
let private createPartitionsUpTo maxIdxExclusive (array: 'T[]) =
2065+
[|
2066+
let chunkSize =
2067+
match maxIdxExclusive with
2068+
| smallSize when smallSize < minChunkSize -> smallSize
2069+
| biggerSize when biggerSize % maxPartitions = 0 -> biggerSize / maxPartitions
2070+
| biggerSize -> (biggerSize / maxPartitions) + 1
2071+
2072+
let mutable offset = 0
2073+
2074+
while (offset + chunkSize) < maxIdxExclusive do
2075+
yield new ArraySegment<'T>(array, offset, chunkSize)
2076+
offset <- offset + chunkSize
2077+
2078+
yield new ArraySegment<'T>(array, offset, maxIdxExclusive - offset)
2079+
|]
2080+
2081+
let inline groupByImplParallel
2082+
(comparer: IEqualityComparer<'SafeKey>)
2083+
([<InlineIfLambda>] keyf: 'T -> 'SafeKey)
2084+
([<InlineIfLambda>] getKey: 'SafeKey -> 'Key)
2085+
(array: 'T[])
2086+
=
2087+
let counts =
2088+
new ConcurrentDictionary<_, _>(
2089+
concurrencyLevel = maxPartitions,
2090+
capacity = Operators.min (array.Length) 1_000,
2091+
comparer = comparer
2092+
)
2093+
2094+
let valueFactory = new Func<_, _>(fun _ -> ref 0)
2095+
2096+
let projectedValues =
2097+
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked array.Length
2098+
2099+
let inputChunks = createPartitionsUpTo array.Length array
2100+
2101+
Parallel.For(
2102+
0,
2103+
inputChunks.Length,
2104+
fun chunkIdx ->
2105+
let chunk = inputChunks[chunkIdx]
2106+
2107+
for elemIdx = chunk.Offset to (chunk.Offset + chunk.Count - 1) do
2108+
let projected = keyf array[elemIdx]
2109+
projectedValues[elemIdx] <- projected
2110+
let counter = counts.GetOrAdd(projected, valueFactory = valueFactory)
2111+
Interlocked.Increment(counter) |> ignore
2112+
)
2113+
|> ignore
2114+
2115+
let finalResults =
2116+
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked counts.Count
2117+
2118+
let mutable finalIdx = 0
2119+
2120+
let finalResultsLookup =
2121+
new Dictionary<'SafeKey, int ref * 'T[]>(capacity = counts.Count, comparer = comparer)
2122+
2123+
for kvp in counts do
2124+
let arrayForThisGroup =
2125+
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked kvp.Value.Value
2126+
2127+
finalResults.[finalIdx] <- getKey kvp.Key, arrayForThisGroup
2128+
finalResultsLookup[kvp.Key] <- kvp.Value, arrayForThisGroup
2129+
finalIdx <- finalIdx + 1
2130+
2131+
Parallel.For(
2132+
0,
2133+
inputChunks.Length,
2134+
fun chunkIdx ->
2135+
let chunk = inputChunks[chunkIdx]
2136+
2137+
for elemIdx = chunk.Offset to (chunk.Offset + chunk.Count - 1) do
2138+
let key = projectedValues[elemIdx]
2139+
let (counter, arrayForThisGroup) = finalResultsLookup[key]
2140+
let idxToWrite = Interlocked.Decrement(counter)
2141+
arrayForThisGroup[idxToWrite] <- array[elemIdx]
2142+
)
2143+
|> ignore
2144+
2145+
finalResults
2146+
2147+
let groupByValueTypeParallel (keyf: 'T -> 'Key) (array: 'T[]) =
2148+
// Is it a bad idea to put floating points as keys for grouping? Yes
2149+
// But would the implementation fail with KeyNotFound "nan" if we just leave it? Also yes
2150+
// Here we enforce nan=nan equality to prevent throwing
2151+
if typeof<'Key> = typeof<float> || typeof<'Key> = typeof<float32> then
2152+
let genericCmp =
2153+
HashIdentity.FromFunctions<'Key>
2154+
(LanguagePrimitives.GenericHash)
2155+
(LanguagePrimitives.GenericEqualityER)
2156+
2157+
groupByImplParallel genericCmp keyf id array
2158+
else
2159+
groupByImplParallel HashIdentity.Structural<'Key> keyf id array
2160+
2161+
// Just like in regular Array.groupBy: Wrap a StructBox around all keys in order to avoid nulls
2162+
// (dotnet doesn't allow null keys in dictionaries)
2163+
let groupByRefTypeParallel (keyf: 'T -> 'Key) (array: 'T[]) =
2164+
groupByImplParallel
2165+
RuntimeHelpers.StructBox<'Key>.Comparer
2166+
(fun t -> RuntimeHelpers.StructBox(keyf t))
2167+
(fun sb -> sb.Value)
2168+
array
2169+
2170+
[<CompiledName("GroupBy")>]
2171+
let groupBy (projection: 'T -> 'Key) (array: 'T[]) =
2172+
checkNonNull "array" array
2173+
2174+
if typeof<'Key>.IsValueType then
2175+
groupByValueTypeParallel projection array
2176+
else
2177+
groupByRefTypeParallel projection array
2178+
20572179
[<CompiledName("Iterate")>]
20582180
let iter action (array: 'T[]) =
20592181
checkNonNull "array" array

Diff for: src/FSharp.Core/array.fsi

+27
Original file line numberDiff line numberDiff line change
@@ -3307,6 +3307,33 @@ module Array =
33073307
[<CompiledName("MapIndexed")>]
33083308
val mapi: mapping:(int -> 'T -> 'U) -> array:'T[] -> 'U[]
33093309

3310+
/// <summary>Applies a key-generating function to each element of an array in parallel and yields an array of
3311+
/// unique keys. Each unique key contains an array of all elements that match
3312+
/// to this key.</summary>
3313+
///
3314+
/// <remarks>Performs the operation in parallel using <see cref="M:System.Threading.Tasks.Parallel.For" />.
3315+
/// The order in which the given function is applied to elements of the input array is not specified.
3316+
/// The order of the keys and values in the result is also not specified</remarks>
3317+
3318+
/// <param name="projection">A function that transforms an element of the array into a comparable key.</param>
3319+
/// <param name="array">The input array.</param>
3320+
///
3321+
/// <returns>The result array.</returns>
3322+
///
3323+
/// <exception cref="T:System.ArgumentNullException">Thrown when the input array is null.</exception>
3324+
///
3325+
/// <example id="group-by-para-1">
3326+
/// <code lang="fsharp">
3327+
/// let inputs = [| 1; 2; 3; 4; 5 |]
3328+
///
3329+
/// inputs |> Array.Parallel.groupBy (fun n -> n % 2)
3330+
/// </code>
3331+
/// Evaluates to <c>[| (1, [| 1; 3; 5 |]); (0, [| 2; 4 |]) |]</c>
3332+
/// </example>
3333+
[<CompiledName("GroupBy")>]
3334+
[<Experimental("Experimental library feature, requires '--langversion:preview'")>]
3335+
val groupBy: projection:('T -> 'Key) -> array:'T[] -> ('Key * 'T[])[] when 'Key : equality
3336+
33103337
/// <summary>Apply the given function to each element of the array. </summary>
33113338
///
33123339
/// <remarks>Performs the operation in parallel using <see cref="M:System.Threading.Tasks.Parallel.For" />.

Diff for: tests/FSharp.Core.UnitTests/FSharp.Core.SurfaceArea.netstandard20.debug.bsl

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
4040
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
4141
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
4242
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
43+
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
4344
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
4445
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
4546
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])

Diff for: tests/FSharp.Core.UnitTests/FSharp.Core.SurfaceArea.netstandard20.release.bsl

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
4040
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
4141
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
4242
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
43+
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
4344
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
4445
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
4546
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])

Diff for: tests/FSharp.Core.UnitTests/FSharp.Core.SurfaceArea.netstandard21.debug.bsl

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
4040
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
4141
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
4242
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
43+
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
4344
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
4445
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
4546
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])

Diff for: tests/FSharp.Core.UnitTests/FSharp.Core.SurfaceArea.netstandard21.release.bsl

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
4040
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
4141
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
4242
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
43+
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
4344
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
4445
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
4546
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])

Diff for: tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Collections/ArrayModule.fs

+71
Original file line numberDiff line numberDiff line change
@@ -1242,6 +1242,77 @@ type ArrayModule() =
12421242
CheckThrowsArgumentNullException(fun () -> Array.groupBy funcInt (null : int array) |> ignore)
12431243
()
12441244

1245+
[<Fact>]
1246+
member _.ParallelGroupBy() =
1247+
1248+
let assertEqualityOfGroupings opName (seqGroup: ('TKey * 'TVal[])[]) (paraGroup: ('TKey * 'TVal[])[]) =
1249+
seqGroup |> Array.sortInPlaceBy fst
1250+
paraGroup |> Array.sortInPlaceBy fst
1251+
1252+
seqGroup |> Array.iter (snd >> Array.sortInPlace)
1253+
paraGroup |> Array.iter (snd >> Array.sortInPlace)
1254+
1255+
if seqGroup.Length <> paraGroup.Length then
1256+
Assert.Fail($"{opName} produced different lengths of results. Seq={seqGroup.Length};Para={paraGroup.Length}.")
1257+
1258+
let seqKeys = seqGroup |> Array.map fst
1259+
let paraKeys = paraGroup |> Array.map fst
1260+
if(seqKeys <> paraKeys) then
1261+
Assert.Fail($"{opName} produced different keys. Seq=%A{seqKeys};Para=%A{paraKeys}.")
1262+
1263+
Array.zip seqGroup paraGroup
1264+
|> Array.iter (fun ((seqKey,seqGroup), (paraKey,paraGroup)) ->
1265+
Assert.AreEqual(seqKey,paraKey,opName)
1266+
if seqGroup <> paraGroup then
1267+
Assert.Fail($"{opName} produced different results for key={seqKey}. Seq=%A{seqGroup};Para=%A{paraGroup}."))
1268+
1269+
Assert.True((seqGroup=paraGroup), $"{opName} produced different results. Seq=%A{seqGroup};Para=%A{paraGroup}.")
1270+
1271+
1272+
let compareAndAssert opName array projection =
1273+
let seqGroup = array |> Array.groupBy projection
1274+
let paraGroup = array |> Array.Parallel.groupBy projection
1275+
assertEqualityOfGroupings opName seqGroup paraGroup
1276+
1277+
// int array
1278+
let funcInt x = x%5
1279+
let IntArray = [| 0 .. 250 |]
1280+
compareAndAssert "Int grouping" IntArray funcInt
1281+
1282+
1283+
// string array
1284+
let funcStr (x:string) = x.Length
1285+
let strArray = Array.init 177 (fun i -> string i)
1286+
compareAndAssert "String grouping" strArray funcStr
1287+
1288+
1289+
// Empty array
1290+
compareAndAssert "Empty group" [||] funcInt
1291+
1292+
// Reference key which can be null
1293+
let sampleStringsCanBeNull = [|"a";null;"abc";String.Empty|]
1294+
let pickStringByIdx idx = sampleStringsCanBeNull[idx % sampleStringsCanBeNull.Length]
1295+
compareAndAssert "Key can be null" IntArray pickStringByIdx
1296+
1297+
//String array w/ null keys and values
1298+
let strArray = Array.init 222 (fun i -> if i%3=0 then String.Empty else null )
1299+
compareAndAssert "String grouping w/ nulls" strArray id
1300+
1301+
// Keys being special floats. Array.groupBy does not work here, we test results manually
1302+
let specialFloats = [|infinity; -infinity;-0.0; 0.0; 1.0; -1.0; -0.0/0.0; -nan|]
1303+
let pickSpecialFloatByIdx idx = specialFloats[idx % specialFloats.Length]
1304+
1305+
let paraGroup = IntArray |> Array.Parallel.groupBy pickSpecialFloatByIdx
1306+
Assert.AreEqual(6, paraGroup.Length, "There should be 6 special floats!")
1307+
let (nan,nansGroup) = paraGroup |> Array.find (fun (k,_) -> Double.IsNaN(k))
1308+
// Both -0.0/0.0; -nan are a Nan. => 2/8 => every 4th elements goes to the NaN bucket
1309+
Assert.AreEqual((IntArray.Length / 4), nansGroup.Length, $"There should be {(IntArray.Length / 4)} NaNs!")
1310+
1311+
1312+
1313+
CheckThrowsArgumentNullException(fun () -> Array.Parallel.groupBy funcInt (null : int array) |> ignore)
1314+
()
1315+
12451316
member private this.InitTester initInt initString =
12461317
// integer array
12471318
let resultInt : int[] = initInt 3 (fun x -> x + 3)

0 commit comments

Comments
 (0)