diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index d61ec4cf92..c0d63d742b 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -206,12 +206,18 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu */ private static final Object REQUESTED_ONE = new Object(); /** - * If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)}. + * If more than one item was {@link #request(long) requested} before {@link #onSuccess(Object)} or while its + * result is delivering to the target. */ private static final Object REQUESTED_MORE = new Object(); /** * If only one item was {@link #request(long) requested} and {@link #onSuccess(Object)} invoked. */ + private static final Object SINGLE_DELIVERING = new Object(); + /** + * If only one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and its result was + * delivered to the target. + */ private static final Object SINGLE_DELIVERED = new Object(); /** * If more than one item was {@link #request(long) requested}, {@link #onSuccess(Object)} invoked, and we @@ -227,6 +233,7 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu public void onSuccess(@Nullable final T result) { for (;;) { final Object oldValue = mayBeResult; + assert oldValue != SINGLE_DELIVERING; assert oldValue != SINGLE_DELIVERED; assert oldValue != PUBLISHER_SUBSCRIBED; @@ -237,8 +244,8 @@ public void onSuccess(@Nullable final T result) { break; } } else if (oldValue == REQUESTED_ONE) { - if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERED)) { - tryEmitSingleSuccessToTarget(result); + if (mayBeResultUpdater.compareAndSet(this, oldValue, SINGLE_DELIVERING)) { + emitSingleSuccessToTarget(result); break; } } else if (oldValue == REQUESTED_MORE && @@ -280,7 +287,7 @@ public void request(long n) { break; } } - } else if (oldVal == REQUESTED_ONE) { + } else if (oldVal == REQUESTED_ONE || oldVal == SINGLE_DELIVERING) { if (mayBeResultUpdater.compareAndSet(this, oldVal, REQUESTED_MORE)) { super.request(n); break; @@ -301,13 +308,26 @@ public void request(long n) { } break; } - } else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERED)) { + } else if (mayBeResultUpdater.compareAndSet(this, oldVal, SINGLE_DELIVERING)) { @SuppressWarnings("unchecked") final T tVal = (T) oldVal; - tryEmitSingleSuccessToTarget(tVal); + emitSingleSuccessToTarget(tVal); break; } } } + + private void emitSingleSuccessToTarget(@Nullable final T result) { + if (tryEmitSingleSuccessToTarget(result)) { + if (mayBeResultUpdater.compareAndSet(this, SINGLE_DELIVERING, SINGLE_DELIVERED)) { + // state didn't change, we are done + } else if (mayBeResultUpdater.compareAndSet(this, REQUESTED_MORE, PUBLISHER_SUBSCRIBED)) { + // more demand appeared while we were delivering the single result + next.subscribeInternal(this); + } else { + assert mayBeResult == CANCELLED; + } + } + } } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribeTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribeTckTest.java index 5a48ee86f3..71f69bd119 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribeTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/SingleConcatWithPublisherDeferSubscribeTckTest.java @@ -24,9 +24,4 @@ public class SingleConcatWithPublisherDeferSubscribeTckTest extends SingleConcat boolean deferSubscribe() { return true; } - - @Override - public long boundedDepthOfOnNextAndRequestRecursion() { - return 2; - } }