diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md
index d8fbf4e5e41..17d0779c17d 100644
--- a/docs/articles/streams/builtinstages.md
+++ b/docs/articles/streams/builtinstages.md
@@ -113,6 +113,14 @@ Can be used to implement many stateful sources without having to touch the more
**completes** when the task returned by the unfold function completes with an null value
+### UnfoldValueTaskAsync
+
+Just like ``UnfoldAsync``, but the fold function returns a ``ValueTask``, with internal pooling to minimize allocation and improve latency.
+
+**emits** when there is demand and unfold state returned task completes with not null value
+
+**completes** when the task returned by the unfold function completes with an null value
+
### Empty
Complete right away without ever emitting any elements. Useful when you have to provide a source to
@@ -196,6 +204,14 @@ Functions return ``Task`` to achieve asynchronous processing
**completes** when ``Task`` from read function returns ``None``
+### UnfoldResourceValueTaskAsync
+
+Like ``UnfoldResourceAsync`` but takes ``ValueTask`` Functions instead, with amortization of allocations for the main read stage.
+
+**emits** when there is demand and ``ValueTask`` from read function returns value
+
+**completes** when ``ValueTask`` from read function returns ``None``
+
### Queue
Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains
@@ -846,6 +862,16 @@ If a Task fails, the stream also fails (unless a different supervision strategy
**completes** when upstream completes and all tasks has been completed and all elements has been emitted
+### SelectValueTaskAsync
+
+Version of ``SelectAsync`` that is optimized for ValueTask returns. Prefer this over ``SelectAsync`` if your work may be synchronus or is primarily waiting on ``ValueTask``
+
+**emits** when the ``ValueTask`` returned by the provided function finishes for the next element in sequence
+
+**backpressures** when the number of tasks reaches the configured parallelism and the downstream backpressures
+
+**completes** when upstream completes and all tasks has been completed and all elements has been emitted
+
### SelectAsyncUnordered
Like ``SelectAsync`` but ``Task`` results are passed downstream as they arrive regardless of the order of the elements
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 8c63af3d49b..09089f4eb0c 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -22,6 +22,7 @@
2.5.3
17.9.0
0.12.2
+ 4.5.4
[13.0.1,)
2.0.1
3.26.0
diff --git a/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
new file mode 100644
index 00000000000..47f6dd3f5d8
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/SelectAsyncBenchmarks.cs
@@ -0,0 +1,189 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2024 Lightbend Inc.
+// // Copyright (C) 2013-2024 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class SelectAsyncBenchmarks
+{
+ public struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncCh;
+ private Task selectValueTaskAsyncStub;
+ private Channel vtAsyncCh;
+ private Task selectAsyncSyncStub;
+ private Task selectAsyncValueTaskSyncStub;
+ private Channel asyncChSync;
+ private Channel vtAsyncChSync;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncCh = Channel.CreateUnbounded();
+
+ asyncChSync = Channel.CreateUnbounded();
+
+ vtAsyncChSync = Channel.CreateUnbounded();
+
+ selectAsyncSyncStub = Source.ChannelReader(asyncChSync.Reader)
+ .SelectAsync(4, a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ }
+ else
+ {
+ }
+
+ return Task.FromResult(NotUsed.Instance);
+ }).RunWith(Sink.Ignore(), materializer);
+
+ selectAsyncValueTaskSyncStub = Source.ChannelReader(vtAsyncChSync.Reader)
+ .SelectValueTaskAsync(4, a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ }
+ else
+ {
+ }
+
+ return ValueTask.FromResult(NotUsed.Instance);
+ }).RunWith(Sink.Ignore(), materializer);
+ selectAsyncStub = Source.ChannelReader(asyncCh.Reader)
+ .SelectAsync(4, async a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ }
+ else
+ {
+ //await Task.Yield();
+ await Task.Delay(0);
+ }
+
+ return NotUsed.Instance;
+ }).RunWith(Sink.Ignore(), materializer);
+ vtAsyncCh = Channel.CreateUnbounded();
+ int vta = 0;
+ selectValueTaskAsyncStub = Source.ChannelReader(vtAsyncCh.Reader)
+ .SelectValueTaskAsync(4, async a =>
+ {
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ //return NotUsed.Instance;
+ }
+ else
+ {
+ //await Task.Yield();
+ await Task.Delay(0);
+ //return NotUsed.Instance;
+ //Console.WriteLine(++vta);
+ //return vta;
+ }
+
+ return NotUsed.Instance;
+ }).RunWith(Sink.Ignore(), materializer);
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ }
+
+ [Benchmark]
+ public async Task RunSelectAsync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task RunSelectValueTaskAsync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task RunSelectAsyncSync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task RunSelectValueTaskAsyncSync()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ vtAsyncChSync.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs
new file mode 100644
index 00000000000..9e086e7955e
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs
@@ -0,0 +1,191 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2024 Lightbend Inc.
+// // Copyright (C) 2013-2024 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class UnfoldAsyncBenchmarks
+{
+ public struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncNoYieldCh;
+ private Task selectValueTaskAsyncStub;
+ private Channel vtAsyncCh;
+ private Task unfoldAsyncSyncStub;
+ private Task selectAsyncValueTaskSyncStub;
+ private Channel asyncYieldCh;
+ private Channel vtAsyncYieldCh;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncNoYieldCh = Channel.CreateUnbounded();
+
+ asyncYieldCh = Channel.CreateUnbounded();
+
+ vtAsyncYieldCh = Channel.CreateUnbounded();
+
+ unfoldAsyncSyncStub = Source.UnfoldAsync,int>(asyncYieldCh.Reader, async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ return (r, i.IntValue);
+ }
+ })
+ .RunWith(Sink.Ignore(), materializer);
+
+ selectAsyncValueTaskSyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncYieldCh.Reader, async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ return (r, i.IntValue);
+ }
+ })
+ .RunWith(Sink.Ignore(), materializer);
+ selectAsyncStub = Source.UnfoldAsync,int>(asyncNoYieldCh.Reader,async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ //await Task.Yield();
+ // await Task.Delay(0);
+ return (r, a.IntValue);
+ }
+ }).RunWith(Sink.Ignore(), materializer);
+ vtAsyncCh = Channel.CreateUnbounded();
+ int vta = 0;
+ selectValueTaskAsyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncCh.Reader,async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return (r, -1);
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ return (r, a.IntValue);
+ }
+ }).RunWith(Sink.Ignore(), materializer);
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ }
+
+ [Benchmark]
+ public async Task UnfoldAsyncYieldInConsume()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task UnfoldValueTaskAsyncYieldInConsume()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task UnfoldAsyncYieldInPush()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task UnfoldValueTaskAsyncYieldInPush()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs
new file mode 100644
index 00000000000..9bc207df999
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldResourceAsyncBenchmarks.cs
@@ -0,0 +1,302 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2024 Lightbend Inc.
+// Copyright (C) 2013-2024 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using Akka.Streams.Implementation.Fusing;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.Streams;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class UnfoldResourceAsyncBenchmarks
+{
+
+ public struct IntOrCompletion
+ {
+ public readonly int IntValue;
+ public readonly TaskCompletionSource? Completion;
+
+ public IntOrCompletion(int intValue, TaskCompletionSource? completion)
+ {
+ IntValue = intValue;
+ Completion = completion;
+ }
+ }
+ private ActorSystem system;
+ private ActorMaterializer materializer;
+
+ private IRunnableGraph simpleGraph;
+ private Task selectAsyncStub;
+ private Channel asyncNoYieldCh;
+ private Task selectValueTaskAsyncStub;
+ private Channel vtAsyncCh;
+ private Task unfoldAsyncSyncStub;
+ private Task selectAsyncValueTaskSyncStub;
+ private Channel asyncYieldCh;
+ private Channel vtAsyncYieldCh;
+ private Channel straightCh;
+ private Task straightTask;
+ private CancellationTokenSource straightChTokenSource;
+ private Channel straightYieldCh;
+ private Task straightYieldTask;
+
+ [GlobalSetup]
+ public void Setup()
+ {
+ system = ActorSystem.Create("system");
+ materializer = system.Materializer();
+ asyncNoYieldCh = Channel.CreateUnbounded();
+
+ asyncYieldCh = Channel.CreateUnbounded();
+
+ vtAsyncYieldCh = Channel.CreateUnbounded();
+
+ unfoldAsyncSyncStub = Source.UnfoldResourceAsync>(()=> Task.FromResult(asyncYieldCh.Reader), async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ return i.IntValue;
+ }
+ }, (r)=> Task.FromResult(Done.Instance))
+ .RunWith(Sink.Ignore(), materializer);
+
+ selectAsyncValueTaskSyncStub = Source.UnfoldResourceValueTaskAsync,int,ChannelReader>(vtAsyncYieldCh.Reader,(r)=>new ValueTask>(r), async r =>
+ {
+ var i = await r.ReadAsync();
+ if (i.Completion != null)
+ {
+ i.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ return i.IntValue;
+ }
+ }, (r)=> ValueTask.CompletedTask)
+ .RunWith(Sink.Ignore(), materializer);
+ selectAsyncStub = Source.UnfoldResourceAsync>(()=>Task.FromResult(asyncNoYieldCh.Reader),async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ // await Task.Delay(0);
+ return a.IntValue;
+ }
+ }, (r)=> Task.FromResult(Done.Instance) ).RunWith(Sink.Ignore(), materializer);
+ vtAsyncCh = Channel.CreateUnbounded();
+
+
+ int vta = 0;
+ selectValueTaskAsyncStub = Source
+ .UnfoldResourceValueTaskAsync, int,
+ ChannelReader>(vtAsyncCh.Reader,
+ r => new ValueTask>(r),
+ async r =>
+ {
+ await Task.Yield();
+ var a = await r.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ return a.IntValue;
+ }
+ }, (r) => ValueTask.CompletedTask)
+ .RunWith(Sink.Ignore(), materializer);
+ straightChTokenSource = new CancellationTokenSource();
+ straightCh = Channel.CreateUnbounded();
+
+
+ straightTask = Task.Run(async () =>
+ {
+ static async IAsyncEnumerable GetEnumerator(
+ ChannelReader reader, CancellationToken token)
+ {
+ while (token.IsCancellationRequested == false)
+ {
+ await Task.Yield();
+ var a = await reader.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ yield return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ yield return a.IntValue;
+ }
+ }
+ }
+ var r = straightCh.Reader;
+ await foreach (var v in GetEnumerator(r,straightChTokenSource.Token))
+ {
+
+ }
+ });
+
+ straightYieldCh = Channel.CreateUnbounded();
+
+
+ straightYieldTask = Task.Run(async () =>
+ {
+ static async IAsyncEnumerable GetEnumerator(
+ ChannelReader reader, CancellationToken token)
+ {
+ while (token.IsCancellationRequested == false)
+ {
+ var a = await reader.ReadAsync();
+ if (a.Completion != null)
+ {
+ a.Completion.TrySetResult();
+ yield return -1;
+ }
+ else
+ {
+ //await Task.Yield();
+ //await Task.Delay(0);
+ yield return a.IntValue;
+ }
+ }
+ }
+ var r = straightYieldCh.Reader;
+ await foreach (var v in GetEnumerator(r,straightChTokenSource.Token))
+ {
+
+ }
+ });
+ }
+
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ materializer.Dispose();
+ system.Dispose();
+ straightChTokenSource.Cancel();
+ }
+
+ [Benchmark]
+ public async Task UnfoldResourceAsyncNoYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task UnfoldResourceValueTaskAsyncNoYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task UnfoldResourceAsyncWithYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+
+ [Benchmark]
+ public async Task UnfoldResourceValueTaskAsyncWithYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task StraightChannelReadNoYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ straightCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ }
+
+ straightCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+
+ [Benchmark]
+ public async Task StraightChannelReadWithYield()
+ {
+ var completion = new TaskCompletionSource(TaskCreationOptions
+ .RunContinuationsAsynchronously);
+ for (int i = 0; i < 100; i++)
+ {
+ straightYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
+ await Task.Yield();
+ }
+
+ straightYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
+ await completion.Task;
+
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
index e17b26a58ce..c13b0dd70d0 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
@@ -2044,6 +2044,9 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source UnfoldInfinite(TState state, System.Func> unfold) { }
public static Akka.Streams.Dsl.Source UnfoldResource(System.Func create, System.Func> read, System.Action close) { }
public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func> close) { }
+ public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(System.Func> create, System.Func>> read, System.Func close) { }
+ public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> read, System.Func close) { }
+ public static Akka.Streams.Dsl.Source UnfoldValueTaskAsync(TState state, System.Func>>> unfoldAsync) { }
public static Akka.Streams.Dsl.Source, Akka.NotUsed> ZipN(System.Collections.Generic.IEnumerable> sources) { }
public static Akka.Streams.Dsl.Source ZipWithN(System.Func, TOut2> zipper, System.Collections.Generic.IEnumerable> sources) { }
}
@@ -2126,6 +2129,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source SelectAsyncUnordered(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { }
public static Akka.Streams.Dsl.Source SelectError(this Akka.Streams.Dsl.Source flow, System.Func selector) { }
public static Akka.Streams.Dsl.Source SelectMany(this Akka.Streams.Dsl.Source flow, System.Func> mapConcater) { }
+ public static Akka.Streams.Dsl.Source SelectValueTaskAsync(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { }
public static Akka.Streams.Dsl.Source Skip(this Akka.Streams.Dsl.Source flow, long n) { }
public static Akka.Streams.Dsl.Source SkipWhile(this Akka.Streams.Dsl.Source flow, System.Predicate predicate) { }
public static Akka.Streams.Dsl.Source SkipWithin(this Akka.Streams.Dsl.Source flow, System.TimeSpan duration) { }
@@ -3881,6 +3885,16 @@ namespace Akka.Streams.Implementation
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public sealed class UnfoldResourceSourceValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public UnfoldResourceSourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> readData, System.Func close) { }
+ protected override Akka.Streams.Attributes InitialAttributes { get; }
+ public Akka.Streams.Outlet Out { get; }
+ public override Akka.Streams.SourceShape Shape { get; }
+ protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
+ public override string ToString() { }
+ }
+ [Akka.Annotations.InternalApiAttribute()]
public sealed class UnfoldResourceSource : Akka.Streams.Stage.GraphStage>
{
public UnfoldResourceSource(System.Func create, System.Func> readData, System.Action close) { }
@@ -3891,6 +3905,16 @@ namespace Akka.Streams.Implementation
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public class UnfoldValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public readonly Akka.Streams.Outlet Out;
+ public readonly TState State;
+ public readonly System.Func>>> UnfoldFunc;
+ public UnfoldValueTaskAsync(TState state, System.Func>>> unfoldFunc) { }
+ public override Akka.Streams.SourceShape Shape { get; }
+ protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
+ }
+ [Akka.Annotations.InternalApiAttribute()]
public class Unfold : Akka.Streams.Stage.GraphStage>
{
public readonly Akka.Streams.Outlet Out;
@@ -4369,6 +4393,11 @@ namespace Akka.Streams.Implementation.Fusing
public void SetValue(T value) { }
public override string ToString() { }
}
+ public sealed class NotYetThereSentinel : System.Exception
+ {
+ public static readonly Akka.Streams.Implementation.Fusing.NotYetThereSentinel Instance;
+ public NotYetThereSentinel() { }
+ }
[Akka.Annotations.InternalApiAttribute()]
public sealed class OnCompleted : Akka.Streams.Stage.GraphStage>
{
@@ -4443,6 +4472,16 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public sealed class SelectValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public readonly Akka.Streams.Inlet In;
+ public readonly Akka.Streams.Outlet Out;
+ public SelectValueTaskAsync(int parallelism, System.Func> mapFunc) { }
+ protected override Akka.Streams.Attributes InitialAttributes { get; }
+ public override Akka.Streams.FlowShape Shape { get; }
+ protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
+ }
+ [Akka.Annotations.InternalApiAttribute()]
public sealed class Select : Akka.Streams.Stage.GraphStage>
{
public Select(System.Func func) { }
@@ -4501,6 +4540,19 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
+ [System.Runtime.CompilerServices.IsReadOnlyAttribute()]
+ public struct SlimResult
+ {
+ public readonly System.Exception Error;
+ public static readonly Akka.Streams.Implementation.Fusing.SlimResult NotYetReady;
+ public readonly T Result;
+ public SlimResult(System.Exception errorOrSentinel, T result) { }
+ public static Akka.Streams.Implementation.Fusing.SlimResult ForError(System.Exception errorOrSentinel) { }
+ public static Akka.Streams.Implementation.Fusing.SlimResult ForSuccess(T result) { }
+ public static Akka.Streams.Implementation.Fusing.SlimResult FromTask(System.Threading.Tasks.Task task) { }
+ public bool IsDone() { }
+ public bool IsSuccess() { }
+ }
[Akka.Annotations.InternalApiAttribute()]
public sealed class StatefulSelectMany : Akka.Streams.Stage.GraphStage>
{
@@ -4705,6 +4757,8 @@ namespace Akka.Streams.Implementation.Stages
public static readonly Akka.Streams.Attributes UnfoldInf;
public static readonly Akka.Streams.Attributes UnfoldResourceSource;
public static readonly Akka.Streams.Attributes UnfoldResourceSourceAsync;
+ public static readonly Akka.Streams.Attributes UnfoldResourceSourceValueTaskAsync;
+ public static readonly Akka.Streams.Attributes UnfoldValueTaskAsync;
public static readonly Akka.Streams.Attributes Unzip;
public static readonly Akka.Streams.Attributes Watch;
public static readonly Akka.Streams.Attributes Where;
diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
index 107bc594f50..c04531d75eb 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
@@ -2043,6 +2043,9 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source UnfoldInfinite(TState state, System.Func> unfold) { }
public static Akka.Streams.Dsl.Source UnfoldResource(System.Func create, System.Func> read, System.Action close) { }
public static Akka.Streams.Dsl.Source UnfoldResourceAsync(System.Func> create, System.Func>> read, System.Func> close) { }
+ public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(System.Func> create, System.Func>> read, System.Func close) { }
+ public static Akka.Streams.Dsl.Source UnfoldResourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> read, System.Func close) { }
+ public static Akka.Streams.Dsl.Source UnfoldValueTaskAsync(TState state, System.Func>>> unfoldAsync) { }
public static Akka.Streams.Dsl.Source, Akka.NotUsed> ZipN(System.Collections.Generic.IEnumerable> sources) { }
public static Akka.Streams.Dsl.Source ZipWithN(System.Func, TOut2> zipper, System.Collections.Generic.IEnumerable> sources) { }
}
@@ -2125,6 +2128,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source SelectAsyncUnordered(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { }
public static Akka.Streams.Dsl.Source SelectError(this Akka.Streams.Dsl.Source flow, System.Func selector) { }
public static Akka.Streams.Dsl.Source SelectMany(this Akka.Streams.Dsl.Source flow, System.Func> mapConcater) { }
+ public static Akka.Streams.Dsl.Source SelectValueTaskAsync(this Akka.Streams.Dsl.Source flow, int parallelism, System.Func> asyncMapper) { }
public static Akka.Streams.Dsl.Source Skip(this Akka.Streams.Dsl.Source flow, long n) { }
public static Akka.Streams.Dsl.Source SkipWhile(this Akka.Streams.Dsl.Source flow, System.Predicate predicate) { }
public static Akka.Streams.Dsl.Source SkipWithin(this Akka.Streams.Dsl.Source flow, System.TimeSpan duration) { }
@@ -3866,6 +3870,16 @@ namespace Akka.Streams.Implementation
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public sealed class UnfoldResourceSourceValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public UnfoldResourceSourceValueTaskAsync(TCreateState createState, System.Func> create, System.Func>> readData, System.Func close) { }
+ protected override Akka.Streams.Attributes InitialAttributes { get; }
+ public Akka.Streams.Outlet Out { get; }
+ public override Akka.Streams.SourceShape Shape { get; }
+ protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
+ public override string ToString() { }
+ }
+ [Akka.Annotations.InternalApiAttribute()]
public sealed class UnfoldResourceSource : Akka.Streams.Stage.GraphStage>
{
public UnfoldResourceSource(System.Func create, System.Func> readData, System.Action close) { }
@@ -3876,6 +3890,16 @@ namespace Akka.Streams.Implementation
public override string ToString() { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public class UnfoldValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public readonly Akka.Streams.Outlet Out;
+ public readonly TState State;
+ public readonly System.Func>>> UnfoldFunc;
+ public UnfoldValueTaskAsync(TState state, System.Func>>> unfoldFunc) { }
+ public override Akka.Streams.SourceShape Shape { get; }
+ protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
+ }
+ [Akka.Annotations.InternalApiAttribute()]
public class Unfold : Akka.Streams.Stage.GraphStage>
{
public readonly Akka.Streams.Outlet Out;
@@ -4343,6 +4367,11 @@ namespace Akka.Streams.Implementation.Fusing
public void SetValue(T value) { }
public override string ToString() { }
}
+ public sealed class NotYetThereSentinel : System.Exception
+ {
+ public static readonly Akka.Streams.Implementation.Fusing.NotYetThereSentinel Instance;
+ public NotYetThereSentinel() { }
+ }
[Akka.Annotations.InternalApiAttribute()]
public sealed class OnCompleted : Akka.Streams.Stage.GraphStage>
{
@@ -4417,6 +4446,16 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
[Akka.Annotations.InternalApiAttribute()]
+ public sealed class SelectValueTaskAsync : Akka.Streams.Stage.GraphStage>
+ {
+ public readonly Akka.Streams.Inlet