diff --git a/src/ros2cs/ros2cs_core/CMakeLists.txt b/src/ros2cs/ros2cs_core/CMakeLists.txt index 9a9b902..f08ab88 100644 --- a/src/ros2cs/ros2cs_core/CMakeLists.txt +++ b/src/ros2cs/ros2cs_core/CMakeLists.txt @@ -114,6 +114,7 @@ set(CS_SOURCES Context.cs GuardCondition.cs executors/ManualExecutor.cs + executors/TaskExecutor.cs properties/AssemblyInfo.cs ) diff --git a/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs b/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs index 3fc42d3..c7493b1 100644 --- a/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs +++ b/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs @@ -18,6 +18,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Tasks; namespace ROS2.Executors { @@ -433,6 +434,27 @@ public void SpinWhile(Func condition, TimeSpan timeout) } } + /// + /// Create a task which calls when started. + /// + /// + /// The resulting task prevents and from being called + /// and this instance as well as its context from being disposed safely while it is running. + /// + /// Maximum time to wait for work to become available. + /// Token to cancel the task. + /// Task representing the spin operation. + public Task CreateSpinTask(TimeSpan timeout, CancellationToken cancellationToken) + { + return new Task(() => { + using (cancellationToken.Register(this.Interrupt)) + { + this.SpinWhile(() => !cancellationToken.IsCancellationRequested, timeout); + } + cancellationToken.ThrowIfCancellationRequested(); + }, cancellationToken, TaskCreationOptions.LongRunning); + } + /// /// This method is not thread safe and may not be called from /// multiple threads simultaneously or while the executor is in use. diff --git a/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs b/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs new file mode 100644 index 0000000..042fb51 --- /dev/null +++ b/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs @@ -0,0 +1,191 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ROS2.Executors +{ + /// + /// Executor which wraps a and automatically + /// executes the task created by . + /// + /// + /// The spin task is automatically stopped when + /// is called or the context is shut down. + /// + public sealed class TaskExecutor : IExecutor + { + /// + /// Task managed by this executor. + /// + public Task Task { get; private set; } + + private readonly CancellationTokenSource CancellationSource = new CancellationTokenSource(); + + private readonly ManualExecutor Executor; + + private readonly Context Context; + + /// Context associated with this executor. + /// Maximum time to wait for work to become available. + public TaskExecutor(Context context, TimeSpan timeout) + { + this.Context = context; + this.Executor = new ManualExecutor(context); + this.Task = this.Executor.CreateSpinTask(timeout, this.CancellationSource.Token); + try + { + context.OnShutdown += this.StopSpinTask; + this.Task.Start(); + } + catch (SystemException) + { + try + { + context.OnShutdown -= this.StopSpinTask; + } + finally + { + this.Executor.Dispose(); + } + throw; + } + } + + /// + public bool IsDisposed + { + get => this.Executor.IsDisposed; + } + + /// + public int Count + { + get => this.Executor.Count; + } + + /// + public bool IsReadOnly + { + get => this.Executor.IsReadOnly; + } + + /// + public void Add(INode node) + { + this.Executor.Add(node); + } + + /// + public void Clear() + { + this.Executor.Clear(); + } + + /// + public bool Contains(INode node) + { + return this.Executor.Contains(node); + } + + /// + public void CopyTo(INode[] array, int arrayIndex) + { + this.Executor.CopyTo(array, arrayIndex); + } + + /// + public bool Remove(INode node) + { + return this.Executor.Remove(node); + } + + /// + public IEnumerator GetEnumerator() + { + return this.Executor.GetEnumerator(); + } + + /// + IEnumerator IEnumerable.GetEnumerator() + { + return this.GetEnumerator(); + } + + /// + public void ScheduleRescan() + { + this.Executor.ScheduleRescan(); + } + + /// + public bool TryScheduleRescan(INode node) + { + return this.Executor.TryScheduleRescan(node); + } + + /// + public void Wait() + { + this.Executor.Wait(); + } + + /// + public bool TryWait(TimeSpan timeout) + { + return this.Executor.TryWait(timeout); + } + + /// + /// Stop the spin task and return after it has stopped. + /// + /// + /// This function returns immediately if the spin task + /// has already been stopped. + /// + private void StopSpinTask() + { + try + { + this.CancellationSource.Cancel(); + } + catch (ObjectDisposedException) + { + // task has been canceled before + } + try + { + this.Task.Wait(); + } + catch (AggregateException e) + { + e.Handle(inner => inner is TaskCanceledException); + } + catch (ObjectDisposedException) + { + // task has already stopped + } + } + + /// + /// + /// The wrapper handles stopping the spin task. + /// + public void Dispose() + { + try + { + this.StopSpinTask(); + } + catch (AggregateException) + { + // prevent faulted task from preventing disposal + } + this.Context.OnShutdown -= this.StopSpinTask; + this.Task.Dispose(); + this.Executor.Dispose(); + this.CancellationSource.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/ros2cs/ros2cs_tests/CMakeLists.txt b/src/ros2cs/ros2cs_tests/CMakeLists.txt index f802ad8..c6f3c52 100644 --- a/src/ros2cs/ros2cs_tests/CMakeLists.txt +++ b/src/ros2cs/ros2cs_tests/CMakeLists.txt @@ -54,6 +54,7 @@ if(BUILD_TESTING) src/WaitSetTest.cs src/GuardConditionTest.cs src/ManualExecutorTest.cs + src/TaskExecutorTest.cs ) add_dotnet_test(ros2cs_tests diff --git a/src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs b/src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs index 6b69b67..0fa968b 100644 --- a/src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs +++ b/src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs @@ -16,6 +16,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Threading; +using System.Threading.Tasks; using NUnit.Framework; using ROS2.Executors; @@ -319,6 +320,92 @@ public void Clear() Assert.That(this.Executor.RescanScheduled, Is.True); } + [Test] + public void SpinInTask() + { + using ManualResetEventSlim wasSpun = new ManualResetEventSlim(false); + using var guardCondition = this.Context.CreateGuardCondition(wasSpun.Set); + this.WaitSet.GuardConditions.Add(guardCondition); + + using var cancellationSource = new CancellationTokenSource(); + using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token); + + Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Created)); + + spinTask.Start(); + try + { + while (spinTask.Status != TaskStatus.Running) + { + Thread.Yield(); // wait for task to be scheduled + } + Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False); + guardCondition.Trigger(); + Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.True); + wasSpun.Reset(); + } + finally + { + cancellationSource.Cancel(); + try + { + spinTask.Wait(); + } + catch (AggregateException e) + { + e.Handle(inner => inner is TaskCanceledException); + } + } + + Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Canceled)); + guardCondition.Trigger(); + Assert.That(wasSpun.Wait(TimeSpan.FromSeconds(1)), Is.False); + } + + [Test] + public void ExceptionWhileSpinningInTask() + { + using var guardCondition = this.Context.CreateGuardCondition(() => + { + throw new SimulatedException("simulating runtime exception"); + }); + this.WaitSet.GuardConditions.Add(guardCondition); + + using var cancellationSource = new CancellationTokenSource(); + using Task spinTask = this.Executor.CreateSpinTask(TimeSpan.FromSeconds(0.5), cancellationSource.Token); + + spinTask.Start(); + try + { + while (spinTask.Status != TaskStatus.Running) + { + Thread.Yield(); // wait for task to be scheduled + } + guardCondition.Trigger(); + var exception = Assert.Throws(() => spinTask.Wait(TimeSpan.FromSeconds(1))); + Assert.That(exception.InnerExceptions, Has.Some.Matches(new Predicate(e => e is SimulatedException))); + Assert.That(spinTask.Status, Is.EqualTo(TaskStatus.Faulted)); + } + finally + { + cancellationSource.Cancel(); + try + { + spinTask.Wait(); + } + catch (AggregateException e) + { + e.Handle(inner => inner is TaskCanceledException || inner is SimulatedException); + } + } + } + + private sealed class SimulatedException : Exception + { + public SimulatedException(string msg) : base(msg) + { } + } + private sealed class DummyExecutor : HashSet, IExecutor { public bool IsDisposed @@ -343,7 +430,7 @@ public bool TryWait(TimeSpan timeout) } public void Dispose() - {} + { } } } } \ No newline at end of file diff --git a/src/ros2cs/ros2cs_tests/src/TaskExecutorTest.cs b/src/ros2cs/ros2cs_tests/src/TaskExecutorTest.cs new file mode 100644 index 0000000..4d12b28 --- /dev/null +++ b/src/ros2cs/ros2cs_tests/src/TaskExecutorTest.cs @@ -0,0 +1,120 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using ROS2.Executors; + +namespace ROS2.Test +{ + [TestFixture] + public class TaskExecutorTest + { + private readonly static string TOPIC = "/task_executor_test"; + + private Context Context; + + private TaskExecutor Executor; + + [SetUp] + public void SetUp() + { + this.Context = new Context(); + this.Executor = new TaskExecutor(this.Context, TimeSpan.FromSeconds(0.5)); + } + + [TearDown] + public void TearDown() + { + this.Executor.Dispose(); + this.Context.Dispose(); + } + + [Test] + public void DoubleDisposeExecutor() + { + Assert.That(this.Executor.IsDisposed, Is.False); + + this.Executor.Dispose(); + this.Executor.Dispose(); + + Assert.That(this.Executor.IsDisposed, Is.True); + } + + [Test] + public void StopTask() + { + this.Executor.Dispose(); + + Assert.Throws(() => this.Executor.Task.Wait(TimeSpan.FromSeconds(1))); + Assert.That(this.Executor.Task.Status, Is.EqualTo(TaskStatus.Canceled)); + } + + [Test] + public void DisposeContext() + { + this.Context.Dispose(); + + Assert.Throws(() => this.Executor.Task.Wait(TimeSpan.FromSeconds(1))); + Assert.That(this.Executor.Task.Status, Is.EqualTo(TaskStatus.Canceled)); + } + + + [Test] + public void ExceptionWhileSpinning() + { + if (this.Context.TryCreateNode("task_executor_test_node", out INode node)) + { + using (node) + { + this.Executor.Add(node); + using var publisher = node.CreatePublisher(TOPIC); + using var subscription = node.CreateSubscription( + TOPIC, + _ => { throw new Exception("simulated runtime exception"); } + ); + + publisher.Publish(new std_msgs.msg.Int32()); + + Assert.Throws(() => this.Executor.Task.Wait(TimeSpan.FromSeconds(1))); + Assert.That(this.Executor.Task.Status, Is.EqualTo(TaskStatus.Faulted)); + + this.Executor.Dispose(); + Assert.That(this.Executor.IsDisposed, Is.True); + Assert.That(node.Executor, Is.Null); + } + } + else + { + throw new ArgumentException("node already exists"); + } + } + + [Test] + public void SpinningInBackground() + { + if (this.Context.TryCreateNode("task_executor_test_node", out INode node)) + { + using (node) + { + this.Executor.Add(node); + using var msgReceived = new ManualResetEventSlim(false); + using var publisher = node.CreatePublisher(TOPIC); + using var subscription = node.CreateSubscription(TOPIC, _ => msgReceived.Set()); + + publisher.Publish(new std_msgs.msg.Int32()); + + Assert.That(msgReceived.Wait(TimeSpan.FromSeconds(1)), Is.True); + Assert.That(this.Executor.Task.IsCompleted, Is.False); + + this.Executor.Dispose(); + Assert.That(this.Executor.IsDisposed, Is.True); + Assert.That(node.Executor, Is.Null); + } + } + else + { + throw new ArgumentException("node already exists"); + } + } + } +} \ No newline at end of file