From 0471a582ad47d17c5590ab82e8fe6077f20e4953 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Tue, 20 Feb 2024 20:47:18 -0500 Subject: [PATCH] Consolidate and simplify pooling for simple cases. Probably many opportunities still... --- .../Streams/UnfoldAsyncBenchmarks.cs | 191 ++++++++++++++++++ src/core/Akka.Streams/Dsl/Source.cs | 3 + .../Akka.Streams/Implementation/Fusing/Ops.cs | 152 ++++---------- .../Implementation/Fusing/SlimResult.cs | 49 +++++ .../PooledAwaitOutGraphStageLogic.cs | 92 +++++++++ .../Akka.Streams/Implementation/Sources.cs | 38 ++-- .../Implementation/Stages/Stages.cs | 1 + .../Akka.Streams/Implementation/Unfold.cs | 76 ++++--- .../Implementation/UnsafeSlimHolder.cs | 54 +++++ 9 files changed, 473 insertions(+), 183 deletions(-) create mode 100644 src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs create mode 100644 src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs create mode 100644 src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs create mode 100644 src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs diff --git a/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs new file mode 100644 index 00000000000..3015395ad16 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Streams/UnfoldAsyncBenchmarks.cs @@ -0,0 +1,191 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Threading.Channels; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Streams; +using Akka.Streams.Dsl; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.Streams; + +[Config(typeof(MicroBenchmarkConfig))] +public class UnfoldAsyncBenchmarks +{ + public struct IntOrCompletion + { + public readonly int IntValue; + public readonly TaskCompletionSource? Completion; + + public IntOrCompletion(int intValue, TaskCompletionSource? completion) + { + IntValue = intValue; + Completion = completion; + } + } + private ActorSystem system; + private ActorMaterializer materializer; + + private IRunnableGraph simpleGraph; + private Task selectAsyncStub; + private Channel asyncNoYieldCh; + private Task selectValueTaskAsyncStub; + private Channel vtAsyncCh; + private Task unfoldAsyncSyncStub; + private Task selectAsyncValueTaskSyncStub; + private Channel asyncYieldCh; + private Channel vtAsyncYieldCh; + + [GlobalSetup] + public void Setup() + { + system = ActorSystem.Create("system"); + materializer = system.Materializer(); + asyncNoYieldCh = Channel.CreateUnbounded(); + + asyncYieldCh = Channel.CreateUnbounded(); + + vtAsyncYieldCh = Channel.CreateUnbounded(); + + unfoldAsyncSyncStub = Source.UnfoldAsync,int>(asyncYieldCh.Reader, async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, i.IntValue); + } + }) + .RunWith(Sink.Ignore(), materializer); + + selectAsyncValueTaskSyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncYieldCh.Reader, async r => + { + var i = await r.ReadAsync(); + if (i.Completion != null) + { + i.Completion.TrySetResult(); + return (r, -1); + } + else + { + return (r, i.IntValue); + } + }) + .RunWith(Sink.Ignore(), materializer); + selectAsyncStub = Source.UnfoldAsync,int>(asyncNoYieldCh.Reader,async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return (r, -1); + } + else + { + //await Task.Yield(); + // await Task.Delay(0); + return (r, a.IntValue); + } + }).RunWith(Sink.Ignore(), materializer); + vtAsyncCh = Channel.CreateUnbounded(); + int vta = 0; + selectValueTaskAsyncStub = Source.UnfoldValueTaskAsync,int>(vtAsyncCh.Reader,async r => + { + await Task.Yield(); + var a = await r.ReadAsync(); + if (a.Completion != null) + { + a.Completion.TrySetResult(); + return (r, -1); + } + else + { + //await Task.Yield(); + //await Task.Delay(0); + return (r, a.IntValue); + } + }).RunWith(Sink.Ignore(), materializer); + } + + [GlobalCleanup] + public void Cleanup() + { + materializer.Dispose(); + system.Dispose(); + } + + [Benchmark] + public async Task UnfoldAsyncNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldValueTaskAsyncNoYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null)); + } + + vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + [Benchmark] + public async Task UnfoldAsyncWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Delay(1); + } + + asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } + + + [Benchmark] + public async Task UnfoldValueTaskAsyncWithYield() + { + var completion = new TaskCompletionSource(TaskCreationOptions + .RunContinuationsAsynchronously); + for (int i = 0; i < 100; i++) + { + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null)); + await Task.Delay(1); + } + + vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion)); + await completion.Task; + + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Source.cs b/src/core/Akka.Streams/Dsl/Source.cs index 8462f97af7f..b0802128bfc 100644 --- a/src/core/Akka.Streams/Dsl/Source.cs +++ b/src/core/Akka.Streams/Dsl/Source.cs @@ -789,6 +789,9 @@ public static Source Unfold(TState state, Func UnfoldAsync(TState state, Func>> unfoldAsync) => FromGraph(new UnfoldAsync(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldAsync); + + public static Source UnfoldValueTaskAsync(TState state, Func>> unfoldAsync) + => FromGraph(new UnfoldValueTaskAsync(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldValueTaskAsync); /// /// Simpler , for infinite sequences. /// diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index ce464c2aa5c..e4392ed8737 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -9,6 +9,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; @@ -2521,43 +2522,6 @@ public sealed class NotYetThereSentinel : Exception public static readonly NotYetThereSentinel Instance = new(); } - public readonly struct SlimResult - { - public readonly Exception Error; - public readonly T Result; - - public static readonly SlimResult NotYetReady = - new SlimResult(NotYetThereSentinel.Instance, default); - - public static SlimResult FromTask(Task task) - { - return task.IsCanceled || task.IsFaulted - ? new SlimResult(task.Exception, default) - : new SlimResult(default, task.Result); - } - public SlimResult(Exception errorOrSentinel, T result) - { - if (result == null) - { - Error = errorOrSentinel ?? ReactiveStreamsCompliance - .ElementMustNotBeNullException; - } - else - { - Result = result; - } - } - - public bool IsSuccess() - { - return Error == null; - } - - public bool IsDone() - { - return Error != NotYetThereSentinel.Instance; - } - } /// /// INTERNAL API /// @@ -2576,45 +2540,8 @@ private sealed class Holder public SlimResult Element { get; private set; } private readonly Action> _callback; private ValueTask _pending; - - /*private static readonly Action OnCompletedAction = - CompletionActionVt; - - private static readonly Action, object> - TaskCompletedAction = (Task t, object o) => - { - var ca = (Holder)o; - if (t.IsFaulted) - { - var exception = - t.Exception?.InnerExceptions != null && - t.Exception.InnerExceptions.Count == 1 - ? t.Exception.InnerExceptions[0] - : t.Exception; - ca.Invoke(Result.Failure(exception)); - } - else - { - ca.Invoke(Result.Success(t.Result)); - } - }; - */ - private readonly Action TaskCompletedAction; - /* - private static void CompletionActionVt(object discard) - { - var inst = (Holder)discard; - var vtCapture = inst._pending; - inst._pending = default; - if (vtCapture.IsCompletedSuccessfully) - { - inst.Invoke(Result.Success(vtCapture.Result)); - } - else if (vtCapture.IsCanceled == false) - { - inst.VTCompletionError(vtCapture); - } - }*/ + private readonly Action _taskCompletedAction; + private void VTCompletionError(ValueTask vtCapture) { @@ -2629,17 +2556,13 @@ private void VTCompletionError(ValueTask vtCapture) Invoke(new SlimResult(exception,default)); } - else - { - //Console.WriteLine("Unexpected condition, CompletionError without faulted!!!!"); - } } public Holder(SlimResult element, Action> callback) { _callback = callback; Element = element; - TaskCompletedAction = () => + _taskCompletedAction = () => { var inst = this._pending; this._pending = default; @@ -2663,7 +2586,7 @@ public void SetContinuation(ValueTask vt) { _pending = vt; vt.ConfigureAwait(true).GetAwaiter() - .OnCompleted(TaskCompletedAction); + .OnCompleted(_taskCompletedAction); } public void Invoke(SlimResult result) @@ -4201,7 +4124,7 @@ public sealed class AsyncEnumerable : GraphStage> { #region internal classes - private sealed class Logic : OutGraphStageLogic + private sealed class Logic : PooledAwaitOutGraphStageLogic { private readonly IAsyncEnumerable _enumerable; private readonly Outlet _outlet; @@ -4217,12 +4140,30 @@ public Logic(SourceShape shape, IAsyncEnumerable enumerable) : base(shape) _enumerable = enumerable; _outlet = shape.Outlet; - _onSuccess = GetAsyncCallback(OnSuccess); - _onFailure = GetAsyncCallback(OnFailure); - _onComplete = GetAsyncCallback(OnComplete); + SetPooledCompletionCallback(OnResult); _completionCts = new CancellationTokenSource(); SetHandler(_outlet, this); } + + private void OnResult(SlimResult obj) + { + if (obj.IsSuccess()) + { + if (obj.Result) + { + OnSuccess(_enumerator.Current); + } + else + { + OnComplete(); + } + } + else + { + OnFailure(obj.Error); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void OnComplete() => CompleteStage(); @@ -4282,46 +4223,21 @@ async Task DisposeEnumeratorAsync() public override void OnPull() { - var vtask = _enumerator.MoveNextAsync(); - if (vtask.IsCompletedSuccessfully) + try { - // When MoveNextAsync returned immediately, we don't need to await. - // We can use fast path instead. - if (vtask.Result) + var vtask = _enumerator.MoveNextAsync(); + if (vtask.IsCompletedSuccessfully) { - // if result is true, it means we got an element. Push it downstream. - Push(_outlet, _enumerator.Current); + OnResult(new SlimResult(default,vtask.Result)); } else { - // if result is false, it means enumerator was closed. Complete stage in that case. - CompleteStage(); + SetContinuation(vtask,false); } } - else + catch (Exception e) { - //We immediately fall into wait case. - //Unlike Task, we don't have a 'status' Enum to switch off easily, - //And Error cases can just live with the small cost of async callback. - async Task ProcessTask() - { - // Since this Action is used as task continuation, we cannot safely call corresponding - // OnSuccess/OnFailure/OnComplete methods directly. We need to do that via async callbacks. - try - { - var completed = await vtask.ConfigureAwait(false); - if (completed) - _onSuccess(_enumerator.Current); - else - _onComplete(); - } - catch (Exception ex) - { - _onFailure(ex); - } - } - - _ = ProcessTask(); + OnResult(new SlimResult(e, default)); } } diff --git a/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs b/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs new file mode 100644 index 00000000000..01bd77110b5 --- /dev/null +++ b/src/core/Akka.Streams/Implementation/Fusing/SlimResult.cs @@ -0,0 +1,49 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; + +namespace Akka.Streams.Implementation.Fusing; + +public readonly struct SlimResult +{ + public readonly Exception Error; + public readonly T Result; + + public static readonly SlimResult NotYetReady = + new SlimResult(NotYetThereSentinel.Instance, default); + + public static SlimResult FromTask(Task task) + { + return task.IsCanceled || task.IsFaulted + ? new SlimResult(task.Exception, default) + : new SlimResult(default, task.Result); + } + public SlimResult(Exception errorOrSentinel, T result) + { + if (result == null) + { + Error = errorOrSentinel ?? ReactiveStreamsCompliance + .ElementMustNotBeNullException; + } + else + { + Result = result; + } + } + + public bool IsSuccess() + { + return Error == null; + } + + public bool IsDone() + { + return Error != NotYetThereSentinel.Instance; + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs b/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs new file mode 100644 index 00000000000..87f2fcd381d --- /dev/null +++ b/src/core/Akka.Streams/Implementation/PooledAwaitOutGraphStageLogic.cs @@ -0,0 +1,92 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Implementation.Fusing; +using Akka.Streams.Stage; + +namespace Akka.Streams.Implementation; + +/// +/// Semi-unsafe Helper intermediate for +/// that allows for a ValueTask Wait to be pooled. +/// +/// Inheritors are expected to utilize the +/// and call `base.PreStart()` in their `PreStart` conditions. +/// +/// Additionally, if inheritors have their own 'restart' logic, +/// They should utilize the `ResetHolder()` method, +/// to avoid callback clashes. +/// +/// +/// +internal abstract class PooledAwaitOutGraphStageLogic : OutGraphStageLogic +{ + private UnsafeSlimHolder _unsafeSlimHolder; + protected Action> _completedCallback; + protected PooledAwaitOutGraphStageLogic(Shape shape) : base(shape) + { + _completedCallback = GetAsyncCallback>(t => + { + FailStage(new Exception("Callback was not set!")); + }); + } + + protected void SetPooledCompletionCallback(Action> completedCallback) + { + if (_completedCallback == null) + { + throw new ArgumentNullException( + nameof(completedCallback)); + } + _completedCallback = GetAsyncCallback(completedCallback); + } + + public override void PreStart() + { + ResetHolder(); + } + + /// + /// Sets a ValueTask to wire up the callback, + /// set via . + /// If has not been called, + /// The continuation will fail the stage! + /// + /// + protected void SetContinuation(ValueTask valueTask, bool configureAwait = true) + { + _unsafeSlimHolder.SetContinuation(valueTask, configureAwait); + } + + /// + /// Use at own risk! + /// + protected void SetHolder(UnsafeSlimHolder holder) + { + Interlocked.Exchange(ref _unsafeSlimHolder, holder); + } + + public void ResetHolder() + { + Interlocked.Exchange(ref _unsafeSlimHolder, new UnsafeSlimHolder(this)); + } + + internal void RunIfSame(UnsafeSlimHolder unsafeSlimHolder, ValueTask vt) + { + //We are explicitly using referenceEquals here, + //since we are explicitly resetting things. + if (object.ReferenceEquals(_unsafeSlimHolder, unsafeSlimHolder)) + { + _completedCallback(vt.IsCompletedSuccessfully + ? new SlimResult(default, vt.Result) + : SlimResult.FromTask(vt.AsTask())); + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Implementation/Sources.cs b/src/core/Akka.Streams/Implementation/Sources.cs index a19abe2c7e2..486abb54eb4 100644 --- a/src/core/Akka.Streams/Implementation/Sources.cs +++ b/src/core/Akka.Streams/Implementation/Sources.cs @@ -774,14 +774,12 @@ public sealed class UnfoldResourceSourceValueTaskAsync> { private readonly UnfoldResourceSourceValueTaskAsync _stage; private readonly Lazy _decider; private Option _state = Option.None; - private ValueTask> _currentReadVt; - private readonly Action _valueTaskAwaiterOnCompleteAction; public Logic(UnfoldResourceSourceValueTaskAsync stage, Attributes inheritedAttributes) : base(stage.Shape) { @@ -791,7 +789,7 @@ public Logic(UnfoldResourceSourceValueTaskAsync sta var strategy = inheritedAttributes.GetAttribute(null); return strategy != null ? strategy.Decider : Deciders.StoppingDecider; }); - _valueTaskAwaiterOnCompleteAction = SelfReadCallback; + SetPooledCompletionCallback(Handler); SetHandler(_stage.Out, this); } @@ -831,21 +829,7 @@ private void ErrorHandler(Exception ex) } } - - private void SelfReadCallback() - { - var swap = _currentReadVt; - _currentReadVt = default; - if (swap.IsCompletedSuccessfully) - { - ReadCallback(new SlimResult>(default,swap.Result)); - } - else - { - ReadCallback(SlimResult>.FromTask(swap.AsTask())); - } - } - private Action>> ReadCallback => GetAsyncCallback>>(read => + private void Handler(SlimResult> read) { if (read.IsSuccess()) { @@ -869,8 +853,9 @@ private void SelfReadCallback() } } } - else ErrorHandler(read.Error); - }); + else + ErrorHandler(read.Error); + } private void CloseResource() { @@ -884,7 +869,11 @@ private void CloseResource() _state = Option.None; } - public override void PreStart() => CreateResource(); + public override void PreStart() + { + CreateResource(); + base.PreStart(); + } public override void OnPull() { @@ -908,9 +897,7 @@ public override void OnPull() } else { - _currentReadVt = vt; - _currentReadVt.GetAwaiter().OnCompleted(_valueTaskAwaiterOnCompleteAction); - //_pooledContinuation.AttachAwaiter(vt); + SetContinuation(vt); } @@ -986,6 +973,7 @@ private void CreateResource() /// /// TBD /// + /// /// TBD /// TBD /// TBD diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index b65f56a8c70..0227ad62b10 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -257,6 +257,7 @@ public static class DefaultAttributes /// TBD /// public static readonly Attributes UnfoldAsync = Attributes.CreateName("unfoldAsync"); + public static readonly Attributes UnfoldValueTaskAsync = Attributes.CreateName("unfoldValueTaskAsync"); /// /// TBD /// diff --git a/src/core/Akka.Streams/Implementation/Unfold.cs b/src/core/Akka.Streams/Implementation/Unfold.cs index 488aeafd34f..fbbc347fe2a 100644 --- a/src/core/Akka.Streams/Implementation/Unfold.cs +++ b/src/core/Akka.Streams/Implementation/Unfold.cs @@ -91,7 +91,7 @@ public Unfold(TState state, Func> unfoldFunc) protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); } - /// + /// /// INTERNAL API /// /// TBD @@ -100,69 +100,65 @@ public Unfold(TState state, Func> unfoldFunc) public class UnfoldValueTaskAsync : GraphStage> { #region stage logic - private sealed class Logic : OutGraphStageLogic + private sealed class Logic : PooledAwaitOutGraphStageLogic> { private readonly UnfoldValueTaskAsync _stage; private TState _state; - private Action>> _asyncHandler; - private ValueTask> _currentTask; + public Logic(UnfoldValueTaskAsync stage) : base(stage.Shape) { + _stage = stage; _state = _stage.State; - + SetPooledCompletionCallback(SyncResult); SetHandler(_stage.Out, this); } public override void OnPull() { - var vt = _stage.UnfoldFunc(_state); - if (vt.IsCompletedSuccessfully) - { - _asyncHandler( - new SlimResult>(default, - vt.Result)); - } - else + ValueTask> vt; + bool taken = false; + try { - _currentTask = vt; - vt.GetAwaiter().OnCompleted(CompletionAction); + vt = _stage.UnfoldFunc(_state); + taken = true; } - } - private void CompletionAction() - { - if (_currentTask.IsCompletedSuccessfully) + catch (Exception e) { - _asyncHandler.Invoke( - new SlimResult>(default, - _currentTask.Result)); + vt = default; + Fail(_stage.Out,e); } - else + + if (taken) { - _asyncHandler.Invoke( - SlimResult>.FromTask( - _currentTask.AsTask())); + if (vt.IsCompletedSuccessfully) + { + SyncResult( + new SlimResult>(default, + vt.Result)); + } + else + { + SetContinuation(vt); + } } } - public override void PreStart() + + private void SyncResult(SlimResult> result) { - var ac = GetAsyncCallback>>(result => + if (!result.IsSuccess()) + Fail(_stage.Out, result.Error); + else { - if (!result.IsSuccess()) - Fail(_stage.Out, result.Error); + var option = result.Result; + if (!option.HasValue) + Complete(_stage.Out); else { - var option = result.Result; - if (!option.HasValue) - Complete(_stage.Out); - else - { - Push(_stage.Out, option.Value.Item2); - _state = option.Value.Item1; - } + Push(_stage.Out, option.Value.Item2); + _state = option.Value.Item1; } - }); - _asyncHandler = ac; + } } } #endregion diff --git a/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs b/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs new file mode 100644 index 00000000000..9a5959ec347 --- /dev/null +++ b/src/core/Akka.Streams/Implementation/UnsafeSlimHolder.cs @@ -0,0 +1,54 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2024 Lightbend Inc. +// // Copyright (C) 2013-2024 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; + +namespace Akka.Streams.Implementation; + +/// +/// Used for Pooling ValueTasks by +/// +/// +/// This is intentionally only guarded for the sake of avoiding Exceptions, +/// As it is expected that implementations are within Akka Streams Stages +/// (which have their own safety guarantees) and geared towards performance. +/// +internal class UnsafeSlimHolder +{ + private readonly PooledAwaitOutGraphStageLogic _parent; + private ValueTask _vt; + private readonly Action _continuation; + public UnsafeSlimHolder(PooledAwaitOutGraphStageLogic logic) + { + _parent = logic; + _continuation = ContinuationAction; + } + + public void SetContinuation(ValueTask vt, bool configureAwait = true) + { + if (configureAwait) + { + _vt = vt; + } + else + { + var whyMSFTwhy = _vt.ConfigureAwait(false); + _vt = Unsafe.As, ValueTask>( + ref whyMSFTwhy); + } + _vt.GetAwaiter().OnCompleted(_continuation); + } + private void ContinuationAction() + { + var vt = _vt; + _vt = default; + _parent.RunIfSame(this, vt); + } +} \ No newline at end of file