Skip to content

Array.Parallel.groupBy added #14895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/FSharp.Core/array.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1932,7 +1932,9 @@ module Array =
result

module Parallel =
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent

[<CompiledName("TryFindIndex")>]
let tryFindIndex predicate (array: _[]) =
Expand Down Expand Up @@ -2054,6 +2056,126 @@ module Array =

result

// The following two parameters were benchmarked and found to be optimal.
// Benchmark was run using: 11th Gen Intel Core i9-11950H 2.60GHz, 1 CPU, 16 logical and 8 physical cores
// Both values are consumed by createPartitionsUpTo when it derives the chunk size for an input.
let private maxPartitions = Environment.ProcessorCount // The maximum number of partitions to use
let private minChunkSize = 256 // Inputs shorter than this are not split and form one single chunk; chunks of longer inputs may still be smaller than this value

// Splits the first `maxIdxExclusive` elements of `array` into consecutive ArraySegment views.
// Inputs shorter than `minChunkSize` yield a single segment; otherwise the segment length is
// `maxIdxExclusive / maxPartitions`, rounded up. The last segment holds whatever remains, so
// exactly one (empty) segment is produced when `maxIdxExclusive` is 0.
let private createPartitionsUpTo maxIdxExclusive (array: 'T[]) =
    let segmentLength =
        if maxIdxExclusive < minChunkSize then
            maxIdxExclusive
        elif maxIdxExclusive % maxPartitions = 0 then
            maxIdxExclusive / maxPartitions
        else
            maxIdxExclusive / maxPartitions + 1

    [|
        let mutable start = 0

        // Emit full-length segments for as long as at least one more element follows them.
        while start + segmentLength < maxIdxExclusive do
            yield ArraySegment<'T>(array, start, segmentLength)
            start <- start + segmentLength

        // The tail segment covers the remainder.
        yield ArraySegment<'T>(array, start, maxIdxExclusive - start)
    |]

// Shared two-pass implementation behind the parallel groupBy functions.
//   comparer - equality comparer used by both dictionaries keyed by 'SafeKey
//   keyf     - projects an element to its dictionary-safe key; called once per element (pass 1)
//   getKey   - converts a 'SafeKey back into the 'Key reported in the result
//   array    - the input array
// Returns one (key, elements) pair per distinct key found by `comparer`.
// Pass 1 projects every element and counts the size of each group in parallel.
// Pass 2 allocates one array per group and scatters the elements into those arrays in parallel.
let inline groupByImplParallel
(comparer: IEqualityComparer<'SafeKey>)
([<InlineIfLambda>] keyf: 'T -> 'SafeKey)
([<InlineIfLambda>] getKey: 'SafeKey -> 'Key)
(array: 'T[])
=
// Maps each key to a shared `int ref` counter holding the number of elements seen for that key.
let counts =
new ConcurrentDictionary<_, _>(
concurrencyLevel = maxPartitions,
capacity = Operators.min (array.Length) 1_000,
comparer = comparer
)

// Creates the counter for a key that is not yet in `counts`; counters start at 0.
let valueFactory = new Func<_, _>(fun _ -> ref 0)

// Caches the projected key of every element (same index as in `array`) so that pass 2
// can read the key back instead of calling `keyf` a second time.
let projectedValues =
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked array.Length

// The same chunking is used by both passes.
let inputChunks = createPartitionsUpTo array.Length array

// Pass 1: project and count. Chunks run in parallel and may hit the same key, hence the
// counter is bumped with Interlocked.Increment.
Parallel.For(
0,
inputChunks.Length,
fun chunkIdx ->
let chunk = inputChunks[chunkIdx]

for elemIdx = chunk.Offset to (chunk.Offset + chunk.Count - 1) do
let projected = keyf array[elemIdx]
projectedValues[elemIdx] <- projected
let counter = counts.GetOrAdd(projected, valueFactory = valueFactory)
Interlocked.Increment(counter) |> ignore
)
|> ignore

// One result slot per distinct key.
let finalResults =
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked counts.Count

let mutable finalIdx = 0

// Populated sequentially below and only read during pass 2: key -> (counter, array of that group).
let finalResultsLookup =
new Dictionary<'SafeKey, int ref * 'T[]>(capacity = counts.Count, comparer = comparer)

// Allocate each group's array using the size counted in pass 1. The very same array instance
// is stored in the result and in the lookup, so filling it in pass 2 fills the result.
for kvp in counts do
let arrayForThisGroup =
Microsoft.FSharp.Primitives.Basics.Array.zeroCreateUnchecked kvp.Value.Value

finalResults.[finalIdx] <- getKey kvp.Key, arrayForThisGroup
finalResultsLookup[kvp.Key] <- kvp.Value, arrayForThisGroup
finalIdx <- finalIdx + 1

// Pass 2: scatter. The counter of a group starts at the group's size and the value returned
// by Interlocked.Decrement is used as the write index, so every element claims its own slot.
// Which element ends up in which slot depends on the order in which elements are processed.
Parallel.For(
0,
inputChunks.Length,
fun chunkIdx ->
let chunk = inputChunks[chunkIdx]

for elemIdx = chunk.Offset to (chunk.Offset + chunk.Count - 1) do
let key = projectedValues[elemIdx]
let (counter, arrayForThisGroup) = finalResultsLookup[key]
let idxToWrite = Interlocked.Decrement(counter)
arrayForThisGroup[idxToWrite] <- array[elemIdx]
)
|> ignore

finalResults

// Parallel grouping for keys that are value types.
// Is it a bad idea to put floating points as keys for grouping? Yes.
// But would the implementation fail with KeyNotFound "nan" if we just left it? Also yes.
// For float and float32 keys the comparer is therefore built from GenericHash and
// GenericEqualityER, which enforces nan=nan equality; every other key type uses the
// structural comparer.
let groupByValueTypeParallel (keyf: 'T -> 'Key) (array: 'T[]) =
    let keyIsFloatingPoint =
        typeof<'Key> = typeof<float> || typeof<'Key> = typeof<float32>

    let keyComparer =
        if keyIsFloatingPoint then
            HashIdentity.FromFunctions<'Key> LanguagePrimitives.GenericHash LanguagePrimitives.GenericEqualityER
        else
            HashIdentity.Structural<'Key>

    groupByImplParallel keyComparer keyf id array

// Parallel grouping for keys that are reference types.
// Just like in regular Array.groupBy, every key is wrapped in a StructBox in order to avoid
// nulls (dotnet doesn't allow null keys in dictionaries); the wrapper is removed again when
// the keys of the result are produced.
let groupByRefTypeParallel (keyf: 'T -> 'Key) (array: 'T[]) =
    let boxedKeyComparer = RuntimeHelpers.StructBox<'Key>.Comparer

    groupByImplParallel
        boxedKeyComparer
        (fun item -> RuntimeHelpers.StructBox(keyf item))
        (fun boxedKey -> boxedKey.Value)
        array

// Public entry point: checks the input array for null, then dispatches on whether 'Key is a
// value type or a reference type.
[<CompiledName("GroupBy")>]
let groupBy (projection: 'T -> 'Key) (array: 'T[]) =
    checkNonNull "array" array

    match typeof<'Key>.IsValueType with
    | true -> groupByValueTypeParallel projection array
    | false -> groupByRefTypeParallel projection array

[<CompiledName("Iterate")>]
let iter action (array: 'T[]) =
checkNonNull "array" array
Expand Down
27 changes: 27 additions & 0 deletions src/FSharp.Core/array.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -3307,6 +3307,33 @@ module Array =
[<CompiledName("MapIndexed")>]
val mapi: mapping:(int -> 'T -> 'U) -> array:'T[] -> 'U[]

/// <summary>Applies a key-generating function to each element of an array in parallel and yields an array of
/// unique keys. Each unique key is paired with an array of all elements that map
/// to this key.</summary>
///
/// <remarks>Performs the operation in parallel using <see cref="M:System.Threading.Tasks.Parallel.For" />.
/// The order in which the given function is applied to elements of the input array is not specified.
/// The order of the keys and values in the result is also not specified.</remarks>
///
/// <param name="projection">A function that transforms an element of the array into a key that supports equality.</param>
/// <param name="array">The input array.</param>
///
/// <returns>An array of key/elements pairs, one pair per unique key.</returns>
///
/// <exception cref="T:System.ArgumentNullException">Thrown when the input array is null.</exception>
///
/// <example id="group-by-para-1">
/// <code lang="fsharp">
/// let inputs = [| 1; 2; 3; 4; 5 |]
///
/// inputs |> Array.Parallel.groupBy (fun n -> n % 2)
/// </code>
/// One possible result is <c>[| (1, [| 1; 3; 5 |]); (0, [| 2; 4 |]) |]</c>; the order of the groups and of the
/// elements within each group is not specified.
/// </example>
[<CompiledName("GroupBy")>]
[<Experimental("Experimental library feature, requires '--langversion:preview'")>]
val groupBy: projection:('T -> 'Key) -> array:'T[] -> ('Key * 'T[])[] when 'Key : equality

/// <summary>Apply the given function to each element of the array. </summary>
///
/// <remarks>Performs the operation in parallel using <see cref="M:System.Threading.Tasks.Parallel.For" />.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Microsoft.FSharp.Collections.Array4DModule: T[,,,] Create[T](Int32, Int32, Int32
Microsoft.FSharp.Collections.Array4DModule: T[,,,] Initialize[T](Int32, Int32, Int32, Int32, Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,Microsoft.FSharp.Core.FSharpFunc`2[System.Int32,T]]]])
Microsoft.FSharp.Collections.Array4DModule: T[,,,] ZeroCreate[T](Int32, Int32, Int32, Int32)
Microsoft.FSharp.Collections.Array4DModule: Void Set[T](T[,,,], Int32, Int32, Int32, Int32, T)
Microsoft.FSharp.Collections.ArrayModule+Parallel: System.Tuple`2[TKey,T[]][] GroupBy[T,TKey](Microsoft.FSharp.Core.FSharpFunc`2[T,TKey], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[System.Int32] TryFindIndex[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[TResult] TryPick[T,TResult](Microsoft.FSharp.Core.FSharpFunc`2[T,Microsoft.FSharp.Core.FSharpOption`1[TResult]], T[])
Microsoft.FSharp.Collections.ArrayModule+Parallel: Microsoft.FSharp.Core.FSharpOption`1[T] TryFind[T](Microsoft.FSharp.Core.FSharpFunc`2[T,System.Boolean], T[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,77 @@ type ArrayModule() =
CheckThrowsArgumentNullException(fun () -> Array.groupBy funcInt (null : int array) |> ignore)
()

[<Fact>]
member _.ParallelGroupBy() =

    // Asserts that two groupings contain the same keys and the same elements per key.
    // Array.Parallel.groupBy does not specify the order of its output, so both groupings are
    // first sorted in place: the groups by key, and the elements inside every group.
    let assertEqualityOfGroupings opName (seqGroup: ('TKey * 'TVal[])[]) (paraGroup: ('TKey * 'TVal[])[]) =
        seqGroup |> Array.sortInPlaceBy fst
        paraGroup |> Array.sortInPlaceBy fst

        seqGroup |> Array.iter (snd >> Array.sortInPlace)
        paraGroup |> Array.iter (snd >> Array.sortInPlace)

        if seqGroup.Length <> paraGroup.Length then
            Assert.Fail($"{opName} produced different lengths of results. Seq={seqGroup.Length};Para={paraGroup.Length}.")

        let seqKeys = seqGroup |> Array.map fst
        let paraKeys = paraGroup |> Array.map fst
        if(seqKeys <> paraKeys) then
            Assert.Fail($"{opName} produced different keys. Seq=%A{seqKeys};Para=%A{paraKeys}.")

        Array.zip seqGroup paraGroup
        |> Array.iter (fun ((seqKey,seqGroup), (paraKey,paraGroup)) ->
            Assert.AreEqual(seqKey,paraKey,opName)
            if seqGroup <> paraGroup then
                Assert.Fail($"{opName} produced different results for key={seqKey}. Seq=%A{seqGroup};Para=%A{paraGroup}."))

        Assert.True((seqGroup=paraGroup), $"{opName} produced different results. Seq=%A{seqGroup};Para=%A{paraGroup}.")


    // Groups the same input with Array.groupBy and Array.Parallel.groupBy and compares the results.
    let compareAndAssert opName array projection =
        let seqGroup = array |> Array.groupBy projection
        let paraGroup = array |> Array.Parallel.groupBy projection
        assertEqualityOfGroupings opName seqGroup paraGroup

    // int array
    let funcInt x = x%5
    let IntArray = [| 0 .. 250 |]
    compareAndAssert "Int grouping" IntArray funcInt

    // Larger int array. Inputs shorter than 256 elements are processed as one single chunk,
    // so the arrays above never exercise the code path where the input is split into
    // several partitions (on a machine with more than one processor).
    let largeIntArray = [| 0 .. 10_000 |]
    compareAndAssert "Large int grouping" largeIntArray funcInt
    compareAndAssert "Large int grouping, many keys" largeIntArray (fun x -> x % 1_000)


    // string array
    let funcStr (x:string) = x.Length
    let strArray = Array.init 177 (fun i -> string i)
    compareAndAssert "String grouping" strArray funcStr


    // Empty array
    compareAndAssert "Empty group" [||] funcInt

    // Reference key which can be null
    let sampleStringsCanBeNull = [|"a";null;"abc";String.Empty|]
    let pickStringByIdx idx = sampleStringsCanBeNull[idx % sampleStringsCanBeNull.Length]
    compareAndAssert "Key can be null" IntArray pickStringByIdx
    compareAndAssert "Key can be null, large input" largeIntArray pickStringByIdx

    //String array w/ null keys and values
    let strArray = Array.init 222 (fun i -> if i%3=0 then String.Empty else null )
    compareAndAssert "String grouping w/ nulls" strArray id

    // Keys being special floats. Array.groupBy does not work here, we test results manually
    let specialFloats = [|infinity; -infinity;-0.0; 0.0; 1.0; -1.0; -0.0/0.0; -nan|]
    let pickSpecialFloatByIdx idx = specialFloats[idx % specialFloats.Length]

    let paraGroup = IntArray |> Array.Parallel.groupBy pickSpecialFloatByIdx
    Assert.AreEqual(6, paraGroup.Length, "There should be 6 special floats!")
    // Only the elements of the NaN group are needed; the key is discarded so that the
    // built-in `nan` is not shadowed by an unused local.
    let (_,nansGroup) = paraGroup |> Array.find (fun (k,_) -> Double.IsNaN(k))
    // Both -0.0/0.0; -nan are a Nan. => 2/8 => every 4th elements goes to the NaN bucket
    Assert.AreEqual((IntArray.Length / 4), nansGroup.Length, $"There should be {(IntArray.Length / 4)} NaNs!")



    CheckThrowsArgumentNullException(fun () -> Array.Parallel.groupBy funcInt (null : int array) |> ignore)
    ()

member private this.InitTester initInt initString =
// integer array
let resultInt : int[] = initInt 3 (fun x -> x + 3)
Expand Down