diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index d8fbf4e5e41..17d0779c17d 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -113,6 +113,14 @@ Can be used to implement many stateful sources without having to touch the more **completes** when the task returned by the unfold function completes with an null value +### UnfoldValueTaskAsync + +Just like ``UnfoldAsync``, but the fold function returns a ``ValueTask``, with internal pooling to minimize allocation and improve latency. + +**emits** when there is demand and unfold state returned task completes with not null value + +**completes** when the task returned by the unfold function completes with an null value + ### Empty Complete right away without ever emitting any elements. Useful when you have to provide a source to @@ -196,6 +204,14 @@ Functions return ``Task`` to achieve asynchronous processing **completes** when ``Task`` from read function returns ``None`` +### UnfoldResourceValueTaskAsync + +Like ``UnfoldResourceAsync`` but takes ``ValueTask`` Functions instead, with amortization of allocations for the main read stage. + +**emits** when there is demand and ``ValueTask`` from read function returns value + +**completes** when ``ValueTask`` from read function returns ``None`` + ### Queue Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains @@ -846,6 +862,16 @@ If a Task fails, the stream also fails (unless a different supervision strategy **completes** when upstream completes and all tasks has been completed and all elements has been emitted +### SelectValueTaskAsync + +Version of ``SelectAsync`` that is optimized for ValueTask returns. Prefer this over ``SelectAsync`` if your work may be synchronus or is primarily waiting on ``ValueTask`` + +**emits** when the ``ValueTask`` returned by the provided function finishes for the next element in sequence + +**backpressures** when the number of tasks reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all tasks has been completed and all elements has been emitted + ### SelectAsyncUnordered Like ``SelectAsync`` but ``Task`` results are passed downstream as they arrive regardless of the order of the elements diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 8c63af3d49b..09089f4eb0c 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -22,6 +22,7 @@ 2.5.3 17.9.0 0.12.2 + 4.5.4 [13.0.1,) 2.0.1 3.26.0 diff --git a/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs new file mode 100644 index 00000000000..47f6dd3f5d8 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs @@ -0,0 +1,189 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class SelectAsyncBenchmarks +{ + public struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncCh; + private Task selectValueTaskAsyncStub; + private Channel vtAsyncCh; + private Task selectAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncChSync; + private Channel vtAsyncChSync; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncCh = Channel.CreateUnbounded(); + + asyncChSync = Channel.CreateUnbounded(); + + vtAsyncChSync = Channel.CreateUnbounded(); + + selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader) + .SelectAsync(4, a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + } + + return Task.FromResult(NotUsed.Instance); + }).RunWith(Sink.Ignore(), materializer); + + selectAsyncValueTaskSyncStub = Source.ChannelReader(vtAsyncChSync.Reader) + .SelectValueTaskAsync(4, a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + } + + return ValueTask.FromResult(NotUsed.Instance); + }).RunWith(Sink.Ignore(), materializer); + selectAsyncStub = Source.ChannelReader(asyncCh.Reader) + .SelectAsync(4, async a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + } + else + { + //await Task.Yield(); + await Task.Delay(0); + } + + return NotUsed.Instance; + }).RunWith(Sink.Ignore(), materializer); + vtAsyncCh = Channel.CreateUnbounded(); + int vta = 0; + selectValueTaskAsyncStub = Source.ChannelReader(vtAsyncCh.Reader) + .SelectValueTaskAsync(4, async a => + { + if (a.Completion != null) + { + a.Completion.TrySetResult(); + //return NotUsed.Instance; + } + else + { + //await Task.Yield(); + await Task.Delay(0); + //return NotUsed.Instance; + //Console.WriteLine(++vta); + //return vta; + } + + return NotUsed.Instance; + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task RunSelectAsync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task RunSelectValueTaskAsync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task RunSelectAsyncSync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task RunSelectValueTaskAsyncSync() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs new file mode 100644 index 00000000000..9e086e7955e --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs @@ -0,0 +1,191 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class UnfoldAsyncBenchmarks +{ + public struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncNoYieldCh; + private Task selectValueTaskAsyncStub; + private Channel vtAsyncCh; + private Task unfoldAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncYieldCh; + private Channel vtAsyncYieldCh; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncNoYieldCh = Channel.CreateUnbounded(); + + asyncYieldCh = Channel.CreateUnbounded(); + + vtAsyncYieldCh = Channel.CreateUnbounded(); + + unfoldAsyncSyncStub = Source.UnfoldAsync,int>(asyncYieldCh.Reader, async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, i.IntValue); + } + }) + .RunWith(Sink.Ignore(), materializer); + + selectAsyncValueTaskSyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncYieldCh.Reader, async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, i.IntValue); + } + }) + .RunWith(Sink.Ignore(), materializer); + selectAsyncStub = Source.UnfoldAsync,int>(asyncNoYieldCh.Reader,async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return (r, -1); + } + else + { + //await Task.Yield(); + // await Task.Delay(0); + return (r, a.IntValue); + } + }).RunWith(Sink.Ignore(), materializer); + vtAsyncCh = Channel.CreateUnbounded(); + int vta = 0; + selectValueTaskAsyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncCh.Reader,async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return (r, -1); + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + return (r, a.IntValue); + } + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task UnfoldAsyncYieldInConsume() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldValueTaskAsyncYieldInConsume() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task UnfoldAsyncYieldInPush() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldValueTaskAsyncYieldInPush() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs new file mode 100644 index 00000000000..9bc207df999 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs @@ -0,0 +1,302 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation.Fusing; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class UnfoldResourceAsyncBenchmarks +{ + + public struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncNoYieldCh; + private Task selectValueTaskAsyncStub; + private Channel vtAsyncCh; + private Task unfoldAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncYieldCh; + private Channel vtAsyncYieldCh; + private Channel straightCh; + private Task straightTask; + private CancellationTokenSource straightChTokenSource; + private Channel straightYieldCh; + private Task straightYieldTask; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncNoYieldCh = Channel.CreateUnbounded(); + + asyncYieldCh = Channel.CreateUnbounded(); + + vtAsyncYieldCh = Channel.CreateUnbounded(); + + unfoldAsyncSyncStub = Source.UnfoldResourceAsync>(()=> Task.FromResult(asyncYieldCh.Reader), async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return -1; + } + else + { + return i.IntValue; + } + }, (r)=> Task.FromResult(Done.Instance)) + .RunWith(Sink.Ignore(), materializer); + + selectAsyncValueTaskSyncStub = Source.UnfoldResourceValueTaskAsync,int,ChannelReader>(vtAsyncYieldCh.Reader,(r)=>new ValueTask>(r), async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return -1; + } + else + { + return i.IntValue; + } + }, (r)=> ValueTask.CompletedTask) + .RunWith(Sink.Ignore(), materializer); + selectAsyncStub = Source.UnfoldResourceAsync>(()=>Task.FromResult(asyncNoYieldCh.Reader),async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return -1; + } + else + { + //await Task.Yield(); + // await Task.Delay(0); + return a.IntValue; + } + }, (r)=> Task.FromResult(Done.Instance) ).RunWith(Sink.Ignore(), materializer); + vtAsyncCh = Channel.CreateUnbounded(); + + + int vta = 0; + selectValueTaskAsyncStub = Source + .UnfoldResourceValueTaskAsync, int, + ChannelReader>(vtAsyncCh.Reader, + r => new ValueTask>(r), + async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return -1; + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + return a.IntValue; + } + }, (r) => ValueTask.CompletedTask) + .RunWith(Sink.Ignore(), materializer); + straightChTokenSource = new CancellationTokenSource(); + straightCh = Channel.CreateUnbounded(); + + + straightTask = Task.Run(async () => + { + static async IAsyncEnumerable GetEnumerator( + ChannelReader reader, CancellationToken token) + { + while (token.IsCancellationRequested == false) + { + await Task.Yield(); + var a = await reader.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + yield return -1; + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + yield return a.IntValue; + } + } + } + var r = straightCh.Reader; + await foreach (var v in GetEnumerator(r,straightChTokenSource.Token)) + { + + } + }); + + straightYieldCh = Channel.CreateUnbounded(); + + + straightYieldTask = Task.Run(async () => + { + static async IAsyncEnumerable GetEnumerator( + ChannelReader reader, CancellationToken token) + { + while (token.IsCancellationRequested == false) + { + var a = await reader.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + yield return -1; + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + yield return a.IntValue; + } + } + } + var r = straightYieldCh.Reader; + await foreach (var v in GetEnumerator(r,straightChTokenSource.Token)) + { + + } + }); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + straightChTokenSource.Cancel(); + } + + [Benchmark] + public async Task UnfoldResourceAsyncNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldResourceValueTaskAsyncNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task UnfoldResourceAsyncWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldResourceValueTaskAsyncWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task StraightChannelReadNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + straightCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + straightCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task StraightChannelReadWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + straightYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Yield(); + } + + straightYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index e17b26a58ce..c13b0dd70d0 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -2044,6 +2044,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source UnfoldInfinite(TState state, System.Func> unfold) { } public static Akka.Streams.Dsl.Source UnfoldResource(System.Func create, System.Func> read, System.Action close) { } public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func> close) { } + public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(System.Func> create, System.Func>> read, System.Func close) { } + public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> read, System.Func close) { } + public static Akka.Streams.Dsl.Source UnfoldValueTaskAsync(TState state, System.Func>>> unfoldAsync) { } public static Akka.Streams.Dsl.Source, Akka.NotUsed> ZipN(System.Collections.Generic.IEnumerable> sources) { } public static Akka.Streams.Dsl.Source ZipWithN(System.Func, TOut2> zipper, System.Collections.Generic.IEnumerable> sources) { } } @@ -2126,6 +2129,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source SelectAsyncUnordered(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { } public static Akka.Streams.Dsl.Source SelectError(this Akka.Streams.Dsl.Source flow, System.Func selector) { } public static Akka.Streams.Dsl.Source SelectMany(this Akka.Streams.Dsl.Source flow, System.Func> mapConcater) { } + public static Akka.Streams.Dsl.Source SelectValueTaskAsync(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { } public static Akka.Streams.Dsl.Source Skip(this Akka.Streams.Dsl.Source flow, long n) { } public static Akka.Streams.Dsl.Source SkipWhile(this Akka.Streams.Dsl.Source flow, System.Predicate predicate) { } public static Akka.Streams.Dsl.Source SkipWithin(this Akka.Streams.Dsl.Source flow, System.TimeSpan duration) { } @@ -3881,6 +3885,16 @@ namespace Akka.Streams.Implementation public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] + public sealed class UnfoldResourceSourceValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public UnfoldResourceSourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> readData, System.Func close) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.SourceShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + public override string ToString() { } + } + [Akka.Annotations.InternalApiAttribute()] public sealed class UnfoldResourceSource : Akka.Streams.Stage.GraphStage> { public UnfoldResourceSource(System.Func create, System.Func> readData, System.Action close) { } @@ -3891,6 +3905,16 @@ namespace Akka.Streams.Implementation public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] + public class UnfoldValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public readonly Akka.Streams.Outlet Out; + public readonly TState State; + public readonly System.Func>>> UnfoldFunc; + public UnfoldValueTaskAsync(TState state, System.Func>>> unfoldFunc) { } + public override Akka.Streams.SourceShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] public class Unfold : Akka.Streams.Stage.GraphStage> { public readonly Akka.Streams.Outlet Out; @@ -4369,6 +4393,11 @@ namespace Akka.Streams.Implementation.Fusing public void SetValue(T value) { } public override string ToString() { } } + public sealed class NotYetThereSentinel : System.Exception + { + public static readonly Akka.Streams.Implementation.Fusing.NotYetThereSentinel Instance; + public NotYetThereSentinel() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class OnCompleted : Akka.Streams.Stage.GraphStage> { @@ -4443,6 +4472,16 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] + public sealed class SelectValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public readonly Akka.Streams.Inlet In; + public readonly Akka.Streams.Outlet Out; + public SelectValueTaskAsync(int parallelism, System.Func> mapFunc) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public override Akka.Streams.FlowShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] public sealed class Select : Akka.Streams.Stage.GraphStage> { public Select(System.Func func) { } @@ -4501,6 +4540,19 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + [System.Runtime.CompilerServices.IsReadOnlyAttribute()] + public struct SlimResult + { + public readonly System.Exception Error; + public static readonly Akka.Streams.Implementation.Fusing.SlimResult NotYetReady; + public readonly T Result; + public SlimResult(System.Exception errorOrSentinel, T result) { } + public static Akka.Streams.Implementation.Fusing.SlimResult ForError(System.Exception errorOrSentinel) { } + public static Akka.Streams.Implementation.Fusing.SlimResult ForSuccess(T result) { } + public static Akka.Streams.Implementation.Fusing.SlimResult FromTask(System.Threading.Tasks.Task task) { } + public bool IsDone() { } + public bool IsSuccess() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class StatefulSelectMany : Akka.Streams.Stage.GraphStage> { @@ -4705,6 +4757,8 @@ namespace Akka.Streams.Implementation.Stages public static readonly Akka.Streams.Attributes UnfoldInf; public static readonly Akka.Streams.Attributes UnfoldResourceSource; public static readonly Akka.Streams.Attributes UnfoldResourceSourceAsync; + public static readonly Akka.Streams.Attributes UnfoldResourceSourceValueTaskAsync; + public static readonly Akka.Streams.Attributes UnfoldValueTaskAsync; public static readonly Akka.Streams.Attributes Unzip; public static readonly Akka.Streams.Attributes Watch; public static readonly Akka.Streams.Attributes Where; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 107bc594f50..c04531d75eb 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -2043,6 +2043,9 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source UnfoldInfinite(TState state, System.Func> unfold) { } public static Akka.Streams.Dsl.Source UnfoldResource(System.Func create, System.Func> read, System.Action close) { } public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func> close) { } + public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(System.Func> create, System.Func>> read, System.Func close) { } + public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> read, System.Func close) { } + public static Akka.Streams.Dsl.Source UnfoldValueTaskAsync(TState state, System.Func>>> unfoldAsync) { } public static Akka.Streams.Dsl.Source, Akka.NotUsed> ZipN(System.Collections.Generic.IEnumerable> sources) { } public static Akka.Streams.Dsl.Source ZipWithN(System.Func, TOut2> zipper, System.Collections.Generic.IEnumerable> sources) { } } @@ -2125,6 +2128,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source SelectAsyncUnordered(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { } public static Akka.Streams.Dsl.Source SelectError(this Akka.Streams.Dsl.Source flow, System.Func selector) { } public static Akka.Streams.Dsl.Source SelectMany(this Akka.Streams.Dsl.Source flow, System.Func> mapConcater) { } + public static Akka.Streams.Dsl.Source SelectValueTaskAsync(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { } public static Akka.Streams.Dsl.Source Skip(this Akka.Streams.Dsl.Source flow, long n) { } public static Akka.Streams.Dsl.Source SkipWhile(this Akka.Streams.Dsl.Source flow, System.Predicate predicate) { } public static Akka.Streams.Dsl.Source SkipWithin(this Akka.Streams.Dsl.Source flow, System.TimeSpan duration) { } @@ -3866,6 +3870,16 @@ namespace Akka.Streams.Implementation public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] + public sealed class UnfoldResourceSourceValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public UnfoldResourceSourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> readData, System.Func close) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public Akka.Streams.Outlet Out { get; } + public override Akka.Streams.SourceShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + public override string ToString() { } + } + [Akka.Annotations.InternalApiAttribute()] public sealed class UnfoldResourceSource : Akka.Streams.Stage.GraphStage> { public UnfoldResourceSource(System.Func create, System.Func> readData, System.Action close) { } @@ -3876,6 +3890,16 @@ namespace Akka.Streams.Implementation public override string ToString() { } } [Akka.Annotations.InternalApiAttribute()] + public class UnfoldValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public readonly Akka.Streams.Outlet Out; + public readonly TState State; + public readonly System.Func>>> UnfoldFunc; + public UnfoldValueTaskAsync(TState state, System.Func>>> unfoldFunc) { } + public override Akka.Streams.SourceShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] public class Unfold : Akka.Streams.Stage.GraphStage> { public readonly Akka.Streams.Outlet Out; @@ -4343,6 +4367,11 @@ namespace Akka.Streams.Implementation.Fusing public void SetValue(T value) { } public override string ToString() { } } + public sealed class NotYetThereSentinel : System.Exception + { + public static readonly Akka.Streams.Implementation.Fusing.NotYetThereSentinel Instance; + public NotYetThereSentinel() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class OnCompleted : Akka.Streams.Stage.GraphStage> { @@ -4417,6 +4446,16 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } } [Akka.Annotations.InternalApiAttribute()] + public sealed class SelectValueTaskAsync : Akka.Streams.Stage.GraphStage> + { + public readonly Akka.Streams.Inlet In; + public readonly Akka.Streams.Outlet Out; + public SelectValueTaskAsync(int parallelism, System.Func> mapFunc) { } + protected override Akka.Streams.Attributes InitialAttributes { get; } + public override Akka.Streams.FlowShape Shape { get; } + protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } + } + [Akka.Annotations.InternalApiAttribute()] public sealed class Select : Akka.Streams.Stage.GraphStage> { public Select(System.Func func) { } @@ -4475,6 +4514,18 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + public struct SlimResult + { + public readonly System.Exception Error; + public static readonly Akka.Streams.Implementation.Fusing.SlimResult NotYetReady; + public readonly T Result; + public SlimResult(System.Exception errorOrSentinel, T result) { } + public static Akka.Streams.Implementation.Fusing.SlimResult ForError(System.Exception errorOrSentinel) { } + public static Akka.Streams.Implementation.Fusing.SlimResult ForSuccess(T result) { } + public static Akka.Streams.Implementation.Fusing.SlimResult FromTask(System.Threading.Tasks.Task task) { } + public bool IsDone() { } + public bool IsSuccess() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class StatefulSelectMany : Akka.Streams.Stage.GraphStage> { @@ -4679,6 +4730,8 @@ namespace Akka.Streams.Implementation.Stages public static readonly Akka.Streams.Attributes UnfoldInf; public static readonly Akka.Streams.Attributes UnfoldResourceSource; public static readonly Akka.Streams.Attributes UnfoldResourceSourceAsync; + public static readonly Akka.Streams.Attributes UnfoldResourceSourceValueTaskAsync; + public static readonly Akka.Streams.Attributes UnfoldValueTaskAsync; public static readonly Akka.Streams.Attributes Unzip; public static readonly Akka.Streams.Attributes Watch; public static readonly Akka.Streams.Attributes Where; diff --git a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj index e0a2d53dec9..d218fb687d0 100644 --- a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj +++ b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj @@ -1,7 +1,8 @@  - $(NetFrameworkTestVersion);$(NetTestVersion) + $(NetFrameworkTestVersion);$(NetTestVersion) + $(NetTestVersion) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectValueTaskAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectValueTaskAsyncSpec.cs new file mode 100644 index 00000000000..6560d9e0f6e --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectValueTaskAsyncSpec.cs @@ -0,0 +1,461 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation; +using Akka.Streams.Supervision; +using Akka.Streams.TestKit; +using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.TestKit.Internal; +using Akka.TestKit.Xunit2.Attributes; +using Akka.Util; +using Akka.Util.Internal; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; +using Directive = Akka.Streams.Supervision.Directive; + +namespace Akka.Streams.Tests.Dsl; + +#pragma warning disable 162 +[Collection(nameof(FlowSelectValueTaskAsyncSpec))] +public class FlowSelectValueTaskAsyncSpec : AkkaSpec +{ + private ActorMaterializer Materializer { get; } + + public FlowSelectValueTaskAsyncSpec(ITestOutputHelper helper) : base(helper) + { + Materializer = ActorMaterializer.Create(Sys); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsyncValueTask_must_produce_task_elements() + { + await this.AssertAllStagesStoppedAsync(async() => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 3)) + .SelectValueTaskAsync(4, static async a => a) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + + sub.Request(2); + await c.ExpectNext(1) + .ExpectNext(2) + .ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(2); + + await c.ExpectNext(3) + .ExpectCompleteAsync(); + }, Materializer); + } + + [Fact] + public async void A_Flow_with_SelectValueTaskAsync_must_produce_task_elements_in_order() + { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 50)) + .SelectValueTaskAsync(4, i => + { + if (i%3 == 0) + return new ValueTask(Task.FromResult(i)); + + return new ValueTask(Task.Factory.StartNew(() => + { + Thread.Sleep(ThreadLocalRandom.Current.Next(1, 10)); + return i; + }, TaskCreationOptions.LongRunning)); + }) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(1000); + foreach (var n in Enumerable.Range(1, 50)) + await c.ExpectNextAsync(n); + //Enumerable.Range(1, 50).ForEach(n => c.ExpectNext(n)); + await c.ExpectCompleteAsync(); + } + + // Turning this on in CI/CD for now + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_not_run_more_futures_than_requested_parallelism() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = CreateTestProbe(); + var c = this.CreateManualSubscriberProbe(); + + Source.From(Enumerable.Range(1, 20)) + .SelectValueTaskAsync(8, async n => + { + await Task.Yield(); + probe.Ref.Tell(n); + return n; + }) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(1); + (await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 9)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(2); + (await probe.ReceiveNAsync(2).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(10, 2)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(10); + (await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(12, 9)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + + foreach (var n in Enumerable.Range(1, 13)) + await c.ExpectNextAsync(n); + + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + }, Materializer).ShouldCompleteWithin(RemainingOrDefault); + } + + // Turning this on in CI/CD for now + [Fact] + public async Task A_Flow_with_parallel_execution_SelectValueTaskAsync_must_signal_task_failure() + { + await this.AssertAllStagesStoppedAsync(async() => { + var c = this.CreateManualSubscriberProbe(); + + Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(4, async n => + { + if (n == 4) + throw new TestException("err1"); + await Task.Delay(10.Seconds()); + + return n; + }) + .To(Sink.FromSubscriber(c)).Run(Materializer); + + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + + var exception = await c.ExpectErrorAsync(); + exception.Message.Should().Be("err1"); + }, Materializer).ShouldCompleteWithin(RemainingOrDefault); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_signal_task_failure() + { + await this.AssertAllStagesStoppedAsync(async() => { + var probe = Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(1, async n => + { + await Task.Delay(10); + if (n == 3) + throw new TestException("err1"); + + return n; + }) + .RunWith(this.SinkProbe(), Materializer); + + var exception = await probe.AsyncBuilder() + .Request(10) + .ExpectNextN(new[]{1, 2}) + .ExpectErrorAsync() + .ShouldCompleteWithin(RemainingOrDefault); + exception.Message.Should().Be("err1"); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_signal_task_failure_asap() + { + await this.AssertAllStagesStoppedAsync(async () => { + var latch = CreateTestLatch(); + var done = Source.From(Enumerable.Range(1, 5)) + .Select(n => + { + //if (n != 1) + // slow upstream should not block the error + //latch.Ready(TimeSpan.FromSeconds(10)); + + return n; + }) + .SelectValueTaskAsync(4, n => + { + if (n == 1) + { + var c = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + c.SetException(new Exception("err1")); + return new ValueTask(c.Task); + } + return new ValueTask(Task.FromResult(n)); + }).RunWith(Sink.Ignore(), Materializer); + + await FluentActions.Awaiting(async () => await done).Should() + .ThrowAsync() + .WithMessage("err1") + .ShouldCompleteWithin(RemainingOrDefault); + + latch.CountDown(); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_signal_error_from_SelectValueTaskAsync() + { + await this.AssertAllStagesStoppedAsync(async () => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(4, n => + { + if (n == 3) + throw new TestException("err2"); + + return new ValueTask(Task.Run(async () => + { + await Task.Delay(10.Seconds()); + return n; + })); + }) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + c.ExpectError().Message.Should().Be("err2"); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_invoke_supervision_strategy_on_task_failure() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var invoked = false; + var probe = Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(1, n => new ValueTask( Task.Run(() => + { + if (n == 3) + throw new TestException("err3"); + return n; + }))) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(_ => + { + invoked = true; + return Directive.Stop; + })) + .RunWith(this.SinkProbe(), Materializer); + + await probe.AsyncBuilder() + .Request(10) + .ExpectNextN(new[] { 1, 2 }) + .ExpectErrorAsync(); + + invoked.Should().BeTrue(); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_resume_after_task_failure() + { + await this.AssertAllStagesStoppedAsync(async () => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(4, n => new ValueTask(Task.Run(() => + { + if (n == 3) + throw new TestException("err3"); + return n; + }))) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5 }) + await c.ExpectNextAsync(i); + await c.ExpectCompleteAsync(); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_resume_after_multiple_failures() + { + await this.AssertAllStagesStoppedAsync(() => { + var futures = new[] + { + Task.Run(() => { throw new TestException("failure1"); return "";}), + Task.Run(() => { throw new TestException("failure2"); return "";}), + Task.Run(() => { throw new TestException("failure3"); return "";}), + Task.Run(() => { throw new TestException("failure4"); return "";}), + Task.Run(() => { throw new TestException("failure5"); return "";}), + Task.FromResult("happy") + }; + + var t = Source.From(futures) + .SelectValueTaskAsync(2, x => new ValueTask(x)) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.First(), Materializer); + + t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); + t.Result.Should().Be("happy"); + return Task.CompletedTask; + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_finish_after_task_failure() + { + await this.AssertAllStagesStoppedAsync(async() => + { + var t = Source.From(Enumerable.Range(1, 3)) + .SelectValueTaskAsync(1, n => new ValueTask(Task.Run(() => + { + if (n == 3) + throw new TestException("err3b"); + return n; + }))) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .Grouped(10) + .RunWith(Sink.First>(), Materializer); + + var complete = await t.ShouldCompleteWithin(3.Seconds()); + complete.Should().BeEquivalentTo(new[] { 1, 2 }); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_resume_when_SelectValueTaskAsync_throws() + { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectValueTaskAsync(4, n => + { + if (n == 3) + throw new TestException("err4"); + return new ValueTask(Task.FromResult(n)); + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5 }) + await c.ExpectNextAsync(i); + await c.ExpectCompleteAsync(); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_signal_NPE_when_task_is_completed_with_null() + { + var c = this.CreateManualSubscriberProbe(); + + Source.From(new[] {"a", "b"}) + .SelectValueTaskAsync(4, _ =>new ValueTask(Task.FromResult(null as string))) + .To(Sink.FromSubscriber(c)).Run(Materializer); + + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_resume_when_task_is_completed_with_null() + { + var c = this.CreateManualSubscriberProbe(); + Source.From(new[] { "a", "b", "c" }) + .SelectValueTaskAsync(4, s => s.Equals("b") ? new ValueTask(null as string) : new ValueTask(s)) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .To(Sink.FromSubscriber(c)).Run(Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + await c.ExpectNextAsync("a"); + await c.ExpectNextAsync("c"); + await c.ExpectCompleteAsync(); + } + + [Fact] + public async Task A_Flow_with_SelectValueTaskAsync_must_handle_cancel_properly() + { + await this.AssertAllStagesStoppedAsync(async() => { + var pub = this.CreateManualPublisherProbe(); + var sub = this.CreateManualSubscriberProbe(); + + Source.FromPublisher(pub) + .SelectValueTaskAsync(4, _ => new ValueTask(0)) + .RunWith(Sink.FromSubscriber(sub), Materializer); + + var upstream = await pub.ExpectSubscriptionAsync(); + await upstream.ExpectRequestAsync(); + + (await sub.ExpectSubscriptionAsync()).Cancel(); + + await upstream.ExpectCancellationAsync(); + }, Materializer); + } + + [LocalFact(SkipLocal = "Racy on Azure DevOps")] + public async Task A_Flow_with_SelectValueTaskAsync_must_not_run_more_futures_than_configured() + { + await this.AssertAllStagesStoppedAsync(async() => + { + const int parallelism = 8; + var counter = new AtomicCounter(); + var queue = new BlockingQueue<(TaskCompletionSource, long)>(); + var cancellation = new CancellationTokenSource(); +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew(() => + { + var delay = 500; // 50000 nanoseconds + var count = 0; + var cont = true; + while (cont) + { + try + { + var t = queue.Take(cancellation.Token); + var promise = t.Item1; + var enqueued = t.Item2; + var wakeup = enqueued + delay; + while (DateTime.Now.Ticks < wakeup) { } + counter.Decrement(); + promise.SetResult(count); + count++; + } + catch + { + cont = false; + } + } + }, cancellation.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + Func> deferred = () => + { + var promise = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + if (counter.IncrementAndGet() > parallelism) + promise.SetException(new Exception("parallelism exceeded")); + else + queue.Enqueue((promise, DateTime.Now.Ticks)); + return new ValueTask(promise.Task); + }; + + try + { + const int n = 10000; + var task = Source.From(Enumerable.Range(1, n)) + .SelectValueTaskAsync(parallelism, _ => deferred()) + .RunAggregate(0, (c, _) => c + 1, Materializer); + + var complete = await task.ShouldCompleteWithin(3.Seconds()); + complete.Should().Be(n); + } + finally + { + cancellation.Cancel(false); + } + }, Materializer); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams.Tests/Dsl/SourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/SourceSpec.cs index f1532191925..231341101d9 100644 --- a/src/core/Akka.Streams.Tests/Dsl/SourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/SourceSpec.cs @@ -12,6 +12,7 @@ using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; +using Akka.Streams.Tests.Util; using Akka.Streams.Util; using Akka.TestKit.Extensions; using Akka.TestKit; @@ -380,6 +381,24 @@ public void Unfold_Source_must_generate_a_finite_fibonacci_sequence_asynchronous return ints; }, Materializer).Result.Should().Equal(Expected); } + + [Fact] + public void UnfoldValueTask_Source_must_generate_a_finite_fibonacci_sequence_asynchronously() + { + Source.UnfoldValueTaskAsync((0, 1), tuple => + { + var a = tuple.Item1; + var b = tuple.Item2; + if (a > 10000000) + return Task.FromResult(Option<((int, int), int)>.None).ToValueTask(); + + return Task.FromResult(((b, a + b), a).AsOption()).ToValueTask(); + }).RunAggregate(new LinkedList(), (ints, i) => + { + ints.AddFirst(i); + return ints; + }, Materializer).Result.Should().Equal(Expected); + } [Fact] public void Unfold_Source_must_generate_a_unboundeed_fibonacci_sequence() diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceValueTaskAsyncSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceValueTaskAsyncSourceSpec.cs new file mode 100644 index 00000000000..8b6a502efc2 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceValueTaskAsyncSourceSpec.cs @@ -0,0 +1,587 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation; +using Akka.Streams.Supervision; +using Akka.Streams.TestKit; +using Akka.Streams.Tests.Util; +using Akka.Streams.Util; +using Akka.TestKit; +using Akka.TestKit.Extensions; +using Akka.Util; +using Akka.Util.Internal; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl; + +public class UnfoldResourceValueTaskAsyncSourceSpec : AkkaSpec +{ + private class ResourceDummy + { + private readonly IEnumerator _iterator; + private readonly Task _createFuture; + private readonly Task _firstReadFuture; + private readonly Task _closeFuture; + + private readonly TaskCompletionSource _createdPromise = new(); + private readonly TaskCompletionSource _closedPromise = new(); + private readonly TaskCompletionSource _firstReadPromise = new(); + + // these can be used to observe when the resource calls has happened + public Task Created => _createdPromise.Task; + public Task FirstElementRead => _firstReadPromise.Task; + public Task Closed => _closedPromise.Task; + + public ResourceDummy(IEnumerable values, Task createFuture = default, Task firstReadFuture = default, Task closeFuture = default) + { + _iterator = values.GetEnumerator(); + _createFuture = createFuture ?? Task.FromResult(Done.Instance); + _firstReadFuture = firstReadFuture ?? Task.FromResult(Done.Instance); + _closeFuture = closeFuture ?? Task.FromResult(Done.Instance); + } + + public Task> Create() + { + _createdPromise.TrySetResult(Done.Instance); + return _createFuture.ContinueWith(_ => this); + } + + public Task> Read() + { + if (!_firstReadPromise.Task.IsCompleted) + _firstReadPromise.TrySetResult(Done.Instance); + + return _firstReadFuture.ContinueWith(_ => _iterator.MoveNext() ? _iterator.Current : Option.None); + } + + public Task Close() + { + _closedPromise.TrySetResult(Done.Instance); + return _closeFuture; + } + } + + public UnfoldResourceValueTaskAsyncSourceSpec(ITestOutputHelper helper) + : base(Utils.UnboundedMailboxConfig, helper) + { + Sys.Settings.InjectTopLevelFallback(ActorMaterializer.DefaultConfig()); + var settings = ActorMaterializerSettings.Create(Sys).WithDispatcher("akka.actor.default-dispatcher"); + Materializer = Sys.Materializer(settings); + } + + public ActorMaterializer Materializer { get; } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_unfold_data_from_a_resource() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var createPromise = new TaskCompletionSource(); + var closePromise = new TaskCompletionSource(); + + var values = Enumerable.Range(0, 1000).ToList(); + var resource = new ResourceDummy(values, createPromise.Task, closeFuture: closePromise.Task); + + var probe = this.CreateSubscriberProbe(); + Source.UnfoldResourceValueTaskAsync( + () => resource.Create().ToValueTask(), + r => r.Read().ToValueTask(), + close: r => r.Close().ToValueTask()) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1); + await resource.Created.ShouldCompleteWithin(3.Seconds()); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + createPromise.SetResult(Done.Instance); + + foreach (var i in values) + { + await resource.FirstElementRead.ShouldCompleteWithin(3.Seconds()); + (await probe.ExpectNextAsync()).ShouldBe(i); + await probe.RequestAsync(1); + } + + await resource.Closed.ShouldCompleteWithin(3.Seconds()); + closePromise.SetResult(Done.Instance); + + await probe.ExpectCompleteAsync(); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_resource_successfully_right_after_open() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var firtRead = new TaskCompletionSource(); + var resource = new ResourceDummy(new[] { 1 }, firstReadFuture: firtRead.Task); + + Source.UnfoldResourceValueTaskAsync( + create: () => resource.Create().ToValueTask(), + read: reader => reader.Read().ToValueTask(), + close: reader => reader.Close().ToValueTask()) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1L); + await resource.FirstElementRead.ShouldCompleteWithin(3.Seconds()); + // we cancel before we complete first read (racy) + await probe.CancelAsync(); + await Task.Delay(100); + firtRead.SetResult(Done.Instance); + + await resource.Closed.ShouldCompleteWithin(3.Seconds()); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_create_throws_exception() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var testException = new TestException("create failed"); + + Source.UnfoldResourceValueTaskAsync( + create: () => throw testException, + read: _ => default, + close: _ => default) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + (await probe.ExpectErrorAsync()).ShouldBe(testException); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_create_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var testException = new TestException("create failed"); + + Source.UnfoldResourceValueTaskAsync( + create: () => Task.FromException(testException).ToValueTask(), + read: _ => default, + close: _ => default) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + (await probe.ExpectErrorAsync()).ShouldBe(testException); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_close_throws_exception() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var testException = new TestException(""); + + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(Task.CompletedTask).ToValueTask(), + _ => Task.FromResult(Option.None).ToValueTask(), + _ => throw testException) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + await probe.RequestAsync(1L); + await probe.ExpectErrorAsync(); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_close_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var testException = new TestException("create failed"); + + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(Task.CompletedTask).ToValueTask(), + _ => Task.FromResult(Option.None).ToValueTask(), + _ => Task.FromException(testException).ToValueTask()) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + await probe.RequestAsync(1L); + await probe.ExpectErrorAsync(); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_continue_when_strategy_is_resume_and_read_throws() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var result = Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(new object[] { 1, 2, new TestException("read-error"), 3 }.GetEnumerator()).ToValueTask(), + iterator => + { + if (iterator.MoveNext()) + { + var next = iterator.Current; + switch (next) + { + case int n: + return Task.FromResult(Option.Create(n)).ToValueTask(); + case TestException e: + throw e; + default: + throw new Exception($"Unexpected: {next}"); + } + } + + return Task.FromResult(Option.None).ToValueTask(); + }, + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var r = await result.ShouldCompleteWithin(3.Seconds()); + r.ShouldBe(new[] { 1, 2, 3 }); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_continue_when_strategy_is_resume_and_read_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var result = Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(new object[] { 1, 2, new TestException("read-error"), 3 }.GetEnumerator()).ToValueTask(), + iterator => + { + if (iterator.MoveNext()) + { + var next = iterator.Current; + switch (next) + { + case int n: + return Task.FromResult(Option.Create(n)).ToValueTask(); + case TestException e: + return Task.FromException>(e).ToValueTask(); + default: + throw new Exception($"Unexpected: {next}"); + } + } + + return Task.FromResult(Option.None).ToValueTask(); + }, + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var r = await result.ShouldCompleteWithin(3.Seconds()); + r.ShouldBe(new[] { 1, 2, 3 }); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_and_open_stream_again_when_strategy_is_restart_and_read_throws() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var failed = false; + var startCount = new AtomicCounter(0); + + var result = Source.UnfoldResourceValueTaskAsync( + () => + { + startCount.IncrementAndGet(); + return Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask(); + }, + reader => + { + if (!failed) + { + failed = true; + throw new TestException("read-error"); + } + + return reader.MoveNext() && reader.Current != null + ? Task.FromResult(Option.Create((int)reader.Current)).ToValueTask() + : Task.FromResult(Option.None).ToValueTask(); + }, + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var r = await result.ShouldCompleteWithin(3.Seconds()); + r.ShouldBe(new[] { 1, 2, 3 }); + startCount.Current.ShouldBe(2); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_and_open_stream_again_when_strategy_is_restart_and_read_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var failed = false; + var startCount = new AtomicCounter(0); + + var result = Source.UnfoldResourceValueTaskAsync( + () => + { + startCount.IncrementAndGet(); + return Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask(); + }, + reader => + { + if (!failed) + { + failed = true; + return Task.FromException>(new TestException("read-error")).ToValueTask(); + } + + return reader.MoveNext() && reader.Current != null + ? Task.FromResult(Option.Create((int)reader.Current)).ToValueTask() + : Task.FromResult(Option.None).ToValueTask(); + }, + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.Seq(), Materializer); + + var r = await result.ShouldCompleteWithin(3.Seconds()); + r.ShouldBe(new[] { 1, 2, 3 }); + startCount.Current.ShouldBe(2); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_restarting_and_close_throws() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask(), + _ => throw new TestException("read-error"), + _ => throw new TestException("close-error")) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1L); + (await probe.ExpectErrorAsync()).Message.ShouldBe("close-error"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_restarting_and_close_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask(), + _ => throw new TestException("read-error"), + _ => Task.FromException(new TestException("close-error")).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1L); + (await probe.ExpectErrorAsync()).Message.ShouldBe("close-error"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_restarting_and_start_throws() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var startCounter = new AtomicCounter(0); + + Source.UnfoldResourceValueTaskAsync( + () => + { + return startCounter.IncrementAndGet() < 2 ? + Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask() : + throw new TestException("start-error"); + }, + _ => throw new TestException("read-error"), + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1L); + (await probe.ExpectErrorAsync()).Message.ShouldBe("start-error"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_fail_when_restarting_and_start_returns_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = this.CreateSubscriberProbe(); + var startCounter = new AtomicCounter(0); + + Source.UnfoldResourceValueTaskAsync( + () => + { + return startCounter.IncrementAndGet() < 2 ? + Task.FromResult(new[] { 1, 2, 3 }.GetEnumerator()).ToValueTask() : + Task.FromException(new TestException("start-error")).ToValueTask(); + }, + _ => throw new TestException("read-error"), + _ => Task.FromResult(Done.Instance).ToValueTask()) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.RequestAsync(1L); + (await probe.ExpectErrorAsync()).Message.ShouldBe("start-error"); + }, Materializer); + } + + // Could not use AssertAllStagesStoppedAsync because materializer is shut down inside the test. + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_resource_when_stream_is_abruptly_terminated() + { + var closePromise = new TaskCompletionSource(); + var materializer = ActorMaterializer.Create(Sys); + var p = Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(closePromise).ToValueTask(), + // a slow trickle of elements that never ends + _ => FutureTimeoutSupport.After(TimeSpan.FromMilliseconds(100), Sys.Scheduler, () => Task.FromResult(Option.Create("element"))).ToValueTask(), + tcs => + { + tcs.SetResult("Closed"); + return Task.FromResult(Done.Instance).ToValueTask(); + }) + .RunWith(Sink.AsPublisher(false), materializer); + + var c = this.CreateManualSubscriberProbe(); + p.Subscribe(c); + await c.ExpectSubscriptionAsync(); + + materializer.Shutdown(); + materializer.IsShutdown.Should().BeTrue(); + + var r = await closePromise.Task.ShouldCompleteWithin(3.Seconds()); + r.Should().Be("Closed"); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_resource_when_stream_is_quickly_cancelled() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var closePromise = new TaskCompletionSource(); + var probe = Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(closePromise).ToValueTask(), + _ => Task.FromResult(Option.Create("whatever")).ToValueTask(), + tcs => + { + tcs.SetResult("Closed"); + return Task.FromResult(Done.Instance).ToValueTask(); + }) + .RunWith(this.SinkProbe(), Materializer); + + await probe.CancelAsync(); + + var r = await closePromise.Task.ShouldCompleteWithin(3.Seconds()); + r.Should().Be("Closed"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_must_close_resource_when_stream_is_quickly_cancelled_reproducer_2() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var closePromise = new TaskCompletionSource(); + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(new[] { "a", "b", "c" }.GetEnumerator()).ToValueTask(), + m => Task.FromResult(m.MoveNext() && m.Current != null ? (string)m.Current : Option.None).ToValueTask(), + _ => + { + closePromise.SetResult("Closed"); + return Task.FromResult(Done.Instance).ToValueTask(); + }) + .Select(m => + { + Output.WriteLine($"Elem=> {m}"); + return m; + }) + .RunWith(Sink.Cancelled(), Materializer); + + var r = await closePromise.Task.ShouldCompleteWithin(3.Seconds()); + r.Should().Be("Closed"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_close_the_resource_when_reading_an_element_returns_a_failed_future() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var closePromise = new TaskCompletionSource(); + var probe = this.CreateSubscriberProbe(); + + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(closePromise).ToValueTask(), + _ => Task.FromException>(new TestException("read failed")).ToValueTask(), + tcs => + { + tcs.TrySetResult("Closed"); + return Task.FromResult(Done.Instance).ToValueTask(); + }) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + await probe.RequestAsync(1L); + await probe.ExpectErrorAsync(); + + var r = await closePromise.Task.ShouldCompleteWithin(3.Seconds()); + r.Should().Be("Closed"); + }, Materializer); + } + + [Fact] + public async Task A_UnfoldResourceValueTaskAsyncSource_close_the_resource_when_reading_an_element_throws() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var closePromise = new TaskCompletionSource(); + var probe = this.CreateSubscriberProbe(); + + ValueTask> Fail(TaskCompletionSource _) => throw new TestException("read failed"); + + Source.UnfoldResourceValueTaskAsync( + () => Task.FromResult(closePromise).ToValueTask(), + Fail, + tcs => + { + tcs.SetResult("Closed"); + return Task.FromResult(Done.Instance).ToValueTask(); + }) + .RunWith(Sink.FromSubscriber(probe), Materializer); + + await probe.EnsureSubscriptionAsync(); + await probe.RequestAsync(1L); + await probe.ExpectErrorAsync(); + + var r = await closePromise.Task.ShouldCompleteWithin(3.Seconds()); + r.Should().Be("Closed"); + }, Materializer); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 8175e40c0b1..4a84c86b1d9 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -339,6 +339,12 @@ public static IFlow SelectAsync(this IFlow(parallelism, asyncMapper)); } + + public static IFlow SelectValueTaskAsync(this IFlow flow, int parallelism, + Func> asyncMapper) + { + return flow.Via(new Fusing.SelectValueTaskAsync(parallelism, asyncMapper)); + } /// /// Transform this stream by applying the given function to each of the elements diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index e60b768ba79..b0802128bfc 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -789,6 +789,9 @@ public static Source Unfold(TState state, Func UnfoldAsync(TState state, Func>> unfoldAsync) => FromGraph(new UnfoldAsync(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldAsync); + + public static Source UnfoldValueTaskAsync(TState state, Func>> unfoldAsync) + => FromGraph(new UnfoldValueTaskAsync(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldValueTaskAsync); /// /// Simpler , for infinite sequences. /// @@ -1136,6 +1139,62 @@ public static Source UnfoldResourceAsync(Func(create, read, close)); } + + /// + /// Unfolds a resource, using ValueTask returns instead of normal tasks, + /// As well as optimizing when the result successfully completed synchronously. + /// + /// There is an overload that takes a create State, + /// allowing one to minimize captures and leakage + /// + /// (essentially, pass initial state in, + /// then any additional state gets passed into initial TState) + /// + /// function that is called on stream start and creates/opens resource. + /// function that reads data from opened resource. It is called each time backpressure signal + /// is received. Stream calls close and completes when from read function returns None. + /// function that closes resource + /// + /// + /// A source of T that is created on stream start and read on backpressure + public static Source UnfoldResourceValueTaskAsync(Func> create, + Func>> read, + Func close) + { + return FromGraph( + new UnfoldResourceSourceValueTaskAsync>, TSource>( + create, (a) => a(), read, close)); + } + + /// + /// Unfolds a resource, using ValueTask returns instead of normal tasks, + /// As well as optimizing when the result successfully completed synchronously. + /// + /// By passing an initial state in, + /// one can minimize accidental delegate over-captures. + /// + /// The initial state to be passed to the create function + /// function that is called on stream start and creates/opens resource. + /// function that reads data from opened resource. It is called each time backpressure signal + /// is received. Stream calls close and completes when from read function returns None. + /// function that closes resource + /// The type this source will emit while read returns with a value + /// The State type + /// The Initial state to be passed in to the Creation function. + /// A source of T that is created on stream start and read on backpressure + public static Source + UnfoldResourceValueTaskAsync( + TCreateState createState, + Func> create, + Func>> read, + Func close + ) + { + return FromGraph( + new UnfoldResourceSourceValueTaskAsync( + createState, create, read, close)); + } /// /// Start a new attached to a .NET event. In case when event will be triggered faster, than a downstream is able diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 7ceed16caae..29822362060 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -281,6 +281,11 @@ public static Source SelectAsync(this Source)InternalFlowOperations.SelectAsync(flow, parallelism, asyncMapper); } + + public static Source SelectValueTaskAsync(this Source flow, int parallelism, Func> asyncMapper) + { + return (Source)InternalFlowOperations.SelectValueTaskAsync(flow, parallelism, asyncMapper); + } /// /// Transform this stream by applying the given function to each of the elements diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 23158182cde..6bbce4447ac 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -6,12 +6,15 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; using Akka.Annotations; using Akka.Event; using Akka.Pattern; @@ -22,6 +25,7 @@ using Akka.Streams.Util; using Akka.Util; using Akka.Util.Internal; +using Debug = System.Diagnostics.Debug; using Decider = Akka.Streams.Supervision.Decider; using Directive = Akka.Streams.Supervision.Directive; @@ -2513,6 +2517,334 @@ public Expand(Func> extrapolate) public override string ToString() => "Expand"; } + public sealed class NotYetThereSentinel : Exception + { + public static readonly NotYetThereSentinel Instance = new(); + } + + /// + /// INTERNAL API + /// + /// TBD + /// TBD + [InternalApi] + public sealed class + SelectValueTaskAsync : GraphStage> + { + #region internal classes + + private sealed class Logic : InAndOutGraphStageLogic + { + private sealed class Holder + { + public SlimResult Element { get; private set; } + private readonly Action> _callback; + private ValueTask _pending; + private readonly Action _taskCompletedAction; + + + private void VTCompletionError(ValueTask vtCapture) + { + var t = vtCapture.AsTask(); + //We only care about faulted, not canceled. + if (t.IsFaulted) + { + var exception = t.Exception?.InnerExceptions != null && + t.Exception.InnerExceptions.Count == 1 + ? t.Exception.InnerExceptions[0] + : t.Exception; + + Invoke(SlimResult.ForError(exception)); + } + } + + public Holder(SlimResult element, Action> callback) + { + _callback = callback; + Element = element; + _taskCompletedAction = () => + { + var inst = this._pending; + this._pending = default; + if (inst.IsCompletedSuccessfully) + { + this.Invoke(SlimResult.ForSuccess(inst.Result)); + } + else + { + this.VTCompletionError(inst); + } + }; + } + + public void SetElement(SlimResult result) + { + Element = result; + } + + public void SetContinuation(ValueTask vt) + { + _pending = vt; + vt.ConfigureAwait(true).GetAwaiter() + .OnCompleted(_taskCompletedAction); + } + + public void Invoke(SlimResult result) + { + SetElement(result); + _callback(this); + } + } + + private static readonly SlimResult NotYetThere = + SlimResult.NotYetReady; + + private readonly SelectValueTaskAsync _stage; + private readonly Decider _decider; + private IBuffer> _buffer; + private readonly Action> _taskCallback; + + //Use this to hold on to reused holders. + private readonly ConcurrentQueue> _holderReuseQueue; + + public Logic(Attributes inheritedAttributes, + SelectValueTaskAsync stage) : base(stage.Shape) + { + _stage = stage; + var attr = inheritedAttributes + .GetAttribute(null); + _decider = attr != null + ? attr.Decider + : Deciders.StoppingDecider; + + _taskCallback = + GetAsyncCallback>(hc => + HolderCompleted(hc)); + _holderReuseQueue = + new ConcurrentQueue< + Holder>(); + SetHandlers(stage.In, stage.Out, this); + } + + private Holder RentOrGet() + { + if (_holderReuseQueue.TryDequeue(out var item)) + { + return item; + } + else + { + return new Holder(NotYetThere, _taskCallback); + } + } + + public override void OnPush() + { + var message = Grab(_stage.In); + try + { + var task = _stage._mapFunc(message); + var holder = RentOrGet(); + _buffer.Enqueue(holder); + // We dispatch the task if it's ready to optimize away + // scheduling it to an execution context + if (task.IsCompletedSuccessfully) + { + holder.SetElement(SlimResult.ForSuccess(task.Result)); + HolderCompleted(holder); + } + else + holder.SetContinuation(task); + } + catch (Exception e) + { + onPushErrorDecider(e, message); + } + + if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In)) + TryPull(_stage.In); + } + + private void onPushErrorDecider(Exception e, TIn message) + { + var strategy = _decider(e); + Log.Error(e, + "An exception occured inside SelectValueTaskAsync while processing message [{0}]. Supervision strategy: {1}", + message, strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(e); + break; + + case Directive.Resume: + case Directive.Restart: + break; + + default: + throw new AggregateException( + $"Unknown SupervisionStrategy directive: {strategy}", + e); + } + } + + public override void OnUpstreamFinish() + { + if (Todo == 0) + CompleteStage(); + } + + public override void OnPull() => PushOne(null); + + private int Todo => _buffer.Used; + + public override void PreStart() => + _buffer = + Buffer.Create>(_stage._parallelism, + Materializer); + + private void PushOne(Holder holder) + { + var inlet = _stage.In; + while (true) + { + if (_buffer.IsEmpty) + { + if (IsClosed(inlet)) + { + CompleteStage(); + } + else if (!HasBeenPulled(inlet)) + { + Pull(inlet); + } + } + else if (_buffer.Peek().Element.IsDone() == false) + { + if (Todo < _stage._parallelism && !HasBeenPulled(inlet)) + { + TryPull(inlet); + } + } + else + { + var dequeued = _buffer.Dequeue(); + var result = dequeued!.Element; + if (!result.IsSuccess()) + continue; + // Important! We can only re-use if the DQ holder + // is the one that was passed in. + // That means it already passed through HolderCompleted, + // otherwise (i.e. HolderCompleted not yet called) + // when that happens, because it is reset, we blow up =( + if (holder == dequeued) + { + dequeued.SetElement(NotYetThere); + _holderReuseQueue.Enqueue(dequeued); + } + + Push(_stage.Out, result.Result); + + if (Todo < _stage._parallelism && !HasBeenPulled(inlet)) + TryPull(inlet); + } + + break; + } + } + + private void HolderCompleted(Holder holder) + { + var element = holder.Element; + if (element.IsSuccess()) + { + if (IsAvailable(_stage.Out)) + PushOne(holder); + return; + } + + HolderCompletedError(element); + } + + private void HolderCompletedError(SlimResult element) + { + var exception = element.Error; + var strategy = _decider(exception); + Log.Error(exception, + "An exception occured inside SelectValueTaskAsync while executing Task. Supervision strategy: {0}", + strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(exception); + break; + + case Directive.Resume: + case Directive.Restart: + if (IsAvailable(_stage.Out)) + PushOne(null); + break; + + default: + throw new AggregateException( + $"Unknown SupervisionStrategy directive: {strategy}", + exception); + } + } + + public override string ToString() => + $"SelectValueTaskAsync.Logic(buffer={_buffer})"; + } + + #endregion + + private readonly int _parallelism; + private readonly Func> _mapFunc; + + /// + /// TBD + /// + public readonly Inlet In = new("SelectValueTaskAsync.in"); + + /// + /// TBD + /// + public readonly Outlet Out = new("SelectValueTaskAsync.out"); + + /// + /// TBD + /// + /// TBD + /// TBD + public SelectValueTaskAsync(int parallelism, + Func> mapFunc) + { + _parallelism = parallelism; + _mapFunc = mapFunc; + Shape = new FlowShape(In, Out); + } + + /// + /// TBD + /// + protected override Attributes InitialAttributes { get; } = + Attributes.CreateName("selectValueTaskAsync"); + + /// + /// TBD + /// + public override FlowShape Shape { get; } + + /// + /// TBD + /// + /// TBD + /// TBD + protected override GraphStageLogic CreateLogic( + Attributes inheritedAttributes) + => new Logic(inheritedAttributes, this); + } + /// /// INTERNAL API /// @@ -2551,7 +2883,7 @@ public void Invoke(Result result) } } - private static readonly Result NotYetThere = Result.Failure(new Exception()); + private static readonly Result NotYetThere = Result.Failure(NotYetThereSentinel.Instance); private readonly SelectAsync _stage; private readonly Decider _decider; @@ -3827,6 +4159,7 @@ public Logic(SourceShape shape, IAsyncEnumerable enumerable) : base(shape) _completionCts = new CancellationTokenSource(); SetHandler(_outlet, this); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void OnComplete() => CompleteStage(); @@ -3889,16 +4222,12 @@ public override void OnPull() var vtask = _enumerator.MoveNextAsync(); if (vtask.IsCompletedSuccessfully) { - // When MoveNextAsync returned immediately, we don't need to await. - // We can use fast path instead. if (vtask.Result) { - // if result is true, it means we got an element. Push it downstream. Push(_outlet, _enumerator.Current); } else { - // if result is false, it means enumerator was closed. Complete stage in that case. CompleteStage(); } } diff --git a/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs b/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs new file mode 100644 index 00000000000..e53f88e826b --- /dev/null +++ b/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs @@ -0,0 +1,76 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; + +namespace Akka.Streams.Implementation.Fusing; + +public readonly struct SlimResult +{ + public readonly Exception Error; + public readonly T Result; + + public static readonly SlimResult NotYetReady = + SlimResult.ForError(NotYetThereSentinel.Instance); + + private static readonly SlimResult MustNotBeNull = + SlimResult.ForError(ReactiveStreamsCompliance + .ElementMustNotBeNullException); + public static SlimResult FromTask(Task task) + { + return task.IsCanceled || task.IsFaulted + ? SlimResult.ForError(task.Exception) + : SlimResult.ForSuccess(task.Result); + } + public SlimResult(Exception errorOrSentinel, T result) + { + if (result == null || errorOrSentinel != null) + { + Error = errorOrSentinel ?? ReactiveStreamsCompliance + .ElementMustNotBeNullException; + } + else + { + Result = result; + } + } + + private SlimResult(Exception errorOrSentinel) + { + Error = errorOrSentinel; + Result = default; + } + + private SlimResult(T result) + { + Error = default; + Result = result; + } + + public static SlimResult ForError(Exception errorOrSentinel) + { + return new SlimResult(errorOrSentinel); + } + + public static SlimResult ForSuccess(T result) + { + return result == null + ? SlimResult.MustNotBeNull + : new SlimResult(result); + } + + public bool IsSuccess() + { + return Error == null; + } + + public bool IsDone() + { + return Error != NotYetThereSentinel.Instance; + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs b/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs new file mode 100644 index 00000000000..8a4f6a2b59b --- /dev/null +++ b/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs @@ -0,0 +1,92 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Implementation.Fusing; +using Akka.Streams.Stage; + +namespace Akka.Streams.Implementation; + +/// +/// Semi-unsafe Helper intermediate for +/// that allows for a ValueTask Wait to be pooled. +/// +/// Inheritors are expected to utilize the +/// and call `base.PreStart()` in their `PreStart` conditions. +/// +/// Additionally, if inheritors have their own 'restart' logic, +/// They should utilize the `ResetHolder()` method, +/// to avoid callback clashes. +/// +/// +/// +internal abstract class PooledAwaitOutGraphStageLogic : OutGraphStageLogic +{ + private UnsafeSlimHolder _unsafeSlimHolder; + protected Action> _completedCallback; + protected PooledAwaitOutGraphStageLogic(Shape shape) : base(shape) + { + _completedCallback = GetAsyncCallback>(t => + { + FailStage(new Exception("Callback was not set!")); + }); + } + + protected void SetPooledCompletionCallback(Action> completedCallback) + { + if (_completedCallback == null) + { + throw new ArgumentNullException( + nameof(completedCallback)); + } + _completedCallback = GetAsyncCallback(completedCallback); + } + + public override void PreStart() + { + ResetHolder(); + } + + /// + /// Sets a ValueTask to wire up the callback, + /// set via . + /// If has not been called, + /// The continuation will fail the stage! + /// + /// + protected void SetContinuation(ValueTask valueTask, bool configureAwait = true) + { + _unsafeSlimHolder.SetContinuation(valueTask, configureAwait); + } + + /// + /// Use at own risk! + /// + protected void SetHolder(UnsafeSlimHolder holder) + { + Interlocked.Exchange(ref _unsafeSlimHolder, holder); + } + + public void ResetHolder() + { + Interlocked.Exchange(ref _unsafeSlimHolder, new UnsafeSlimHolder(this)); + } + + internal void RunIfSame(UnsafeSlimHolder unsafeSlimHolder, ValueTask vt) + { + //We are explicitly using referenceEquals here, + //since we are explicitly resetting things. + if (object.ReferenceEquals(_unsafeSlimHolder, unsafeSlimHolder)) + { + _completedCallback(vt.IsCompletedSuccessfully + ? SlimResult.ForSuccess(vt.Result) + : SlimResult.FromTask(vt.AsTask())); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index e5ee1cb2d07..486abb54eb4 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -7,10 +7,12 @@ using System; using System.Collections.Generic; +using System.Threading.Channels; using System.Threading.Tasks; using Akka.Annotations; using Akka.Pattern; using Akka.Streams.Dsl; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; using Akka.Streams.Supervision; @@ -761,6 +763,262 @@ public UnfoldResourceSourceAsync(Func> create, Func "UnfoldResourceSourceAsync"; } + /// + /// INTERNAL API + /// + /// TBD + /// TBD + /// The state passed to resource create function + [InternalApi] + public sealed class UnfoldResourceSourceValueTaskAsync : GraphStage> + { + #region Logic + + private sealed class Logic : PooledAwaitOutGraphStageLogic> + { + private readonly UnfoldResourceSourceValueTaskAsync _stage; + private readonly Lazy _decider; + private Option _state = Option.None; + + public Logic(UnfoldResourceSourceValueTaskAsync stage, Attributes inheritedAttributes) + : base(stage.Shape) + { + _stage = stage; + _decider = new Lazy(() => + { + var strategy = inheritedAttributes.GetAttribute(null); + return strategy != null ? strategy.Decider : Deciders.StoppingDecider; + }); + SetPooledCompletionCallback(Handler); + SetHandler(_stage.Out, this); + } + + + private Action> CreatedCallback => GetAsyncCallback>(resource => + { + if (resource.IsSuccess) + { + _state = resource.Success; + if (IsAvailable(_stage.Out)) OnPull(); + } + else FailStage(resource.Failure.Value); + }); + + private void ErrorHandler(Exception ex) + { + switch (_decider.Value(ex)) + { + case Directive.Stop: + FailStage(ex); + break; + case Directive.Restart: + try + { + RestartResource(); + } + catch (Exception ex1) + { + FailStage(ex1); + } + break; + case Directive.Resume: + OnPull(); + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + + private void Handler(SlimResult> read) + { + if (read.IsSuccess()) + { + var data = read.Result; + if (data.HasValue) + { + var some = data.Value; + Push(_stage.Out, some); + } + else + { + // end of resource reached, lets close it + if (_state.HasValue) + { + CloseResource(); + } + else + { + // cannot happen, but for good measure + throw new InvalidOperationException("Reached end of data but there is no open resource"); + } + } + } + else + ErrorHandler(read.Error); + } + + private void CloseResource() + { + var resource = _state.Value; + _stage._close(resource).AsTask().OnComplete(GetAsyncCallback>( + done => + { + if (done.IsSuccess) CompleteStage(); + else FailStage(done.Failure.Value); + })); + _state = Option.None; + } + + public override void PreStart() + { + CreateResource(); + base.PreStart(); + } + + public override void OnPull() + { + if (_state.HasValue) + { + try + { + var resource = _state.Value; + var vt = _stage._readData(resource); + if (vt.IsCompletedSuccessfully) + { + var maybe = vt.GetAwaiter().GetResult(); + if (maybe.HasValue) + { + Push(_stage.Out, maybe.Value); + } + else + { + CloseResource(); + } + } + else + { + SetContinuation(vt); + } + + + } + catch (Exception ex) + { + ErrorHandler(ex); + } + } + else + { + // we got a pull but there is no open resource, we are either + // currently creating/restarting then the read will be triggered when creating the + // resource completes, or shutting down and then the pull does not matter anyway + } + } + + public override void PostStop() + { + if (_state.HasValue) + _stage._close(_state.Value); + } + + private void RestartResource() + { + if (_state.HasValue) + { + var resource = _state.Value; + // wait for the resource to close before restarting + _stage._close(resource).AsTask().OnComplete(GetAsyncCallback>(done => + { + if (done.IsSuccess) CreateResource(); + else FailStage(done.Failure.Value); + })); + _state = Option.None; + } + else CreateResource(); + } + + private void CreateResource() + { + _stage._create(_stage._createState).AsTask().OnComplete(resource => + { + try + { + CreatedCallback(resource); + } + catch (StreamDetachedException) + { + // stream stopped before created callback could be invoked, we need + // to close the resource if it is was opened, to not leak it + if (resource.IsSuccess) + { + _stage._close(resource.Success.Value); + } + else + { + // failed to open but stream is stopped already + throw resource.Failure.Value; + } + } + }); + } + } + + #endregion + + private readonly Func> _create; + private readonly Func>> _readData; + private readonly Func _close; + private readonly TCreateState _createState; + + /// + /// TBD + /// + /// + /// TBD + /// TBD + /// TBD + public UnfoldResourceSourceValueTaskAsync(TCreateState createState, + Func> create, + Func>> readData, + Func close) + { + _createState = createState; + _create = create; + _readData = readData; + _close = close; + + Shape = new SourceShape(Out); + } + + /// + /// TBD + /// + protected override Attributes InitialAttributes => DefaultAttributes.UnfoldResourceSourceValueTaskAsync; + + /// + /// TBD + /// + public Outlet Out { get; } = new("UnfoldResourceSourceValueTaskAsync.out"); + + /// + /// TBD + /// + public override SourceShape Shape { get; } + + /// + /// TBD + /// + /// TBD + /// TBD + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes); + + /// + /// TBD + /// + /// TBD + public override string ToString() => "UnfoldResourceSourceValueTaskAsync"; + } + /// /// INTERNAL API /// diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index c7607cce4fb..0227ad62b10 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -257,6 +257,7 @@ public static class DefaultAttributes /// TBD /// public static readonly Attributes UnfoldAsync = Attributes.CreateName("unfoldAsync"); + public static readonly Attributes UnfoldValueTaskAsync = Attributes.CreateName("unfoldValueTaskAsync"); /// /// TBD /// @@ -270,6 +271,11 @@ public static class DefaultAttributes /// public static readonly Attributes UnfoldResourceSourceAsync = Attributes.CreateName("unfoldResourceSourceAsync").And(IODispatcher); /// + /// This, unlike normal UnfoldResourceSource, + /// does -not- use IODispatcher. + /// + public static readonly Attributes UnfoldResourceSourceValueTaskAsync = Attributes.CreateName("unfoldResourceSourceValueTaskAsync"); + /// /// TBD /// public static readonly Attributes TerminationWatcher = Attributes.CreateName("terminationWatcher"); diff --git a/src/core/Akka.Streams/Implementation/Unfold.cs b/src/core/Akka.Streams/Implementation/Unfold.cs index da60c73cff3..850d7b7c7c2 100644 --- a/src/core/Akka.Streams/Implementation/Unfold.cs +++ b/src/core/Akka.Streams/Implementation/Unfold.cs @@ -6,8 +6,11 @@ //----------------------------------------------------------------------- using System; +using System.Runtime.CompilerServices; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; using Akka.Annotations; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; using Akka.Streams.Util; using Akka.Util; @@ -88,6 +91,116 @@ public Unfold(TState state, Func> unfoldFunc) protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); } + /// + /// INTERNAL API + /// + /// TBD + /// TBD + [InternalApi] + public class UnfoldValueTaskAsync : GraphStage> + { + #region stage logic + private sealed class Logic : PooledAwaitOutGraphStageLogic> + { + private readonly UnfoldValueTaskAsync _stage; + private TState _state; + + public Logic(UnfoldValueTaskAsync stage) : base(stage.Shape) + { + + _stage = stage; + _state = _stage.State; + SetPooledCompletionCallback(SyncResult); + SetHandler(_stage.Out, this); + } + + public override void OnPull() + { + ValueTask> vt; + bool taken = false; + try + { + vt = _stage.UnfoldFunc(_state); + taken = true; + } + catch (Exception e) + { + vt = default; + Fail(_stage.Out,e); + } + + if (taken) + { + if (vt.IsCompletedSuccessfully) + { + SyncResult( + SlimResult>.ForSuccess( + vt.Result)); + } + else + { + SetContinuation(vt); + } + } + } + + private void SyncResult(SlimResult> result) + { + if (!result.IsSuccess()) + Fail(_stage.Out, result.Error); + else + { + var option = result.Result; + if (!option.HasValue) + Complete(_stage.Out); + else + { + Push(_stage.Out, option.Value.Item2); + _state = option.Value.Item1; + } + } + } + } + #endregion + + /// + /// TBD + /// + public readonly TState State; + /// + /// TBD + /// + public readonly Func>> UnfoldFunc; + /// + /// TBD + /// + public readonly Outlet Out = new("UnfoldValueTaskAsync.out"); + + /// + /// TBD + /// + /// TBD + /// TBD + public UnfoldValueTaskAsync(TState state, Func>> unfoldFunc) + { + State = state; + UnfoldFunc = unfoldFunc; + Shape = new SourceShape(Out); + } + + /// + /// TBD + /// + public override SourceShape Shape { get; } + + /// + /// TBD + /// + /// TBD + /// TBD + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); + } + /// /// INTERNAL API /// diff --git a/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs b/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs new file mode 100644 index 00000000000..9a5959ec347 --- /dev/null +++ b/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs @@ -0,0 +1,54 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; + +namespace Akka.Streams.Implementation; + +/// +/// Used for Pooling ValueTasks by +/// +/// +/// This is intentionally only guarded for the sake of avoiding Exceptions, +/// As it is expected that implementations are within Akka Streams Stages +/// (which have their own safety guarantees) and geared towards performance. +/// +internal class UnsafeSlimHolder +{ + private readonly PooledAwaitOutGraphStageLogic _parent; + private ValueTask _vt; + private readonly Action _continuation; + public UnsafeSlimHolder(PooledAwaitOutGraphStageLogic logic) + { + _parent = logic; + _continuation = ContinuationAction; + } + + public void SetContinuation(ValueTask vt, bool configureAwait = true) + { + if (configureAwait) + { + _vt = vt; + } + else + { + var whyMSFTwhy = _vt.ConfigureAwait(false); + _vt = Unsafe.As, ValueTask>( + ref whyMSFTwhy); + } + _vt.GetAwaiter().OnCompleted(_continuation); + } + private void ContinuationAction() + { + var vt = _vt; + _vt = default; + _parent.RunIfSame(this, vt); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Util/TaskHelperExts.cs b/src/core/Akka.Streams/Util/TaskHelperExts.cs new file mode 100644 index 00000000000..3e710155972 --- /dev/null +++ b/src/core/Akka.Streams/Util/TaskHelperExts.cs @@ -0,0 +1,33 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Threading.Tasks; + +namespace Akka.Streams.Util; + +/// +/// Internal API. +/// Extension Helper Sugar for +/// -> Conversions. +/// +internal static class TaskHelperExts +{ + public static ValueTask ToValueTask(this Task task) + { + return new ValueTask(task); + } + + /// + /// Converts a into a + /// If you want , + /// Call with an explicit Type parameter. + /// + public static ValueTask ToValueTask(this Task task) + { + return new ValueTask(task); + } +} \ No newline at end of file diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index 72c50e2f71d..53ae8898b54 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -21,6 +21,9 @@ + + +