Merge pull request #35 from LilithSilver/fix-ijobparallelfor
Actually fix IJobParallelFor
genaray authored Feb 20, 2024
2 parents ded2656 + cd5bbfb commit 399e77c
Showing 6 changed files with 198 additions and 22 deletions.
70 changes: 68 additions & 2 deletions JobScheduler.Benchmarks/ParallelForBenchmark.cs
@@ -9,7 +9,6 @@ namespace Schedulers.Benchmarks;
[MemoryDiagnoser]
public abstract class ParallelForBenchmark
{

private Schedulers.JobScheduler _scheduler = null!;

/// <summary>
@@ -77,6 +76,30 @@ public void Finish() { }

private BasicParallelJob _basicParallel = null!;

private class BasicRegularJob : IJob
{
private readonly ParallelForBenchmark _benchmark;
private readonly int _start;
private readonly int _size;
public BasicRegularJob(ParallelForBenchmark benchmark, int start, int size)
{
_benchmark = benchmark;
_start = start;
_size = size;
}

public void Execute()
{
for (var i = _start; i < _start + _size; i++)
{
_benchmark.Work(i);
}
}
}

private List<BasicRegularJob> _basicRegulars = null!;
private List<JobHandle> _jobHandles = null!;

// to give Parallel.For the best chance possible of competing well, we allow it to not allocate
private static ParallelForBenchmark _currentBenchmarkCache = null!;

@@ -92,6 +115,28 @@ public void Setup()
};
_scheduler = new(config);
_basicParallel = new(this);
_basicRegulars = new(_scheduler.ThreadCount);

var remaining = Size;
var amountPerThread = Size / _scheduler.ThreadCount;
for (var i = 0; i < _scheduler.ThreadCount; i++)
{
var amount = amountPerThread;
if (remaining < amount)
{
amount = remaining;
}

if (remaining == 0)
{
break;
}

_basicRegulars.Add(new(this, i * amountPerThread, amount));
remaining -= amount;
}

_jobHandles = new(_scheduler.ThreadCount);
_currentBenchmarkCache = this;
Init();
}
@@ -125,7 +170,7 @@ private static void DoFor(int i)
}

[Benchmark]
public void BenchmarkStandardParallelFor()
public void BenchmarkDotNetParallelFor()
{
for (var w = 0; w < Waves; w++)
{
@@ -143,4 +188,25 @@ public void BenchmarkParallelFor()
handle.Complete();
}
}

[Benchmark]
public void BenchmarkNaiveParallelFor()
{
for (var w = 0; w < Waves; w++)
{
foreach (var job in _basicRegulars)
{
_jobHandles.Add(_scheduler.Schedule(job));
}

_scheduler.Flush();

foreach (var handle in _jobHandles)
{
handle.Complete();
}

_jobHandles.Clear();
}
}
}
24 changes: 21 additions & 3 deletions JobScheduler.Benchmarks/ParallelForBenchmarkMatrix.cs
@@ -1,6 +1,5 @@
namespace Schedulers.Benchmarks;


/// <summary>
/// Increments a simple counter as the work;
/// </summary>
@@ -43,9 +42,28 @@ protected override void Work(int index)
var col = index % _dim;
float sum = 0;

for (var k = 0; k < _dim; k++)
// test more complicated work as indices get higher to test work stealing
var reps = 0;
if (index < Size / 3)
{
reps = 1;
}
else if (index < Size / 3 * 2)
{
reps = 3;
}
else
{
reps = 6;
}

for (var i = 0; i < reps; i++)
{
sum += _matrixA[(row * _dim) + k] * _matrixB[(k * _dim) + col];
sum = 0;
for (var k = 0; k < _dim; k++)
{
sum += _matrixA[(row * _dim) + k] * _matrixB[(k * _dim) + col];
}
}

_matrixC[index] = sum;
58 changes: 45 additions & 13 deletions JobScheduler.Test/ParallelJobTests.cs
@@ -221,7 +221,7 @@ public void ManyDependentParallelJobsComplete(int count, int size, int batchSize
[TestCase(5, 64, 250, 512, 3)]
public void ParallelJobGraphCompletes(int graphCount, int nodesPerGraph, int waves, int size, int batchSize)
{
Dictionary<int, ParallelTestJob> jobs = new();
Dictionary<int, ParallelTestJob> jobs = [];

GraphRunner.TestGraph(graphCount, nodesPerGraph, waves,
(index, dependency) =>
@@ -237,27 +237,20 @@ public void ParallelJobGraphCompletes(int graphCount, int nodesPerGraph, int wav
});
}

private class ParallelSleepJob : IJobParallelFor
private class ParallelSleepJob(int expectedSize, int sleep) : IJobParallelFor
{
private readonly int _sleep;

public ParallelSleepJob(int expectedSize, int sleep)
{
ThreadIDs = new int[expectedSize];
_sleep = sleep;
}
public int[] ThreadIDs { get; }
public int[] ThreadIDs { get; } = new int[expectedSize];

public int ThreadCount { get => 0; }
// keep a low batch size if we sleep (lots of work simulated)
public int BatchSize { get => _sleep > 0 ? 32 : 1; }
public int BatchSize { get => sleep > 0 ? 32 : 1; }

public void Execute(int index)
{
ThreadIDs[index] = Environment.CurrentManagedThreadId;
if (_sleep > 0)
if (sleep > 0)
{
Thread.Sleep(_sleep);
Thread.Sleep(sleep);
}
}

@@ -278,5 +271,44 @@ public void ParallelJobRunsParallel(int size, int sleep)

Assert.That(job.ThreadIDs.Distinct(), Has.Exactly(Scheduler.ThreadCount).Items);
}

private class SegmentedParallelJob(int expectedSize, int threadCount) : IJobParallelFor
{
public int[] ThreadIDs { get; } = new int[expectedSize];

public int ThreadCount { get => 0; }

public int BatchSize
{
get => 1;
}

public void Execute(int index)
{
ThreadIDs[index] = Environment.CurrentManagedThreadId;
// In the first thread's segment, sleep; these indices should later be stolen by other threads.
if (index < expectedSize / threadCount)
{
Thread.Sleep(1);
}
}

public void Finish()
{
Assert.That(ThreadIDs, Does.Not.Contain(0));
}
}

[TestCase(256)]
public void ParallelJobDoesWorkStealing(int size)
{
var job = new SegmentedParallelJob(size, Scheduler.ThreadCount);
var handle = Scheduler.Schedule(job, size);
Scheduler.Flush();
handle.Complete();

// the first thread's segment should have been executed by every thread (via work stealing)
Assert.That(job.ThreadIDs.Take(size / Scheduler.ThreadCount).Distinct(), Has.Exactly(Scheduler.ThreadCount).Items);
}
}

12 changes: 11 additions & 1 deletion JobScheduler/Deque/RangeWorkStealingDeque.cs
@@ -32,6 +32,7 @@ public enum Status
private int _start;
private int _end;
private int _batchSize;
private bool _empty = true;

/// <summary>
/// Initializes an empty <see cref="RangeWorkStealingDeque"/>.
@@ -47,7 +48,7 @@ public RangeWorkStealingDeque()
/// </summary>
public bool IsEmpty
{
get => Interlocked.Read(ref _top) >= Volatile.Read(ref _bottom);
get => _empty;
}

/// <summary>
@@ -69,6 +70,7 @@ public void Set(int start, int count, int batchSize)
_start = start;
_end = start + count;
_batchSize = batchSize;
_empty = false;
}

// Is oldVal equal to our current top? If so, we're good; exchange them and return true. If not, we went
@@ -125,12 +127,14 @@ public Status TryPopBottom(out Range range)
{
// Reset to empty canon. Remember, our t is out of date by one, so after this _top == _bottom
Volatile.Write(ref _bottom, t + 1);
_empty = true;
return Status.Empty;
}

// We won the race! Return the full remaining range and reset to empty canon.
Volatile.Write(ref _bottom, t + 1);
range = popped;
_empty = true;
return Status.Success;
}

@@ -151,6 +155,7 @@ public Status TrySteal(out Range range)
// If we're empty, don't even try.
if (size <= 0)
{
_empty = true;
return Status.Empty;
}

@@ -167,6 +172,11 @@ public Status TrySteal(out Range range)

// We won!
range = stolen;
if (b - (t + 1) <= 0)
{
_empty = true;
}

return Status.Success;
}

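To illustrate how these members fit together, here is a hedged sketch of the owner/thief protocol using only the API visible in this diff (`Set`, `TryPopBottom`, `TrySteal`, and the `Status` values); it assumes `Status` is nested in the class, and it is not the scheduler's actual calling code:

```csharp
var deque = new RangeWorkStealingDeque();
deque.Set(start: 0, count: 1024, batchSize: 32); // the owner publishes a range of indices

// Owner thread: pop batches from the bottom until the range is exhausted.
while (deque.TryPopBottom(out var range) == RangeWorkStealingDeque.Status.Success)
{
    // execute the indices in `range` here
}

// Any other thread: try to steal a batch from the top.
if (deque.TrySteal(out var stolen) == RangeWorkStealingDeque.Status.Success)
{
    // execute the indices in `stolen` here
}
```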
9 changes: 6 additions & 3 deletions JobScheduler/JobScheduler.cs
@@ -319,9 +319,12 @@ public JobHandle Schedule(IJob job, JobHandle? dependency = null)
/// Note that this will schedule as many jobs as specified in <see cref="IJobParallelFor"/> or the maximum thread count, whichever is less
/// (or the maximum thread count if the threads provided are 0). See <see cref="IJobParallelFor"/> for details.
/// </remarks>
/// <param name="job"></param>
/// <param name="amount"></param>
/// <param name="dependency"></param>
/// <param name="job">The <see cref="IJobParallelFor"/> to schedule.</param>
/// <param name="amount">
/// The number of indices to execute.
/// <see cref="IJobParallelFor.Execute(int)"/> will be called for each value in <c>[0, <paramref name="amount"/>)</c>.
/// </param>
/// <param name="dependency">A <see cref="JobHandle"/> dependency to require completion of first.</param>
/// <returns>The <see cref="JobHandle"/> of a job representing the full task.</returns>
/// <exception cref="InvalidOperationException">If called on a different thread than the <see cref="JobScheduler"/> was constructed on</exception>
/// <exception cref="MaximumConcurrentJobCountExceededException">If the maximum amount of concurrent jobs is at maximum, and strict mode is enabled.</exception>
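For context, a usage sketch of the documented overload (hedged; `ManyCalculations` is the example job from the README below, and `PrepareJob` is a hypothetical `IJob` used only to show the dependency parameter):

```csharp
var job = new ManyCalculations();
var prep = scheduler.Schedule(new PrepareJob());   // hypothetical IJob that must finish first
var handle = scheduler.Schedule(job, 1024, prep);  // Execute(i) runs for i in [0, 1024) after prep completes
scheduler.Flush();
handle.Complete();
```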
47 changes: 47 additions & 0 deletions README.md
@@ -89,3 +89,50 @@ JobHandle.CompleteAll(List<JobHandle> handles);
```

Or, if you don't want to maintain a list or array, you can just call `handle.Complete()` on all your handles, in any order.
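For example, a minimal sketch of that pattern (assuming you already hold your handles in some collection):

```csharp
// Completing handles one by one, in any order, instead of calling CompleteAll.
foreach (var handle in handles)
{
    handle.Complete();
}
```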


# Parallel-For support

Instead of `IJob`, you can implement `IJobParallelFor` to get foreach-style indexing on a job. This is useful when you have very many small operations and it would be inefficient to schedule a whole job for each one; for example, iterating through a giant set of data.

Define an `IJobParallelFor` like so:

```csharp
public class ManyCalculations : IJobParallelFor
{
    // Execute will be called once for each i in [0, amount), where amount is the value passed to Schedule
    public void Execute(int i)
    {
        // ... do some operation with i here
    }

    // Finish will be called once all operations are completed.
    public void Finish()
    {
        Debug.Log("All done!");
    }

    // BatchSize is a measure of how "complicated" your operations are. Detailed below.
    public int BatchSize => 32;

    // Restricts the number of spawned jobs to decrease memory usage and overhead. Keep this at 0 to use the scheduler's number of active threads (recommended).
    public int ThreadCount => 0;
}
```

Run your `IJobParallelFor` with this syntax:


```csharp
var job = new ManyCalculations();
var handle = scheduler.Schedule(job, 512); // Execute will be called 512 times

```

However, there are several caveats:

* Don't overuse `IJobParallelFor`. In general, over-parallelization is a bad thing. Only schedule your job in parallel if it truly iterates a huge number of times, and always profile when dealing with multithreaded code.
* You must choose a sane `BatchSize` for the work inside your job. If you have very many small tasks that complete very quickly, a higher batch size dispatches more indices to each thread at once, minimizing scheduler overhead. On the other hand, if you have complicated (or mixed-complexity) tasks, a smaller batch size maximizes the threads' ability to work-steal and thus might complete faster. The only way to know what batch size to use is to profile your code and see what's faster (see the sketch after this list).
* Scheduling even a single `IJobParallelFor` actually schedules `ThreadCount` jobs on the backend, drawing from the jobs pool. If you create a lot of these, the number of jobs in play can increase quickly. For example, on a 16-core CPU with default settings, spawning 8 `IJobParallelFor` jobs would spawn 128 jobs on the backend. The scheduler can certainly handle it, but you'll probably want to keep an eye on `MaxExpectedCurrentJobs` if you want to keep the scheduler truly zero-allocation.
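As a rough illustration of the batch-size tradeoff above, here is a hedged sketch with two hypothetical jobs (not from this repository); the exact numbers are placeholders you would tune by profiling:

```csharp
// Cheap, uniform work: a large batch amortizes scheduler overhead.
public class AddOneToEverything : IJobParallelFor
{
    public float[] Values = new float[100_000];

    public int BatchSize => 256;  // many tiny operations -> large batches
    public int ThreadCount => 0;  // use the scheduler's thread count

    public void Execute(int i) => Values[i] += 1f;
    public void Finish() { }
}

// Expensive or mixed-cost work: small batches leave room for work stealing to rebalance threads.
public class SimulateAgents : IJobParallelFor
{
    public int BatchSize => 8;    // uneven, heavier operations -> small batches
    public int ThreadCount => 0;

    public void Execute(int i) { /* per-agent update of varying cost */ }
    public void Finish() { }
}
```

Whichever direction you lean, measure it: the right value depends on your workload and core count.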
