From 7ebf09c67b5258b54040513073ad69a7fdbd06e5 Mon Sep 17 00:00:00 2001 From: neuecc Date: Tue, 20 Aug 2024 15:47:44 +0900 Subject: [PATCH] Add SingleAssignmentSubject --- src/R3/SingleAssignmentSubject.cs | 176 ++++++++++++++++++++++++++++++ tests/R3.Tests/SubjectTest.cs | 112 +++++++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 src/R3/SingleAssignmentSubject.cs diff --git a/src/R3/SingleAssignmentSubject.cs b/src/R3/SingleAssignmentSubject.cs new file mode 100644 index 00000000..8a0f34ab --- /dev/null +++ b/src/R3/SingleAssignmentSubject.cs @@ -0,0 +1,176 @@ +namespace R3; + +public sealed class SingleAssignmentSubject : Observable, ISubject, IDisposable +{ + Observer? singleObserver; + Result completed; + + public bool IsDisposed => singleObserver == DisposedObserver.Instance; + + public void OnNext(T value) + { + var observer = singleObserver; + if (observer == CompletedObserver.Instance || observer == null) + { + // do nothing + } + else if (observer == DisposedObserver.Instance) + { + ThrowAlreadyDisposed(); + } + else + { + observer.OnNext(value); + } + } + + public void OnErrorResume(Exception error) + { + var observer = singleObserver; + if (observer == CompletedObserver.Instance || observer == null) + { + // do nothing + } + else if (observer == DisposedObserver.Instance) + { + ThrowAlreadyDisposed(); + } + else + { + observer.OnErrorResume(error); + } + } + + public void OnCompleted(Result complete) + { + while (true) + { + var observer = Volatile.Read(ref singleObserver); + if (observer == CompletedObserver.Instance) + { + // do nothing + return; + } + else if (observer == DisposedObserver.Instance) + { + ThrowAlreadyDisposed(); + return; + } + else + { + this.completed = complete; + if (Interlocked.CompareExchange(ref singleObserver, CompletedObserver.Instance, observer) == observer) + { + observer?.OnCompleted(complete); + return; + } + } + } + } + + protected override IDisposable SubscribeCore(Observer observer) + { + var field = Interlocked.CompareExchange(ref singleObserver, observer, null); + if (field == null) + { + // ok to set. + return new Subscription(this); + } + + if (field == DisposedObserver.Instance) + { + ThrowAlreadyDisposed(); + } + else if (field == CompletedObserver.Instance) + { + observer.OnCompleted(completed); + } + else + { + ThrowAlreadyAssignment(); + } + return Disposable.Empty; + } + + public void Dispose() + { + Dispose(true); + } + + public void Dispose(bool callOnCompleted) + { + var observer = Interlocked.Exchange(ref singleObserver, DisposedObserver.Instance); + if (observer != DisposedObserver.Instance && observer != null && callOnCompleted) + { + observer.OnCompleted(); + } + } + + static void ThrowAlreadyAssignment() + { + throw new InvalidOperationException("Observer is already assigned."); + } + + void ThrowAlreadyDisposed() + { + throw new ObjectDisposedException(""); + } + + class Subscription(SingleAssignmentSubject parent) : IDisposable + { + public void Dispose() + { + while (true) + { + var observer = Volatile.Read(ref parent.singleObserver); + if (observer == CompletedObserver.Instance || observer == DisposedObserver.Instance || observer == null) + { + // do nothing + return; + } + else + { + // reset to null(allow multiple assignment after first subscription is disposed) + if (Interlocked.CompareExchange(ref parent.singleObserver, null, observer) == observer) + { + return; + } + } + } + } + } + + sealed class CompletedObserver : Observer + { + public static readonly CompletedObserver Instance = new(); + + protected override void OnCompletedCore(Result result) + { + } + + protected override void OnErrorResumeCore(Exception error) + { + } + + protected override void OnNextCore(T value) + { + } + } + + sealed class DisposedObserver : Observer + { + public static readonly DisposedObserver Instance = new(); + + protected override void OnCompletedCore(Result result) + { + } + + protected override void OnErrorResumeCore(Exception error) + { + } + + protected override void OnNextCore(T value) + { + } + } +} diff --git a/tests/R3.Tests/SubjectTest.cs b/tests/R3.Tests/SubjectTest.cs index a7f29cd1..838589e4 100644 --- a/tests/R3.Tests/SubjectTest.cs +++ b/tests/R3.Tests/SubjectTest.cs @@ -95,4 +95,116 @@ public void SubscribeAfterCompleted() l.Result.Exception!.Message.Should().Be("foo"); } } + + [Fact] + public void SingleAssignment() + { + // normal + { + var s = new SingleAssignmentSubject(); + using var l = s.ToLiveList(); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + l.Should().Equal(1, 2, 3); + + s.OnCompleted(); + l.AssertIsCompleted(); + } + // subscribe twice + { + var s = new SingleAssignmentSubject(); + using var l = s.ToLiveList(); + Assert.Throws(() => s.Subscribe()); + } + + // subject test copy + + // Dispose(not yet completed) + { + var s = new SingleAssignmentSubject(); + using var l = s.ToLiveList(); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.Dispose(); + + l.AssertEqual([1, 2, 3]); + l.AssertIsCompleted(); + s.IsDisposed.Should().BeTrue(); + } + + // already OnCompleted(Success), Dispose + { + var s = new SingleAssignmentSubject(); + using var l = s.ToLiveList(); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnCompleted(); + s.Dispose(); + + l.AssertEqual([1, 2, 3]); + l.AssertIsCompleted(); + s.IsDisposed.Should().BeTrue(); + } + + // already OnCompleted(Failure), Dispose + { + var s = new SingleAssignmentSubject(); + using var l = s.ToLiveList(); + s.OnNext(1); + s.OnNext(2); + s.OnNext(3); + s.OnCompleted(new Exception("foo")); + s.Dispose(); + + l.AssertEqual([1, 2, 3]); + l.AssertIsCompleted(); + s.IsDisposed.Should().BeTrue(); + } + + + // already Disposed, call OnNext + { + var s = new SingleAssignmentSubject(); + s.Dispose(); + Assert.Throws(() => s.OnNext(1)); + } + // already Disposed, call OnError + { + var s = new SingleAssignmentSubject(); + s.Dispose(); + Assert.Throws(() => s.OnErrorResume(new Exception())); + } + // already Disposed, call OnCompleted + { + var s = new SingleAssignmentSubject(); + s.Dispose(); + Assert.Throws(() => s.OnCompleted()); + } + + + { + // after Success + var s = new SingleAssignmentSubject(); + s.OnCompleted(); + + using var l = s.ToLiveList(); + + l.AssertIsCompleted(); + l.Result.IsSuccess.Should().BeTrue(); + } + { + // after Failure + var s = new SingleAssignmentSubject(); + s.OnCompleted(new Exception("foo")); + + using var l = s.ToLiveList(); + + l.AssertIsCompleted(); + l.Result.IsFailure.Should().BeTrue(); + l.Result.Exception!.Message.Should().Be("foo"); + } + } }