From b2a7611907e1361562c978f9a2d356dff4f3ef1c Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 9 Aug 2024 12:16:03 -0600 Subject: [PATCH 01/12] Add some comments --- .../servicetalk/http/utils/BeforeFinallyHttpOperator.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index c3bfe759df..255f6af726 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -147,6 +147,13 @@ public void onError(final Throwable t) { public void onSubscribe(final Cancellable cancellable) { subscriber.onSubscribe(() -> { try { + // TODO: We may also need to complete the `beforeFinally()` if we're in the PROCESSING state. + // and also audit something else? + // Investigate if rdar://105295860 (BeforeFinallyHttpOperator: callbacks will be invoked more + // than once if users re-subscribe to response payload body) could be another source of + // misaligned limiter counters. Note that it does not affect iCloud because we guarantee we + // never re-subscribe inside http-client-servicetalk, but could be an issue for other users + // of ServiceTalk. if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) { beforeFinally.cancel(); } From ff57352152284172797b63796bb4ff2a0bdef1eb Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 13 Aug 2024 09:50:09 -0600 Subject: [PATCH 02/12] This might be a start, but still work to do --- .../http/utils/BeforeFinallyHttpOperator.java | 320 ++++++------------ .../utils/BeforeFinallyHttpOperatorTest.java | 26 +- 2 files changed, 124 insertions(+), 222 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index 255f6af726..b864740b82 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -24,17 +24,19 @@ import io.servicetalk.concurrent.api.SingleOperator; import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; +import io.servicetalk.concurrent.internal.DuplicateSubscribeException; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; /** * Helper operator for signaling the end of an HTTP Request/Response cycle. @@ -59,6 +61,8 @@ * .beforeOnSubscribe(__ -> tracker.requestStarted()) * .liftSync(new BeforeFinallyHttpOperator(tracker)); * } + * + * We really want two things: to emit signals and to swallow all events after cancellation if so configured. */ public final class BeforeFinallyHttpOperator implements SingleOperator { @@ -95,7 +99,7 @@ public BeforeFinallyHttpOperator(final Runnable beforeFinally) { * cancellation. */ public BeforeFinallyHttpOperator(final TerminalSignalConsumer beforeFinally, boolean discardEventsAfterCancel) { - this.beforeFinally = requireNonNull(beforeFinally); + this.beforeFinally = new OnceTerminalSignalConsumer(requireNonNull(beforeFinally)); this.discardEventsAfterCancel = discardEventsAfterCancel; } @@ -110,11 +114,14 @@ public SingleSource.Subscriber apply( } private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber { - private static final int IDLE = 0; - private static final int PROCESSING_PAYLOAD = 1; - private static final int TERMINATED = -1; - private static final AtomicIntegerFieldUpdater stateUpdater = - newUpdater(ResponseCompletionSubscriber.class, "state"); + + private enum State { + IDLE, + PROCESSING_PAYLOAD, + RESPONSE_COMPLETE + } + private static final AtomicReferenceFieldUpdater responseCompleteStateUpdater = + AtomicReferenceFieldUpdater.newUpdater(ResponseCompletionSubscriber.class, State.class, "state"); private static final SingleSource.Subscriber NOOP_SUBSCRIBER = new SingleSource.Subscriber() { @Override @@ -133,7 +140,7 @@ public void onError(final Throwable t) { private SingleSource.Subscriber subscriber; private final TerminalSignalConsumer beforeFinally; private final boolean discardEventsAfterCancel; - private volatile int state; + private volatile State state = State.IDLE; ResponseCompletionSubscriber(final SingleSource.Subscriber sub, final TerminalSignalConsumer beforeFinally, @@ -147,14 +154,8 @@ public void onError(final Throwable t) { public void onSubscribe(final Cancellable cancellable) { subscriber.onSubscribe(() -> { try { - // TODO: We may also need to complete the `beforeFinally()` if we're in the PROCESSING state. - // and also audit something else? - // Investigate if rdar://105295860 (BeforeFinallyHttpOperator: callbacks will be invoked more - // than once if users re-subscribe to response payload body) could be another source of - // misaligned limiter counters. Note that it does not affect iCloud because we guarantee we - // never re-subscribe inside http-client-servicetalk, but could be an issue for other users - // of ServiceTalk. - if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) { + final State previous = responseCompleteStateUpdater.getAndSet(this, State.RESPONSE_COMPLETE); + if ((previous == State.IDLE || previous == State.PROCESSING_PAYLOAD)) { beforeFinally.cancel(); } } finally { @@ -168,15 +169,15 @@ public void onSubscribe(final Cancellable cancellable) { public void onSuccess(@Nullable final StreamingHttpResponse response) { if (response == null) { sendNullResponse(); - } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) { + } else if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.PROCESSING_PAYLOAD)) { subscriber.onSuccess(response.transformMessageBody(payload -> - payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber, + payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(this, messageBodySubscriber, beforeFinally, discardEventsAfterCancel)) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been // cancelled. - assert state == TERMINATED; + assert state == State.RESPONSE_COMPLETE; // The request has been cancelled, but we still received a response. We need to discard the response // body or risk leaking hot resources which are commonly attached to a message body. toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE); @@ -191,7 +192,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { @Override public void onError(final Throwable t) { try { - if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) { + if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) { beforeFinally.onError(t); } else if (discardEventsAfterCancel) { return; @@ -206,7 +207,7 @@ public void onError(final Throwable t) { private void sendNullResponse() { try { // Since, we are not giving out a response, no subscriber will arrive for the payload Publisher. - if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) { + if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) { beforeFinally.onComplete(); } else if (discardEventsAfterCancel) { return; @@ -226,232 +227,129 @@ private void dereferenceSubscriber() { // https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#3.13 subscriber = NOOP_SUBSCRIBER; } - } - private static final class MessageBodySubscriber implements Subscriber { + private static final class MessageBodySubscriber implements Subscriber { + + private static final int PROCESSING_PAYLOAD = 0; + private static final int CANCELLED = 2; + private static final int TERMINATED = -1; + + private static final AtomicIntegerFieldUpdater stateUpdater = + AtomicIntegerFieldUpdater.newUpdater(MessageBodySubscriber.class, "state"); + + private final ResponseCompletionSubscriber parent; + private final Subscriber subscriber; + private final TerminalSignalConsumer beforeFinally; + private final boolean discardEventsAfterCancel; + private volatile int state; + + MessageBodySubscriber(final ResponseCompletionSubscriber parent, + final Subscriber subscriber, + final TerminalSignalConsumer beforeFinally, + final boolean discardEventsAfterCancel) { + this.parent = parent; + this.subscriber = subscriber; + this.beforeFinally = beforeFinally; + this.discardEventsAfterCancel = discardEventsAfterCancel; + } - private static final int PROCESSING_PAYLOAD = 0; - private static final int DELIVERING_PAYLOAD = 1; - private static final int AWAITING_CANCEL = 2; - private static final int TERMINATED = -1; + @Override + public void onSubscribe(final Subscription subscription) { + if (!responseCompleteStateUpdater.compareAndSet(parent, State.PROCESSING_PAYLOAD, State.RESPONSE_COMPLETE)) { + // There was a cancellation between providing the response and subscribing to the payload + // body. We must discard everything. + subscription.cancel(); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { - private static final AtomicIntegerFieldUpdater stateUpdater = - newUpdater(MessageBodySubscriber.class, "state"); + } - private final Subscriber subscriber; - private final TerminalSignalConsumer beforeFinally; - private final boolean discardEventsAfterCancel; - private volatile int state; - @Nullable - private Subscription subscription; - - MessageBodySubscriber(final Subscriber subscriber, - final TerminalSignalConsumer beforeFinally, - final boolean discardEventsAfterCancel) { - this.subscriber = subscriber; - this.beforeFinally = beforeFinally; - this.discardEventsAfterCancel = discardEventsAfterCancel; - } + @Override + public void cancel() { - @Override - public void onSubscribe(final Subscription subscription) { - this.subscription = subscription; - subscriber.onSubscribe(new Subscription() { - @Override - public void request(final long n) { - subscription.request(n); + } + }); + subscriber.onError(new DuplicateSubscribeException(null, subscriber, "TODO: be better")); + return; } + subscriber.onSubscribe(new Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } - @Override - public void cancel() { - if (!discardEventsAfterCancel) { + @Override + public void cancel() { try { - if (stateUpdater.compareAndSet(MessageBodySubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.cancel(); - } + beforeFinally.cancel(); } finally { - subscription.cancel(); - } - return; - } - - for (;;) { - final int state = MessageBodySubscriber.this.state; - if (state == PROCESSING_PAYLOAD) { - if (stateUpdater.compareAndSet(MessageBodySubscriber.this, - PROCESSING_PAYLOAD, TERMINATED)) { - try { - beforeFinally.cancel(); - } finally { - subscription.cancel(); - } - break; - } - } else if (state == DELIVERING_PAYLOAD) { - if (stateUpdater.compareAndSet(MessageBodySubscriber.this, - DELIVERING_PAYLOAD, AWAITING_CANCEL)) { - break; + if (CANCELLED != stateUpdater.getAndSet(MessageBodySubscriber.this, CANCELLED)) { + subscription.cancel(); } - } else if (state == TERMINATED) { - // still propagate cancel to the original subscription: - subscription.cancel(); - break; - } else { - // cancel can be invoked multiple times - assert state == AWAITING_CANCEL; - break; } } - } - }); - } - - @Override - public void onNext(@Nullable final Object o) { - if (!discardEventsAfterCancel) { - subscriber.onNext(o); - return; + }); } - boolean reentry = false; - for (;;) { - final int state = this.state; - if (state == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) { - reentry = true; - break; - } - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) { - break; + @Override + public void onNext(@Nullable final Object o) { + if (!discardEventsAfterCancel || state == PROCESSING_PAYLOAD) { + subscriber.onNext(o); } } - try { - subscriber.onNext(o); - } finally { - // Re-entry -> don't unlock - if (!reentry) { - for (;;) { - final int state = this.state; - assert state != PROCESSING_PAYLOAD; - if (state == TERMINATED) { - break; - } - if (state == DELIVERING_PAYLOAD) { - if (stateUpdater.compareAndSet(this, DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) { - break; - } - } else if (stateUpdater.compareAndSet(this, AWAITING_CANCEL, TERMINATED)) { - try { - beforeFinally.cancel(); - } finally { - assert subscription != null; - subscription.cancel(); - } - break; - } - } + @Override + public void onError(final Throwable t) { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { + beforeFinally.onError(t); + subscriber.onError(t); } } - } - @Override - public void onError(final Throwable t) { - if (!discardEventsAfterCancel) { - try { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.onError(t); - } - } catch (Throwable cause) { - addSuppressed(t, cause); + @Override + public void onComplete() { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { + beforeFinally.onComplete(); + subscriber.onComplete(); } - subscriber.onError(t); - return; } + } + } - final int prevState = setTerminalState(); - if (prevState == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - // Propagate original cancel to let Subscription observe it - final boolean propagateCancel = prevState == AWAITING_CANCEL; + private static final class OnceTerminalSignalConsumer implements TerminalSignalConsumer { - try { - beforeFinally.onError(t); - } catch (Throwable cause) { - addSuppressed(t, cause); - } - try { - subscriber.onError(t); - } finally { - cancel0(propagateCancel); - } + private final TerminalSignalConsumer delegate; + // TODO: inline. + private final AtomicBoolean once = new AtomicBoolean(); + + public OnceTerminalSignalConsumer(final TerminalSignalConsumer delegate) { + this.delegate = delegate; } @Override public void onComplete() { - if (!discardEventsAfterCancel) { - try { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { - beforeFinally.onComplete(); - } - } catch (Throwable cause) { - subscriber.onError(cause); - return; - } - subscriber.onComplete(); - return; + if (once()) { + delegate.onComplete(); } + } - final int prevState = setTerminalState(); - if (prevState == TERMINATED) { - // We already cancelled and have to discard further events - return; - } - // Propagate original cancel to let Subscription observe it - final boolean propagateCancel = prevState == AWAITING_CANCEL; - - try { - try { - beforeFinally.onComplete(); - } catch (Throwable cause) { - subscriber.onError(cause); - return; - } - subscriber.onComplete(); - } finally { - cancel0(propagateCancel); + @Override + public void onError(Throwable throwable) { + if (once()) { + delegate.onError(throwable); } } - private int setTerminalState() { - for (;;) { - final int state = this.state; - if (state == TERMINATED) { - // We already cancelled and have to discard further events - return state; - } - if (state == PROCESSING_PAYLOAD) { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { - return state; - } - } else if (stateUpdater.compareAndSet(this, state, TERMINATED)) { - // re-entry, but we can terminate because this is a final event: - return state; - } + @Override + public void cancel() { + if (once()) { + delegate.cancel(); } } - private void cancel0(final boolean propagateCancel) { - if (propagateCancel) { - assert subscription != null; - subscription.cancel(); - } + private boolean once() { + return !once.getAndSet(true); } } } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index b168a19df4..234605cf1a 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -77,6 +77,7 @@ import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -242,11 +243,16 @@ void cancelAfterOnSuccess(boolean discardEventsAfterCancel) { subscriber.verifyResponseReceived(); subscriber.cancellable.cancel(); - verifyNoInteractions(beforeFinally); + + // We should get a cancel because we haven't subscribed to the payload body. + verify(beforeFinally).cancel(); // We unconditionally cancel and let the original single handle the cancel post terminate responseSingle.verifyCancelled(); } + // TODO: do we have a test where we get a cancel from the Single after we have delivered the payload and someone + // has subscribed to it? + @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @ValueSource(booleans = {false, true}) void cancelAfterOnError(boolean discardEventsAfterCancel) { @@ -407,15 +413,11 @@ public void onNext(@Nullable final Buffer buffer) { if (receivedPayload.size() == 1) { assert subscription != null; subscription.cancel(); - subscription.cancel(); // intentionally cancel two times to make sure it's idempotent - verify(payloadSubscription, Mockito.never()).cancel(); - verifyNoMoreInteractions(beforeFinally); - + subscription.cancel(); // second to make sure it's idempotent. + verify(payloadSubscription, atMostOnce()).cancel(); + verify(beforeFinally).cancel(); payload.onNext(EMPTY_BUFFER); } - verify(payloadSubscription, Mockito.never()).cancel(); - verifyNoMoreInteractions(beforeFinally); - // Cancel will be propagated after this method returns } @Override @@ -435,7 +437,7 @@ public void onComplete() { verify(payloadSubscription).cancel(); verify(beforeFinally).cancel(); - assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER)); + assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER)); assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue())); verifyNoMoreInteractions(beforeFinally); @@ -515,8 +517,10 @@ private void cancelFromTerminal() { assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER)); - assertThat("Unexpected payload body termination", subscriberTerminal.get(), equalTo(payloadTerminal)); - if (payloadTerminal.cause() == null) { +// assertThat("Unexpected payload body termination", subscriberTerminal.get(), equalTo(payloadTerminal)); + if (fromOnNext) { + verify(beforeFinally).cancel(); + } else if (payloadTerminal.cause() == null) { verify(beforeFinally).onComplete(); } else { verify(beforeFinally).onError(payloadTerminal.cause()); From 5efa94b8f34451a11a9dc71cb6a118d009f71f33 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 13 Aug 2024 10:26:14 -0600 Subject: [PATCH 03/12] Simplify the application of ResponseCompletionSubscriber --- .../http/utils/BeforeFinallyHttpOperator.java | 158 ++++++++---------- 1 file changed, 72 insertions(+), 86 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index b864740b82..b597741236 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -24,7 +24,6 @@ import io.servicetalk.concurrent.api.SingleOperator; import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber; -import io.servicetalk.concurrent.internal.DuplicateSubscribeException; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; @@ -120,22 +119,23 @@ private enum State { PROCESSING_PAYLOAD, RESPONSE_COMPLETE } + private static final AtomicReferenceFieldUpdater responseCompleteStateUpdater = AtomicReferenceFieldUpdater.newUpdater(ResponseCompletionSubscriber.class, State.class, "state"); private static final SingleSource.Subscriber NOOP_SUBSCRIBER = new SingleSource.Subscriber() { - @Override - public void onSubscribe(final Cancellable cancellable) { - } + @Override + public void onSubscribe(final Cancellable cancellable) { + } - @Override - public void onSuccess(@Nullable final StreamingHttpResponse result) { - } + @Override + public void onSuccess(@Nullable final StreamingHttpResponse result) { + } - @Override - public void onError(final Throwable t) { - } - }; + @Override + public void onError(final Throwable t) { + } + }; private SingleSource.Subscriber subscriber; private final TerminalSignalConsumer beforeFinally; @@ -171,8 +171,15 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { sendNullResponse(); } else if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.PROCESSING_PAYLOAD)) { subscriber.onSuccess(response.transformMessageBody(payload -> - payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(this, messageBodySubscriber, - beforeFinally, discardEventsAfterCancel)) + payload.liftSync(messageBodySubscriber -> + // TODO: is this legal to do here? It seems intrinsically racy in the error case but + // perhaps that will always be undefined behavior. + // Only the first subscriber needs to be wrapped. Followup subscribers will + // most likely fail because duplicate subscriptions to message bodies are not allowed. + responseCompleteStateUpdater.compareAndSet(this, + State.PROCESSING_PAYLOAD, State.RESPONSE_COMPLETE) ? + new MessageBodySubscriber(messageBodySubscriber, beforeFinally, discardEventsAfterCancel) : + messageBodySubscriber) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been @@ -183,7 +190,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE); if (!discardEventsAfterCancel) { subscriber.onSuccess(response.transformMessageBody(payload -> - Publisher.failed(new CancellationException("Received response post cancel.")))); + Publisher.failed(new CancellationException("Received response post cancel.")))); } } dereferenceSubscriber(); @@ -227,92 +234,71 @@ private void dereferenceSubscriber() { // https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#3.13 subscriber = NOOP_SUBSCRIBER; } + } - private static final class MessageBodySubscriber implements Subscriber { - - private static final int PROCESSING_PAYLOAD = 0; - private static final int CANCELLED = 2; - private static final int TERMINATED = -1; - - private static final AtomicIntegerFieldUpdater stateUpdater = - AtomicIntegerFieldUpdater.newUpdater(MessageBodySubscriber.class, "state"); - - private final ResponseCompletionSubscriber parent; - private final Subscriber subscriber; - private final TerminalSignalConsumer beforeFinally; - private final boolean discardEventsAfterCancel; - private volatile int state; - - MessageBodySubscriber(final ResponseCompletionSubscriber parent, - final Subscriber subscriber, - final TerminalSignalConsumer beforeFinally, - final boolean discardEventsAfterCancel) { - this.parent = parent; - this.subscriber = subscriber; - this.beforeFinally = beforeFinally; - this.discardEventsAfterCancel = discardEventsAfterCancel; - } + private static final class MessageBodySubscriber implements Subscriber { - @Override - public void onSubscribe(final Subscription subscription) { - if (!responseCompleteStateUpdater.compareAndSet(parent, State.PROCESSING_PAYLOAD, State.RESPONSE_COMPLETE)) { - // There was a cancellation between providing the response and subscribing to the payload - // body. We must discard everything. - subscription.cancel(); - subscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { + private static final int PROCESSING_PAYLOAD = 0; + private static final int CANCELLED = 2; + private static final int TERMINATED = -1; - } + private static final AtomicIntegerFieldUpdater stateUpdater = + AtomicIntegerFieldUpdater.newUpdater(MessageBodySubscriber.class, "state"); - @Override - public void cancel() { + private final Subscriber subscriber; + private final TerminalSignalConsumer beforeFinally; + private final boolean discardEventsAfterCancel; + private volatile int state; - } - }); - subscriber.onError(new DuplicateSubscribeException(null, subscriber, "TODO: be better")); - return; + MessageBodySubscriber(final Subscriber subscriber, + final TerminalSignalConsumer beforeFinally, + final boolean discardEventsAfterCancel) { + this.subscriber = subscriber; + this.beforeFinally = beforeFinally; + this.discardEventsAfterCancel = discardEventsAfterCancel; + } + + @Override + public void onSubscribe(final Subscription subscription) { + subscriber.onSubscribe(new Subscription() { + @Override + public void request(final long n) { + subscription.request(n); } - subscriber.onSubscribe(new Subscription() { - @Override - public void request(final long n) { - subscription.request(n); - } - @Override - public void cancel() { - try { - beforeFinally.cancel(); - } finally { - if (CANCELLED != stateUpdater.getAndSet(MessageBodySubscriber.this, CANCELLED)) { - subscription.cancel(); - } + @Override + public void cancel() { + try { + beforeFinally.cancel(); + } finally { + if (CANCELLED != stateUpdater.getAndSet(MessageBodySubscriber.this, CANCELLED)) { + subscription.cancel(); } } - }); - } - - @Override - public void onNext(@Nullable final Object o) { - if (!discardEventsAfterCancel || state == PROCESSING_PAYLOAD) { - subscriber.onNext(o); } + }); + } + + @Override + public void onNext(@Nullable final Object o) { + if (!discardEventsAfterCancel || state == PROCESSING_PAYLOAD) { + subscriber.onNext(o); } + } - @Override - public void onError(final Throwable t) { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { - beforeFinally.onError(t); - subscriber.onError(t); - } + @Override + public void onError(final Throwable t) { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { + beforeFinally.onError(t); + subscriber.onError(t); } + } - @Override - public void onComplete() { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { - beforeFinally.onComplete(); - subscriber.onComplete(); - } + @Override + public void onComplete() { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { + beforeFinally.onComplete(); + subscriber.onComplete(); } } } From e87c6489ce59e8cb4cc538c74cdcd082179c7a5c Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 13 Aug 2024 11:03:19 -0600 Subject: [PATCH 04/12] Go back to int state, for readability --- .../http/utils/BeforeFinallyHttpOperator.java | 57 +++++++++---------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index b597741236..3e9029d0e4 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -30,12 +30,12 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; /** * Helper operator for signaling the end of an HTTP Request/Response cycle. @@ -114,33 +114,30 @@ public SingleSource.Subscriber apply( private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber { - private enum State { - IDLE, - PROCESSING_PAYLOAD, - RESPONSE_COMPLETE - } - - private static final AtomicReferenceFieldUpdater responseCompleteStateUpdater = - AtomicReferenceFieldUpdater.newUpdater(ResponseCompletionSubscriber.class, State.class, "state"); + private static final int IDLE = 0; + private static final int PROCESSING_PAYLOAD = 1; + private static final int RESPONSE_COMPLETE = -1; + private static final AtomicIntegerFieldUpdater stateUpdater = + newUpdater(ResponseCompletionSubscriber.class, "state"); private static final SingleSource.Subscriber NOOP_SUBSCRIBER = - new SingleSource.Subscriber() { - @Override - public void onSubscribe(final Cancellable cancellable) { - } + new SingleSource.Subscriber() { + @Override + public void onSubscribe(final Cancellable cancellable) { + } - @Override - public void onSuccess(@Nullable final StreamingHttpResponse result) { - } + @Override + public void onSuccess(@Nullable final StreamingHttpResponse result) { + } - @Override - public void onError(final Throwable t) { - } - }; + @Override + public void onError(final Throwable t) { + } + }; private SingleSource.Subscriber subscriber; private final TerminalSignalConsumer beforeFinally; private final boolean discardEventsAfterCancel; - private volatile State state = State.IDLE; + private volatile int state; ResponseCompletionSubscriber(final SingleSource.Subscriber sub, final TerminalSignalConsumer beforeFinally, @@ -154,8 +151,8 @@ public void onError(final Throwable t) { public void onSubscribe(final Cancellable cancellable) { subscriber.onSubscribe(() -> { try { - final State previous = responseCompleteStateUpdater.getAndSet(this, State.RESPONSE_COMPLETE); - if ((previous == State.IDLE || previous == State.PROCESSING_PAYLOAD)) { + final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE); + if ((previous == IDLE || previous == PROCESSING_PAYLOAD)) { beforeFinally.cancel(); } } finally { @@ -169,22 +166,22 @@ public void onSubscribe(final Cancellable cancellable) { public void onSuccess(@Nullable final StreamingHttpResponse response) { if (response == null) { sendNullResponse(); - } else if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.PROCESSING_PAYLOAD)) { + } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) { subscriber.onSuccess(response.transformMessageBody(payload -> payload.liftSync(messageBodySubscriber -> // TODO: is this legal to do here? It seems intrinsically racy in the error case but // perhaps that will always be undefined behavior. // Only the first subscriber needs to be wrapped. Followup subscribers will // most likely fail because duplicate subscriptions to message bodies are not allowed. - responseCompleteStateUpdater.compareAndSet(this, - State.PROCESSING_PAYLOAD, State.RESPONSE_COMPLETE) ? + stateUpdater.compareAndSet(this, + PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber(messageBodySubscriber, beforeFinally, discardEventsAfterCancel) : messageBodySubscriber) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been // cancelled. - assert state == State.RESPONSE_COMPLETE; + assert state == RESPONSE_COMPLETE; // The request has been cancelled, but we still received a response. We need to discard the response // body or risk leaking hot resources which are commonly attached to a message body. toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE); @@ -199,7 +196,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { @Override public void onError(final Throwable t) { try { - if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) { + if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) { beforeFinally.onError(t); } else if (discardEventsAfterCancel) { return; @@ -214,7 +211,7 @@ public void onError(final Throwable t) { private void sendNullResponse() { try { // Since, we are not giving out a response, no subscriber will arrive for the payload Publisher. - if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) { + if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) { beforeFinally.onComplete(); } else if (discardEventsAfterCancel) { return; @@ -243,7 +240,7 @@ private static final class MessageBodySubscriber implements Subscriber { private static final int TERMINATED = -1; private static final AtomicIntegerFieldUpdater stateUpdater = - AtomicIntegerFieldUpdater.newUpdater(MessageBodySubscriber.class, "state"); + newUpdater(MessageBodySubscriber.class, "state"); private final Subscriber subscriber; private final TerminalSignalConsumer beforeFinally; From 41066e2357d4ba995b5aaea4f6210b33432175c9 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 13 Aug 2024 12:07:54 -0600 Subject: [PATCH 05/12] Fix some tests and figure out why another is failing --- .../http/utils/BeforeFinallyHttpOperatorTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index 234605cf1a..b942cd94c7 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -295,7 +295,7 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN subscriber.verifyResponseReceived(); subscriber.cancellable.cancel(); - verifyNoInteractions(beforeFinally); + verify(beforeFinally).cancel(); // We unconditionally cancel and let the original single handle the cancel post terminate responseSingle.verifyCancelled(); @@ -315,6 +315,8 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN payload.onError(payloadTerminal.cause()); } if (discardEventsAfterCancel) { + // TODO: these are failing because cancellation happened before subscribing to the message body and now + // we're skipping events. assertThat("Unexpected payload body items", payloadSubscriber.pollAllOnNext(), empty()); assertThat("Payload body terminated unexpectedly", payloadSubscriber.pollTerminal(100, MILLISECONDS), is(nullValue())); @@ -593,7 +595,8 @@ void resubscribeToPayloadBody(boolean discardEventsAfterCancel, boolean payloadE toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber2); payloadSubscriber2.awaitSubscription().request(MAX_VALUE); assertThat(payloadSubscriber2.awaitOnError(), is(instanceOf(DuplicateSubscribeException.class))); - verify(beforeFinally).onError(any(DuplicateSubscribeException.class)); + // second subscribe shouldn't interact. + verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") From 6418b6c8c9df9a080dccd788afac8bd33da66373 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 14 Aug 2024 16:34:46 -0600 Subject: [PATCH 06/12] Make CancelledSubscriber, but I will probably remove it later --- .../http/utils/BeforeFinallyHttpOperator.java | 53 ++++++++++++++++++- .../utils/BeforeFinallyHttpOperatorTest.java | 6 ++- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index 3e9029d0e4..0fb6435884 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -61,7 +61,18 @@ * .liftSync(new BeforeFinallyHttpOperator(tracker)); * } * - * We really want two things: to emit signals and to swallow all events after cancellation if so configured. + * What are the invariants that we want to support: + * - Single terminal event is called, eg only one of onComplete(), onError(..), or cancel(). + * - if discardEventsAfterCancel is called and the `cancel()` operation 'wins', the `onError` and `onSuccess(response)` + * pathways will not emit an event. + * - Does that means we quit emitting from the message body as well? + * - It has its own cancel notion: does that take over once the message body has been subscribed to, and does that + * cancel() need honor the `discardEventsAfterCancel` flag? + * - Support multiple subscribes to the message body + * - Does this mean that the first terminal event wins, or that the first subscribe gets to set the terminal event? + * - I'd say first subscribe 'wins' since in most real cases the second will result in a + * `DuplicateSubscribeException`. + * */ public final class BeforeFinallyHttpOperator implements SingleOperator { @@ -176,7 +187,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber(messageBodySubscriber, beforeFinally, discardEventsAfterCancel) : - messageBodySubscriber) + discardEventsAfterCancel ? new CancelledSubscriber(messageBodySubscriber) : messageBodySubscriber) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been @@ -300,6 +311,44 @@ public void onComplete() { } } + // TODO: do we really need this? + private static final class CancelledSubscriber implements Subscriber { + + private final Subscriber delegate; + + CancelledSubscriber(final Subscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.cancel(); + delegate.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // TODO: validate `n` + } + + @Override + public void cancel() { + } + }); + delegate.onError(new CancellationException("Subscribed to response body post cancel.")); + } + + @Override + public void onNext(@Nullable Object o) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + } + private static final class OnceTerminalSignalConsumer implements TerminalSignalConsumer { private final TerminalSignalConsumer delegate; diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index b942cd94c7..5c5340c407 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -315,8 +315,10 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN payload.onError(payloadTerminal.cause()); } if (discardEventsAfterCancel) { - // TODO: these are failing because cancellation happened before subscribing to the message body and now - // we're skipping events. + // TODO: this branch is failing because cancellation happened before we subscribed to the payload body + // so we didn't wrap it with anything. What should happen? If we don't do anything then we break + // the RS pattern because the request payload stream wasn't cancelled, so if we must send something + // or it is a hung stream. assertThat("Unexpected payload body items", payloadSubscriber.pollAllOnNext(), empty()); assertThat("Payload body terminated unexpectedly", payloadSubscriber.pollTerminal(100, MILLISECONDS), is(nullValue())); From d857ca051d7ab9388994669d7faae0df24eef56a Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 14 Aug 2024 16:55:07 -0600 Subject: [PATCH 07/12] Fix tests and cleanup --- .../http/utils/BeforeFinallyHttpOperator.java | 46 ++----------- .../utils/BeforeFinallyHttpOperatorTest.java | 67 ++++++++++++++++--- 2 files changed, 61 insertions(+), 52 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index 0fb6435884..f4f9594125 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -185,9 +185,9 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { // Only the first subscriber needs to be wrapped. Followup subscribers will // most likely fail because duplicate subscriptions to message bodies are not allowed. stateUpdater.compareAndSet(this, - PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? - new MessageBodySubscriber(messageBodySubscriber, beforeFinally, discardEventsAfterCancel) : - discardEventsAfterCancel ? new CancelledSubscriber(messageBodySubscriber) : messageBodySubscriber) + PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber( + messageBodySubscriber, beforeFinally, discardEventsAfterCancel) : + messageBodySubscriber) )); } else { // Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been @@ -311,51 +311,13 @@ public void onComplete() { } } - // TODO: do we really need this? - private static final class CancelledSubscriber implements Subscriber { - - private final Subscriber delegate; - - CancelledSubscriber(final Subscriber delegate) { - this.delegate = delegate; - } - - @Override - public void onSubscribe(Subscription subscription) { - subscription.cancel(); - delegate.onSubscribe(new Subscription() { - @Override - public void request(long n) { - // TODO: validate `n` - } - - @Override - public void cancel() { - } - }); - delegate.onError(new CancellationException("Subscribed to response body post cancel.")); - } - - @Override - public void onNext(@Nullable Object o) { - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onComplete() { - } - } - private static final class OnceTerminalSignalConsumer implements TerminalSignalConsumer { private final TerminalSignalConsumer delegate; // TODO: inline. private final AtomicBoolean once = new AtomicBoolean(); - public OnceTerminalSignalConsumer(final TerminalSignalConsumer delegate) { + OnceTerminalSignalConsumer(final TerminalSignalConsumer delegate) { this.delegate = delegate; } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index 5c5340c407..6b4b52e8ac 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -76,7 +76,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -250,9 +249,6 @@ void cancelAfterOnSuccess(boolean discardEventsAfterCancel) { responseSingle.verifyCancelled(); } - // TODO: do we have a test where we get a cancel from the Single after we have delivered the payload and someone - // has subscribed to it? - @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @ValueSource(booleans = {false, true}) void cancelAfterOnError(boolean discardEventsAfterCancel) { @@ -276,7 +272,7 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) { @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}") @MethodSource("booleanTerminalNotification") - void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) { + void cancelBeforeMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) { TestPublisher payload = new TestPublisher.Builder().disableAutoOnSubscribe().build(); TestSubscription payloadSubscription = new TestSubscription(); TestPublisherSubscriber payloadSubscriber = new TestPublisherSubscriber<>(); @@ -308,6 +304,57 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN payloadSubscriber.awaitSubscription().cancel(); assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); + payload.onNext(EMPTY_BUFFER); + if (payloadTerminal.cause() == null) { + payload.onComplete(); + } else { + payload.onError(payloadTerminal.cause()); + } + + assertThat("Unexpected payload body items", + payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER)); + if (payloadTerminal.cause() == null) { + payloadSubscriber.awaitOnComplete(); + } else { + assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}") + @MethodSource("booleanTerminalNotification") + void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) { + TestPublisher payload = new TestPublisher.Builder().disableAutoOnSubscribe().build(); + TestSubscription payloadSubscription = new TestSubscription(); + TestPublisherSubscriber payloadSubscriber = new TestPublisherSubscriber<>(); + + LegacyTestSingle responseSingle = new LegacyTestSingle<>(true); + final ResponseSubscriber subscriber = new ResponseSubscriber(); + toSource(responseSingle + .liftSync(new BeforeFinallyHttpOperator(beforeFinally, discardEventsAfterCancel))) + .subscribe(subscriber); + assertThat("onSubscribe not called.", subscriber.cancellable, is(notNullValue())); + + responseSingle.onSuccess(reqRespFactory.ok().payloadBody(payload)); + + verifyNoInteractions(beforeFinally); + responseSingle.verifyNotCancelled(); + subscriber.verifyResponseReceived(); + + assert subscriber.response != null; + toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber); + payload.onSubscribe(payloadSubscription); + payloadSubscriber.awaitSubscription().request(MAX_VALUE); + + // We unconditionally cancel and let the original single handle the cancel post terminate + subscriber.cancellable.cancel(); + // The ownership of `beforeFinally` has been transferred to the message body subscription. + verify(beforeFinally, Mockito.never()).cancel(); + responseSingle.verifyCancelled(); + + assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false)); + payloadSubscriber.awaitSubscription().cancel(); + assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); + payload.onNext(EMPTY_BUFFER); if (payloadTerminal.cause() == null) { payload.onComplete(); @@ -315,10 +362,6 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN payload.onError(payloadTerminal.cause()); } if (discardEventsAfterCancel) { - // TODO: this branch is failing because cancellation happened before we subscribed to the payload body - // so we didn't wrap it with anything. What should happen? If we don't do anything then we break - // the RS pattern because the request payload stream wasn't cancelled, so if we must send something - // or it is a hung stream. assertThat("Unexpected payload body items", payloadSubscriber.pollAllOnNext(), empty()); assertThat("Payload body terminated unexpectedly", payloadSubscriber.pollTerminal(100, MILLISECONDS), is(nullValue())); @@ -521,7 +564,11 @@ private void cancelFromTerminal() { assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER)); -// assertThat("Unexpected payload body termination", subscriberTerminal.get(), equalTo(payloadTerminal)); + if (!fromOnNext) { + // We are discarding events after cancel, so if we cancel from onNext we won't be fed the terminal events. + assertThat("Unexpected payload body termination", subscriberTerminal.get(), + equalTo(payloadTerminal)); + } if (fromOnNext) { verify(beforeFinally).cancel(); } else if (payloadTerminal.cause() == null) { From 25224094d18575b9a7a0a8a9265d81d030d3cc48 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 14 Aug 2024 17:08:43 -0600 Subject: [PATCH 08/12] Fix doc string --- .../http/utils/BeforeFinallyHttpOperator.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index b5e1b715ef..e5bdafbbd0 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -45,7 +45,13 @@ * Request/Response cycle. One needs to consider and coordinate between the multitude of outcomes: cancel/success/error * across both sources.

*

This operator ensures that the provided callback is triggered just once whenever the sources reach a terminal - * state across both sources.

+ * state across both sources. An important question is when the ownership of the callback is transferred from the + * {@link Single} source and the payload {@link Publisher}. In this case ownership is transferred the first time the + * payload is subscribed to. This means that if a cancellation of the response {@link Single} occurs after the response + * has been emitted but before the message body has been subscribed to, the callback will observe a cancel. However, if + * the message payload has been subscribed to, the cancellation of the {@link Single} will have no effect and the result + * is dictated by the terminal event of the payload body. If the body is subscribed to multiple times, only the first + * subscribe will receive ownership of the terminal events.

* * Example usage tracking the begin and end of a request: * @@ -60,19 +66,6 @@ * .beforeOnSubscribe(__ -> tracker.requestStarted()) * .liftSync(new BeforeFinallyHttpOperator(tracker)); * } - * - * What are the invariants that we want to support: - * - Single terminal event is called, eg only one of onComplete(), onError(..), or cancel(). - * - if discardEventsAfterCancel is called and the `cancel()` operation 'wins', the `onError` and `onSuccess(response)` - * pathways will not emit an event. - * - Does that means we quit emitting from the message body as well? - * - It has its own cancel notion: does that take over once the message body has been subscribed to, and does that - * cancel() need honor the `discardEventsAfterCancel` flag? - * - Support multiple subscribes to the message body - * - Does this mean that the first terminal event wins, or that the first subscribe gets to set the terminal event? - * - I'd say first subscribe 'wins' since in most real cases the second will result in a - * `DuplicateSubscribeException`. - * */ public final class BeforeFinallyHttpOperator implements SingleOperator { From 4346a3156c6379adbff6df72560054adb9816ad6 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 19 Aug 2024 17:25:39 -0600 Subject: [PATCH 09/12] No longer need OnceTerminalSignalConsumer --- .../http/utils/BeforeFinallyHttpOperator.java | 212 +++++++++++++----- .../utils/BeforeFinallyHttpOperatorTest.java | 10 +- 2 files changed, 161 insertions(+), 61 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index e5bdafbbd0..c42386702a 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -28,7 +28,6 @@ import io.servicetalk.http.api.StreamingHttpResponse; import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.annotation.Nullable; @@ -102,7 +101,7 @@ public BeforeFinallyHttpOperator(final Runnable beforeFinally) { * cancellation. */ public BeforeFinallyHttpOperator(final TerminalSignalConsumer beforeFinally, boolean discardEventsAfterCancel) { - this.beforeFinally = new OnceTerminalSignalConsumer(requireNonNull(beforeFinally)); + this.beforeFinally = requireNonNull(beforeFinally); this.discardEventsAfterCancel = discardEventsAfterCancel; } @@ -124,19 +123,19 @@ private static final class ResponseCompletionSubscriber implements SingleSource. private static final AtomicIntegerFieldUpdater stateUpdater = newUpdater(ResponseCompletionSubscriber.class, "state"); private static final SingleSource.Subscriber NOOP_SUBSCRIBER = - new SingleSource.Subscriber() { - @Override - public void onSubscribe(final Cancellable cancellable) { - } + new SingleSource.Subscriber() { + @Override + public void onSubscribe(final Cancellable cancellable) { + } - @Override - public void onSuccess(@Nullable final StreamingHttpResponse result) { - } + @Override + public void onSuccess(@Nullable final StreamingHttpResponse result) { + } - @Override - public void onError(final Throwable t) { - } - }; + @Override + public void onError(final Throwable t) { + } + }; private SingleSource.Subscriber subscriber; private final TerminalSignalConsumer beforeFinally; @@ -173,8 +172,6 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) { subscriber.onSuccess(response.transformMessageBody(payload -> payload.liftSync(messageBodySubscriber -> - // TODO: is this legal to do here? It seems intrinsically racy in the error case but - // perhaps that will always be undefined behavior. // Only the first subscriber needs to be wrapped. Followup subscribers will // most likely fail because duplicate subscriptions to message bodies are not allowed. stateUpdater.compareAndSet(this, @@ -241,7 +238,8 @@ private void dereferenceSubscriber() { private static final class MessageBodySubscriber implements Subscriber { private static final int PROCESSING_PAYLOAD = 0; - private static final int CANCELLED = 2; + private static final int DELIVERING_PAYLOAD = 1; + private static final int AWAITING_CANCEL = 2; private static final int TERMINATED = -1; private static final AtomicIntegerFieldUpdater stateUpdater = @@ -251,6 +249,8 @@ private static final class MessageBodySubscriber implements Subscriber { private final TerminalSignalConsumer beforeFinally; private final boolean discardEventsAfterCancel; private volatile int state; + @Nullable + private Subscription subscription; MessageBodySubscriber(final Subscriber subscriber, final TerminalSignalConsumer beforeFinally, @@ -262,6 +262,7 @@ private static final class MessageBodySubscriber implements Subscriber { @Override public void onSubscribe(final Subscription subscription) { + this.subscription = subscription; subscriber.onSubscribe(new Subscription() { @Override public void request(final long n) { @@ -270,11 +271,43 @@ public void request(final long n) { @Override public void cancel() { - try { - beforeFinally.cancel(); - } finally { - if (CANCELLED != stateUpdater.getAndSet(MessageBodySubscriber.this, CANCELLED)) { + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.cancel(); + } + } finally { + subscription.cancel(); + } + return; + } + + for (;;) { + final int state = MessageBodySubscriber.this.state; + if (state == PROCESSING_PAYLOAD) { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + PROCESSING_PAYLOAD, TERMINATED)) { + try { + beforeFinally.cancel(); + } finally { + subscription.cancel(); + } + break; + } + } else if (state == DELIVERING_PAYLOAD) { + if (stateUpdater.compareAndSet(MessageBodySubscriber.this, + DELIVERING_PAYLOAD, AWAITING_CANCEL)) { + break; + } + } else if (state == TERMINATED) { + // still propagate cancel to the original subscription: subscription.cancel(); + break; + } else { + // cancel can be invoked multiple times + assert state == AWAITING_CANCEL; + break; } } } @@ -283,61 +316,134 @@ public void cancel() { @Override public void onNext(@Nullable final Object o) { - if (!discardEventsAfterCancel || state == PROCESSING_PAYLOAD) { + if (!discardEventsAfterCancel) { subscriber.onNext(o); + return; + } + + boolean reentry = false; + for (;;) { + final int state = this.state; + if (state == TERMINATED) { + // We already cancelled and have to discard further events + return; + } + if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) { + reentry = true; + break; + } + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) { + break; + } + } + + try { + subscriber.onNext(o); + } finally { + // Re-entry -> don't unlock + if (!reentry) { + for (;;) { + final int state = this.state; + assert state != PROCESSING_PAYLOAD; + if (state == TERMINATED) { + break; + } + if (state == DELIVERING_PAYLOAD) { + if (stateUpdater.compareAndSet(this, DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) { + break; + } + } else if (stateUpdater.compareAndSet(this, AWAITING_CANCEL, TERMINATED)) { + try { + beforeFinally.cancel(); + } finally { + assert subscription != null; + subscription.cancel(); + } + break; + } + } + } } } - @Override public void onError(final Throwable t) { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.onError(t); + } + } catch (Throwable cause) { + addSuppressed(t, cause); + } + subscriber.onError(t); + return; + } + + final int prevState = setTerminalState(); + if (prevState == TERMINATED) { + // We already cancelled and have to discard further events + return; + } + // Propagate original cancel to let Subscription observe it + final boolean propagateCancel = prevState == AWAITING_CANCEL; + + try { beforeFinally.onError(t); + } catch (Throwable cause) { + addSuppressed(t, cause); + } + try { subscriber.onError(t); + } finally { + cancel0(propagateCancel); } } @Override public void onComplete() { - if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) { - beforeFinally.onComplete(); + if (!discardEventsAfterCancel) { + try { + if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) { + beforeFinally.onComplete(); + } + } catch (Throwable cause) { + subscriber.onError(cause); + return; + } subscriber.onComplete(); + return; } - } - } - - private static final class OnceTerminalSignalConsumer implements TerminalSignalConsumer { - private final TerminalSignalConsumer delegate; - // TODO: inline. - private final AtomicBoolean once = new AtomicBoolean(); - - OnceTerminalSignalConsumer(final TerminalSignalConsumer delegate) { - this.delegate = delegate; - } - - @Override - public void onComplete() { - if (once()) { - delegate.onComplete(); + final int prevState = setTerminalState(); + if (prevState == TERMINATED) { + // We already cancelled and have to discard further events + return; } - } + // Propagate original cancel to let Subscription observe it + final boolean propagateCancel = prevState == AWAITING_CANCEL; - @Override - public void onError(Throwable throwable) { - if (once()) { - delegate.onError(throwable); + try { + try { + beforeFinally.onComplete(); + } catch (Throwable cause) { + subscriber.onError(cause); + return; + } + subscriber.onComplete(); + } finally { + cancel0(propagateCancel); } } - @Override - public void cancel() { - if (once()) { - delegate.cancel(); - } + private int setTerminalState() { + return stateUpdater.getAndSet(this, TERMINATED); } - private boolean once() { - return !once.getAndSet(true); + private void cancel0(final boolean propagateCancel) { + if (propagateCancel) { + assert subscription != null; + subscription.cancel(); + } } } } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index 6b4b52e8ac..9bf4382266 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -564,14 +564,8 @@ private void cancelFromTerminal() { assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER)); - if (!fromOnNext) { - // We are discarding events after cancel, so if we cancel from onNext we won't be fed the terminal events. - assertThat("Unexpected payload body termination", subscriberTerminal.get(), - equalTo(payloadTerminal)); - } - if (fromOnNext) { - verify(beforeFinally).cancel(); - } else if (payloadTerminal.cause() == null) { + assertThat("Unexpected payload body termination", subscriberTerminal.get(), equalTo(payloadTerminal)); + if (payloadTerminal.cause() == null) { verify(beforeFinally).onComplete(); } else { verify(beforeFinally).onError(payloadTerminal.cause()); From 4285fa8ce312ec1bd13b9a06ce19bb8b1da484ca Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 19 Aug 2024 17:50:10 -0600 Subject: [PATCH 10/12] Cleanup --- .../http/utils/BeforeFinallyHttpOperator.java | 1 + .../utils/BeforeFinallyHttpOperatorTest.java | 30 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index c42386702a..dc3d445452 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -366,6 +366,7 @@ public void onNext(@Nullable final Object o) { } } + @Override public void onError(final Throwable t) { if (!discardEventsAfterCancel) { try { diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index 9bf4382266..d1ad89af6f 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -36,6 +36,7 @@ import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; import io.servicetalk.http.api.StreamingHttpResponse; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -76,7 +77,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.atMostOnce; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -91,6 +92,11 @@ class BeforeFinallyHttpOperatorTest { @Mock private TerminalSignalConsumer beforeFinally; + @AfterEach + void ensureOnlyOneSignal() { + verifyNoMoreInteractions(beforeFinally); + } + @SuppressWarnings("unused") private static Stream booleanTerminalNotification() { return Stream.of(Arguments.of(false, TerminalNotification.complete()), @@ -111,7 +117,6 @@ void nullAsSuccess(boolean discardEventsAfterCancel) { verify(beforeFinally).onComplete(); subscriber.verifyNullResponseReceived(); - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @@ -176,7 +181,6 @@ void cancelBeforeOnSuccess(boolean discardEventsAfterCancel) { Exception ex = assertThrows(Exception.class, () -> subscriber.response.payloadBody().toFuture().get()); assertThat(ex.getCause(), instanceOf(CancellationException.class)); } - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @@ -199,7 +203,6 @@ void cancelBeforeOnSuccessNull(boolean discardEventsAfterCancel) { } else { subscriber.verifyNullResponseReceived(); } - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @@ -222,7 +225,6 @@ void cancelBeforeOnError(boolean discardEventsAfterCancel) { } else { assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION)); } - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") @@ -265,7 +267,6 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) { assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION)); subscriber.cancellable.cancel(); - verifyNoMoreInteractions(beforeFinally); // We unconditionally cancel and let the original single handle the cancel post terminate responseSingle.verifyCancelled(); } @@ -354,6 +355,7 @@ void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalN assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false)); payloadSubscriber.awaitSubscription().cancel(); assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true)); + verify(beforeFinally).cancel(); payload.onNext(EMPTY_BUFFER); if (payloadTerminal.cause() == null) { @@ -412,8 +414,10 @@ void cancelAfterOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNo payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER)); if (payloadTerminal.cause() == null) { payloadSubscriber.awaitOnComplete(); + verify(beforeFinally).onComplete(); } else { assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + verify(beforeFinally).onError(DELIBERATE_EXCEPTION); } assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false)); @@ -460,11 +464,14 @@ public void onNext(@Nullable final Buffer buffer) { if (receivedPayload.size() == 1) { assert subscription != null; subscription.cancel(); - subscription.cancel(); // second to make sure it's idempotent. - verify(payloadSubscription, atMostOnce()).cancel(); - verify(beforeFinally).cancel(); + subscription.cancel(); // intentionally cancel two times to make sure it's idempotent + verify(payloadSubscription, Mockito.never()).cancel(); + verifyNoMoreInteractions(beforeFinally); payload.onNext(EMPTY_BUFFER); } + verify(payloadSubscription, Mockito.never()).cancel(); + verifyNoMoreInteractions(beforeFinally); + // Cancel will be propagated after this method returns } @Override @@ -484,10 +491,8 @@ public void onComplete() { verify(payloadSubscription).cancel(); verify(beforeFinally).cancel(); - assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER)); + assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER)); assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue())); - - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] fromOnNext={0} payloadTerminal={1}") @@ -570,7 +575,6 @@ private void cancelFromTerminal() { } else { verify(beforeFinally).onError(payloadTerminal.cause()); } - verifyNoMoreInteractions(beforeFinally); } @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}") From ab4bdb4e4f7df3ffe45f4dad5ff9b4fc59c4510a Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 19 Aug 2024 18:01:19 -0600 Subject: [PATCH 11/12] Fix style --- .../io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java index d1ad89af6f..c0df8c2699 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java @@ -77,7 +77,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; From 3a0cbe65e2e63039aaa8d921aa8533cf5f5c1952 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 22 Aug 2024 09:23:46 -0600 Subject: [PATCH 12/12] Update servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java Co-authored-by: Idel Pivnitskiy --- .../io/servicetalk/http/utils/BeforeFinallyHttpOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java index dc3d445452..d8c30ad2c3 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java @@ -155,7 +155,7 @@ public void onSubscribe(final Cancellable cancellable) { subscriber.onSubscribe(() -> { try { final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE); - if ((previous == IDLE || previous == PROCESSING_PAYLOAD)) { + if (previous == IDLE || previous == PROCESSING_PAYLOAD) { beforeFinally.cancel(); } } finally {