Skip to content

Commit

Permalink
fix thread starvation in ManualExecutor.*Wait
Browse files Browse the repository at this point in the history
Threads may fail to wake for some time since the method waits
on a `ManualResetEventSlim` which is reset before the next spin.
To prevent such cases the waiting condition has been modified
to additionally check an integer which is incremented after every spin
to detected if the spin which was waited upon has ended.
  • Loading branch information
Deric-W committed Sep 1, 2023
1 parent 2e5bdd9 commit 77bfffd
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 16 deletions.
132 changes: 116 additions & 16 deletions src/ros2cs/ros2cs_core/executors/ManualExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,29 @@ public IContext Context
/// </summary>
public bool IsSpinning
{
get { return !this.IsIdle.IsSet; }
get => this._IsSpinning;
private set => this._IsSpinning = value;
}

private volatile bool _IsSpinning = false;

/// <summary>
/// Whether a rescan is scheduled.
/// </summary>
public bool RescanScheduled
{
get { return this._RescanScheduled; }
private set { this._RescanScheduled = value; }
get => this._RescanScheduled;
private set => this._RescanScheduled = value;
}

// volatile since it may be changed by multiple threads
private volatile bool _RescanScheduled = false;

/// <summary>
/// To prevent <see cref="TryWait"/> from being starved by multiple spins.
/// </summary>
private long SpinId = 0;

/// <inheritdoc/>
public bool IsDisposed
{
Expand Down Expand Up @@ -92,6 +100,9 @@ public bool IsReadOnly
/// <summary>
/// Wait set used while spinning.
/// </summary>
/// <remarks>
/// Is also used to notify <see cref="TryWait"/> when the executor finished spinning.
/// </remarks>
private readonly WaitSet WaitSet;

/// <summary>
Expand All @@ -104,11 +115,6 @@ public bool IsReadOnly
/// </summary>
private readonly HashSet<INode> Nodes = new HashSet<INode>();

/// <summary>
/// Event signaling whether the executor is not spinning.
/// </summary>
private readonly ManualResetEventSlim IsIdle = new ManualResetEventSlim(true);

/// <summary>
/// Create a new instance.
/// </summary>
Expand Down Expand Up @@ -286,21 +292,92 @@ public bool TryScheduleRescan(INode node)
/// <inheritdoc/>
public void Wait()
{
bool success = this.TryWait(TimeSpan.FromMilliseconds(-1));
Debug.Assert(success, "infinite wait timed out");
if (this.RescanScheduled)
{
lock (this.WaitSet)
{
this.WaitUntilDone(this.SpinId);
}
}
}

/// <remarks>
/// This method is thread safe.
/// </remarks>
/// <exception cref="ObjectDisposedException"> If the executor was disposed. </exception>
/// <exception cref="ArgumentOutOfRangeException"> If the timeout is negative or too big. </exception>
/// <inheritdoc/>
public bool TryWait(TimeSpan timeout)
{
if (this.RescanScheduled && this.IsSpinning)
if (timeout.Ticks < 0)
{
throw new ArgumentOutOfRangeException("timeout is negative");
}
if (this.RescanScheduled)
{
lock (this.WaitSet)
{
// read id inside the lock to prevent an outdated id from being copied
return this.WaitUntilDone(this.SpinId, timeout);
}
}
return true;
}

/// <summary>
/// Utility method to wait until the current spin has finished.
/// </summary>
/// <remarks>
/// This replaces a <see cref="ManualResetEventSlim"/> which did starve waiters
/// when spinning multiple times.
/// </remarks>
/// <param name="spinId"> Current spin id. </param>
private void WaitUntilDone(long spinId)
{
// the condition is checked with the lock held to prevent
// a the spin from pulsing before the wait can be started
while (this.IsSpinning && this.SpinId == spinId)
{
try
{
// stop a possible current spin
this.Interrupt();
}
catch (ObjectDisposedException)
{
// if the context is shut down then the
// guard condition might be disposed but
// nodes still have to be removed
}
Monitor.Wait(this.WaitSet);
}
}

/// <summary>
/// Utility method to wait until the current spin has finished.
/// </summary>
/// <param name="spinId"> Current spin id. </param>
/// <param name="timeout"> Timeout when waiting </param>
/// <returns> Whether the wait did not time out. </returns>
/// <exception cref="ArgumentOutOfRangeException"> Timeout is too big. </exception>
private bool WaitUntilDone(long spinId, TimeSpan timeout)
{
int milliSeconds;
try
{
milliSeconds = Convert.ToInt32(timeout.TotalMilliseconds);
}
catch (OverflowException e)
{
throw new ArgumentOutOfRangeException("timeout too big", e);
}
int remainingTimeout = milliSeconds;
uint startTime = (uint)Environment.TickCount;
while (this.IsSpinning && this.SpinId == spinId)
{
try
{
// stop a possible current spin
this.Interrupt();
}
catch (ObjectDisposedException)
Expand All @@ -309,7 +386,22 @@ public bool TryWait(TimeSpan timeout)
// guard condition might be disposed but
// nodes still have to be removed
}
return this.IsIdle.Wait(timeout);
if (!Monitor.Wait(this.WaitSet, remainingTimeout))
{
// if the wait timed out return immediately
return false;
}
// update the timeout for the next wait
uint elapsed = (uint)Environment.TickCount - startTime;
if (elapsed > int.MaxValue)
{
return false;
}
remainingTimeout = milliSeconds - (int)elapsed;
if (remainingTimeout <= 0)
{
return false;
}
}
return true;
}
Expand Down Expand Up @@ -338,10 +430,10 @@ public void Interrupt()
/// <returns> Whether work could be processed since no rescan was scheduled. </returns>
public bool TrySpin(TimeSpan timeout)
{
this.IsIdle.Reset();
this.IsSpinning = true;
try
{
// check after resetting IsIdle to
// check after setting IsSpinning to
// prevent race condition
if (this.RescanScheduled)
{
Expand All @@ -357,7 +449,16 @@ public bool TrySpin(TimeSpan timeout)
}
finally
{
this.IsIdle.Set();
// update flag before waking threads
this.IsSpinning = false;
lock (this.WaitSet)
{
// prevent other threads from reading stale result
// overflow is acceptable
unchecked { this.SpinId++; }
// notify other threads that we finished spinning
Monitor.PulseAll(this.WaitSet);
}
}
return true;
}
Expand Down Expand Up @@ -477,7 +578,6 @@ public void Dispose()
}
this.WaitSet.Dispose();
this.InterruptCondition.Dispose();
this.IsIdle.Dispose();
}
}
}
3 changes: 3 additions & 0 deletions src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void DisposedExecutorHandling()

this.Context.TryCreateNode("test_node", out var node);
this.Executor.Add(node);
this.Executor.Rescan();
this.Executor.Dispose();

Assert.That(this.Executor.IsDisposed, Is.True);
Expand Down Expand Up @@ -125,13 +126,15 @@ public void TryWaitScheduled()
this.Executor.ScheduleRescan();

Assert.That(this.Executor.TryWait(TimeSpan.Zero), Is.True);
this.Executor.Wait();
Assert.That(this.Executor.RescanScheduled, Is.True);
}

[Test]
public void TryWaitUnscheduled()
{
Assert.That(this.Executor.TryWait(TimeSpan.Zero), Is.True);
this.Executor.Wait();
Assert.That(this.Executor.RescanScheduled, Is.False);
}

Expand Down

0 comments on commit 77bfffd

Please sign in to comment.