From b2a7611907e1361562c978f9a2d356dff4f3ef1c Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Fri, 9 Aug 2024 12:16:03 -0600
Subject: [PATCH 01/12] Add some comments
---
.../servicetalk/http/utils/BeforeFinallyHttpOperator.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index c3bfe759df..255f6af726 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -147,6 +147,13 @@ public void onError(final Throwable t) {
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
+ // TODO: We may also need to complete the `beforeFinally()` if we're in the PROCESSING state.
+ // and also audit something else?
+ // Investigate if rdar://105295860 (BeforeFinallyHttpOperator: callbacks will be invoked more
+ // than once if users re-subscribe to response payload body) could be another source of
+ // misaligned limiter counters. Note that it does not affect iCloud because we guarantee we
+ // never re-subscribe inside http-client-servicetalk, but could be an issue for other users
+ // of ServiceTalk.
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
beforeFinally.cancel();
}
From ff57352152284172797b63796bb4ff2a0bdef1eb Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Tue, 13 Aug 2024 09:50:09 -0600
Subject: [PATCH 02/12] This might be a start, but still work to do
---
.../http/utils/BeforeFinallyHttpOperator.java | 320 ++++++------------
.../utils/BeforeFinallyHttpOperatorTest.java | 26 +-
2 files changed, 124 insertions(+), 222 deletions(-)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index 255f6af726..b864740b82 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -24,17 +24,19 @@
import io.servicetalk.concurrent.api.SingleOperator;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
+import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
/**
* Helper operator for signaling the end of an HTTP Request/Response cycle.
@@ -59,6 +61,8 @@
* .beforeOnSubscribe(__ -> tracker.requestStarted())
* .liftSync(new BeforeFinallyHttpOperator(tracker));
* }
+ *
+ * We really want two things: to emit signals and to swallow all events after cancellation if so configured.
*/
public final class BeforeFinallyHttpOperator implements SingleOperator {
@@ -95,7 +99,7 @@ public BeforeFinallyHttpOperator(final Runnable beforeFinally) {
* cancellation.
*/
public BeforeFinallyHttpOperator(final TerminalSignalConsumer beforeFinally, boolean discardEventsAfterCancel) {
- this.beforeFinally = requireNonNull(beforeFinally);
+ this.beforeFinally = new OnceTerminalSignalConsumer(requireNonNull(beforeFinally));
this.discardEventsAfterCancel = discardEventsAfterCancel;
}
@@ -110,11 +114,14 @@ public SingleSource.Subscriber super StreamingHttpResponse> apply(
}
private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber {
- private static final int IDLE = 0;
- private static final int PROCESSING_PAYLOAD = 1;
- private static final int TERMINATED = -1;
- private static final AtomicIntegerFieldUpdater stateUpdater =
- newUpdater(ResponseCompletionSubscriber.class, "state");
+
+ private enum State {
+ IDLE,
+ PROCESSING_PAYLOAD,
+ RESPONSE_COMPLETE
+ }
+ private static final AtomicReferenceFieldUpdater responseCompleteStateUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(ResponseCompletionSubscriber.class, State.class, "state");
private static final SingleSource.Subscriber NOOP_SUBSCRIBER =
new SingleSource.Subscriber() {
@Override
@@ -133,7 +140,7 @@ public void onError(final Throwable t) {
private SingleSource.Subscriber super StreamingHttpResponse> subscriber;
private final TerminalSignalConsumer beforeFinally;
private final boolean discardEventsAfterCancel;
- private volatile int state;
+ private volatile State state = State.IDLE;
ResponseCompletionSubscriber(final SingleSource.Subscriber super StreamingHttpResponse> sub,
final TerminalSignalConsumer beforeFinally,
@@ -147,14 +154,8 @@ public void onError(final Throwable t) {
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
- // TODO: We may also need to complete the `beforeFinally()` if we're in the PROCESSING state.
- // and also audit something else?
- // Investigate if rdar://105295860 (BeforeFinallyHttpOperator: callbacks will be invoked more
- // than once if users re-subscribe to response payload body) could be another source of
- // misaligned limiter counters. Note that it does not affect iCloud because we guarantee we
- // never re-subscribe inside http-client-servicetalk, but could be an issue for other users
- // of ServiceTalk.
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ final State previous = responseCompleteStateUpdater.getAndSet(this, State.RESPONSE_COMPLETE);
+ if ((previous == State.IDLE || previous == State.PROCESSING_PAYLOAD)) {
beforeFinally.cancel();
}
} finally {
@@ -168,15 +169,15 @@ public void onSubscribe(final Cancellable cancellable) {
public void onSuccess(@Nullable final StreamingHttpResponse response) {
if (response == null) {
sendNullResponse();
- } else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
+ } else if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.PROCESSING_PAYLOAD)) {
subscriber.onSuccess(response.transformMessageBody(payload ->
- payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber,
+ payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(this, messageBodySubscriber,
beforeFinally, discardEventsAfterCancel))
));
} else {
// Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
// cancelled.
- assert state == TERMINATED;
+ assert state == State.RESPONSE_COMPLETE;
// The request has been cancelled, but we still received a response. We need to discard the response
// body or risk leaking hot resources which are commonly attached to a message body.
toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
@@ -191,7 +192,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
@Override
public void onError(final Throwable t) {
try {
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) {
beforeFinally.onError(t);
} else if (discardEventsAfterCancel) {
return;
@@ -206,7 +207,7 @@ public void onError(final Throwable t) {
private void sendNullResponse() {
try {
// Since, we are not giving out a response, no subscriber will arrive for the payload Publisher.
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ if (responseCompleteStateUpdater.compareAndSet(this, State.IDLE, State.RESPONSE_COMPLETE)) {
beforeFinally.onComplete();
} else if (discardEventsAfterCancel) {
return;
@@ -226,232 +227,129 @@ private void dereferenceSubscriber() {
// https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#3.13
subscriber = NOOP_SUBSCRIBER;
}
- }
- private static final class MessageBodySubscriber implements Subscriber
*
This operator ensures that the provided callback is triggered just once whenever the sources reach a terminal
- * state across both sources.
+ * state across both sources. An important question is when the ownership of the callback is transferred from the
+ * {@link Single} source and the payload {@link Publisher}. In this case ownership is transferred the first time the
+ * payload is subscribed to. This means that if a cancellation of the response {@link Single} occurs after the response
+ * has been emitted but before the message body has been subscribed to, the callback will observe a cancel. However, if
+ * the message payload has been subscribed to, the cancellation of the {@link Single} will have no effect and the result
+ * is dictated by the terminal event of the payload body. If the body is subscribed to multiple times, only the first
+ * subscribe will receive ownership of the terminal events.
*
* Example usage tracking the begin and end of a request:
*
@@ -60,19 +66,6 @@
* .beforeOnSubscribe(__ -> tracker.requestStarted())
* .liftSync(new BeforeFinallyHttpOperator(tracker));
* }
- *
- * What are the invariants that we want to support:
- * - Single terminal event is called, eg only one of onComplete(), onError(..), or cancel().
- * - if discardEventsAfterCancel is called and the `cancel()` operation 'wins', the `onError` and `onSuccess(response)`
- * pathways will not emit an event.
- * - Does that means we quit emitting from the message body as well?
- * - It has its own cancel notion: does that take over once the message body has been subscribed to, and does that
- * cancel() need honor the `discardEventsAfterCancel` flag?
- * - Support multiple subscribes to the message body
- * - Does this mean that the first terminal event wins, or that the first subscribe gets to set the terminal event?
- * - I'd say first subscribe 'wins' since in most real cases the second will result in a
- * `DuplicateSubscribeException`.
- *
*/
public final class BeforeFinallyHttpOperator implements SingleOperator {
From 4346a3156c6379adbff6df72560054adb9816ad6 Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Mon, 19 Aug 2024 17:25:39 -0600
Subject: [PATCH 09/12] No longer need OnceTerminalSignalConsumer
---
.../http/utils/BeforeFinallyHttpOperator.java | 212 +++++++++++++-----
.../utils/BeforeFinallyHttpOperatorTest.java | 10 +-
2 files changed, 161 insertions(+), 61 deletions(-)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index e5bdafbbd0..c42386702a 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -28,7 +28,6 @@
import io.servicetalk.http.api.StreamingHttpResponse;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;
@@ -102,7 +101,7 @@ public BeforeFinallyHttpOperator(final Runnable beforeFinally) {
* cancellation.
*/
public BeforeFinallyHttpOperator(final TerminalSignalConsumer beforeFinally, boolean discardEventsAfterCancel) {
- this.beforeFinally = new OnceTerminalSignalConsumer(requireNonNull(beforeFinally));
+ this.beforeFinally = requireNonNull(beforeFinally);
this.discardEventsAfterCancel = discardEventsAfterCancel;
}
@@ -124,19 +123,19 @@ private static final class ResponseCompletionSubscriber implements SingleSource.
private static final AtomicIntegerFieldUpdater stateUpdater =
newUpdater(ResponseCompletionSubscriber.class, "state");
private static final SingleSource.Subscriber NOOP_SUBSCRIBER =
- new SingleSource.Subscriber() {
- @Override
- public void onSubscribe(final Cancellable cancellable) {
- }
+ new SingleSource.Subscriber() {
+ @Override
+ public void onSubscribe(final Cancellable cancellable) {
+ }
- @Override
- public void onSuccess(@Nullable final StreamingHttpResponse result) {
- }
+ @Override
+ public void onSuccess(@Nullable final StreamingHttpResponse result) {
+ }
- @Override
- public void onError(final Throwable t) {
- }
- };
+ @Override
+ public void onError(final Throwable t) {
+ }
+ };
private SingleSource.Subscriber super StreamingHttpResponse> subscriber;
private final TerminalSignalConsumer beforeFinally;
@@ -173,8 +172,6 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
} else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
subscriber.onSuccess(response.transformMessageBody(payload ->
payload.liftSync(messageBodySubscriber ->
- // TODO: is this legal to do here? It seems intrinsically racy in the error case but
- // perhaps that will always be undefined behavior.
// Only the first subscriber needs to be wrapped. Followup subscribers will
// most likely fail because duplicate subscriptions to message bodies are not allowed.
stateUpdater.compareAndSet(this,
@@ -241,7 +238,8 @@ private void dereferenceSubscriber() {
private static final class MessageBodySubscriber implements Subscriber {
private static final int PROCESSING_PAYLOAD = 0;
- private static final int CANCELLED = 2;
+ private static final int DELIVERING_PAYLOAD = 1;
+ private static final int AWAITING_CANCEL = 2;
private static final int TERMINATED = -1;
private static final AtomicIntegerFieldUpdater stateUpdater =
@@ -251,6 +249,8 @@ private static final class MessageBodySubscriber implements Subscriber {
private final TerminalSignalConsumer beforeFinally;
private final boolean discardEventsAfterCancel;
private volatile int state;
+ @Nullable
+ private Subscription subscription;
MessageBodySubscriber(final Subscriber super Object> subscriber,
final TerminalSignalConsumer beforeFinally,
@@ -262,6 +262,7 @@ private static final class MessageBodySubscriber implements Subscriber {
@Override
public void onSubscribe(final Subscription subscription) {
+ this.subscription = subscription;
subscriber.onSubscribe(new Subscription() {
@Override
public void request(final long n) {
@@ -270,11 +271,43 @@ public void request(final long n) {
@Override
public void cancel() {
- try {
- beforeFinally.cancel();
- } finally {
- if (CANCELLED != stateUpdater.getAndSet(MessageBodySubscriber.this, CANCELLED)) {
+ if (!discardEventsAfterCancel) {
+ try {
+ if (stateUpdater.compareAndSet(MessageBodySubscriber.this,
+ PROCESSING_PAYLOAD, TERMINATED)) {
+ beforeFinally.cancel();
+ }
+ } finally {
+ subscription.cancel();
+ }
+ return;
+ }
+
+ for (;;) {
+ final int state = MessageBodySubscriber.this.state;
+ if (state == PROCESSING_PAYLOAD) {
+ if (stateUpdater.compareAndSet(MessageBodySubscriber.this,
+ PROCESSING_PAYLOAD, TERMINATED)) {
+ try {
+ beforeFinally.cancel();
+ } finally {
+ subscription.cancel();
+ }
+ break;
+ }
+ } else if (state == DELIVERING_PAYLOAD) {
+ if (stateUpdater.compareAndSet(MessageBodySubscriber.this,
+ DELIVERING_PAYLOAD, AWAITING_CANCEL)) {
+ break;
+ }
+ } else if (state == TERMINATED) {
+ // still propagate cancel to the original subscription:
subscription.cancel();
+ break;
+ } else {
+ // cancel can be invoked multiple times
+ assert state == AWAITING_CANCEL;
+ break;
}
}
}
@@ -283,61 +316,134 @@ public void cancel() {
@Override
public void onNext(@Nullable final Object o) {
- if (!discardEventsAfterCancel || state == PROCESSING_PAYLOAD) {
+ if (!discardEventsAfterCancel) {
subscriber.onNext(o);
+ return;
+ }
+
+ boolean reentry = false;
+ for (;;) {
+ final int state = this.state;
+ if (state == TERMINATED) {
+ // We already cancelled and have to discard further events
+ return;
+ }
+ if (state == DELIVERING_PAYLOAD || state == AWAITING_CANCEL) {
+ reentry = true;
+ break;
+ }
+ if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, DELIVERING_PAYLOAD)) {
+ break;
+ }
+ }
+
+ try {
+ subscriber.onNext(o);
+ } finally {
+ // Re-entry -> don't unlock
+ if (!reentry) {
+ for (;;) {
+ final int state = this.state;
+ assert state != PROCESSING_PAYLOAD;
+ if (state == TERMINATED) {
+ break;
+ }
+ if (state == DELIVERING_PAYLOAD) {
+ if (stateUpdater.compareAndSet(this, DELIVERING_PAYLOAD, PROCESSING_PAYLOAD)) {
+ break;
+ }
+ } else if (stateUpdater.compareAndSet(this, AWAITING_CANCEL, TERMINATED)) {
+ try {
+ beforeFinally.cancel();
+ } finally {
+ assert subscription != null;
+ subscription.cancel();
+ }
+ break;
+ }
+ }
+ }
}
}
- @Override
public void onError(final Throwable t) {
- if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) {
+ if (!discardEventsAfterCancel) {
+ try {
+ if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) {
+ beforeFinally.onError(t);
+ }
+ } catch (Throwable cause) {
+ addSuppressed(t, cause);
+ }
+ subscriber.onError(t);
+ return;
+ }
+
+ final int prevState = setTerminalState();
+ if (prevState == TERMINATED) {
+ // We already cancelled and have to discard further events
+ return;
+ }
+ // Propagate original cancel to let Subscription observe it
+ final boolean propagateCancel = prevState == AWAITING_CANCEL;
+
+ try {
beforeFinally.onError(t);
+ } catch (Throwable cause) {
+ addSuppressed(t, cause);
+ }
+ try {
subscriber.onError(t);
+ } finally {
+ cancel0(propagateCancel);
}
}
@Override
public void onComplete() {
- if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED) || !discardEventsAfterCancel) {
- beforeFinally.onComplete();
+ if (!discardEventsAfterCancel) {
+ try {
+ if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) {
+ beforeFinally.onComplete();
+ }
+ } catch (Throwable cause) {
+ subscriber.onError(cause);
+ return;
+ }
subscriber.onComplete();
+ return;
}
- }
- }
-
- private static final class OnceTerminalSignalConsumer implements TerminalSignalConsumer {
- private final TerminalSignalConsumer delegate;
- // TODO: inline.
- private final AtomicBoolean once = new AtomicBoolean();
-
- OnceTerminalSignalConsumer(final TerminalSignalConsumer delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void onComplete() {
- if (once()) {
- delegate.onComplete();
+ final int prevState = setTerminalState();
+ if (prevState == TERMINATED) {
+ // We already cancelled and have to discard further events
+ return;
}
- }
+ // Propagate original cancel to let Subscription observe it
+ final boolean propagateCancel = prevState == AWAITING_CANCEL;
- @Override
- public void onError(Throwable throwable) {
- if (once()) {
- delegate.onError(throwable);
+ try {
+ try {
+ beforeFinally.onComplete();
+ } catch (Throwable cause) {
+ subscriber.onError(cause);
+ return;
+ }
+ subscriber.onComplete();
+ } finally {
+ cancel0(propagateCancel);
}
}
- @Override
- public void cancel() {
- if (once()) {
- delegate.cancel();
- }
+ private int setTerminalState() {
+ return stateUpdater.getAndSet(this, TERMINATED);
}
- private boolean once() {
- return !once.getAndSet(true);
+ private void cancel0(final boolean propagateCancel) {
+ if (propagateCancel) {
+ assert subscription != null;
+ subscription.cancel();
+ }
}
}
}
diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
index 6b4b52e8ac..9bf4382266 100644
--- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
+++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
@@ -564,14 +564,8 @@ private void cancelFromTerminal() {
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER));
- if (!fromOnNext) {
- // We are discarding events after cancel, so if we cancel from onNext we won't be fed the terminal events.
- assertThat("Unexpected payload body termination", subscriberTerminal.get(),
- equalTo(payloadTerminal));
- }
- if (fromOnNext) {
- verify(beforeFinally).cancel();
- } else if (payloadTerminal.cause() == null) {
+ assertThat("Unexpected payload body termination", subscriberTerminal.get(), equalTo(payloadTerminal));
+ if (payloadTerminal.cause() == null) {
verify(beforeFinally).onComplete();
} else {
verify(beforeFinally).onError(payloadTerminal.cause());
From 4285fa8ce312ec1bd13b9a06ce19bb8b1da484ca Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Mon, 19 Aug 2024 17:50:10 -0600
Subject: [PATCH 10/12] Cleanup
---
.../http/utils/BeforeFinallyHttpOperator.java | 1 +
.../utils/BeforeFinallyHttpOperatorTest.java | 30 +++++++++++--------
2 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index c42386702a..dc3d445452 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -366,6 +366,7 @@ public void onNext(@Nullable final Object o) {
}
}
+ @Override
public void onError(final Throwable t) {
if (!discardEventsAfterCancel) {
try {
diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
index 9bf4382266..d1ad89af6f 100644
--- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
+++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
@@ -36,6 +36,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -76,7 +77,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.atMostOnce;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -91,6 +92,11 @@ class BeforeFinallyHttpOperatorTest {
@Mock
private TerminalSignalConsumer beforeFinally;
+ @AfterEach
+ void ensureOnlyOneSignal() {
+ verifyNoMoreInteractions(beforeFinally);
+ }
+
@SuppressWarnings("unused")
private static Stream booleanTerminalNotification() {
return Stream.of(Arguments.of(false, TerminalNotification.complete()),
@@ -111,7 +117,6 @@ void nullAsSuccess(boolean discardEventsAfterCancel) {
verify(beforeFinally).onComplete();
subscriber.verifyNullResponseReceived();
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -176,7 +181,6 @@ void cancelBeforeOnSuccess(boolean discardEventsAfterCancel) {
Exception ex = assertThrows(Exception.class, () -> subscriber.response.payloadBody().toFuture().get());
assertThat(ex.getCause(), instanceOf(CancellationException.class));
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -199,7 +203,6 @@ void cancelBeforeOnSuccessNull(boolean discardEventsAfterCancel) {
} else {
subscriber.verifyNullResponseReceived();
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -222,7 +225,6 @@ void cancelBeforeOnError(boolean discardEventsAfterCancel) {
} else {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -265,7 +267,6 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
subscriber.cancellable.cancel();
- verifyNoMoreInteractions(beforeFinally);
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}
@@ -354,6 +355,7 @@ void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalN
assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
+ verify(beforeFinally).cancel();
payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
@@ -412,8 +414,10 @@ void cancelAfterOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNo
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
+ verify(beforeFinally).onComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
+ verify(beforeFinally).onError(DELIBERATE_EXCEPTION);
}
assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
@@ -460,11 +464,14 @@ public void onNext(@Nullable final Buffer buffer) {
if (receivedPayload.size() == 1) {
assert subscription != null;
subscription.cancel();
- subscription.cancel(); // second to make sure it's idempotent.
- verify(payloadSubscription, atMostOnce()).cancel();
- verify(beforeFinally).cancel();
+ subscription.cancel(); // intentionally cancel two times to make sure it's idempotent
+ verify(payloadSubscription, Mockito.never()).cancel();
+ verifyNoMoreInteractions(beforeFinally);
payload.onNext(EMPTY_BUFFER);
}
+ verify(payloadSubscription, Mockito.never()).cancel();
+ verifyNoMoreInteractions(beforeFinally);
+ // Cancel will be propagated after this method returns
}
@Override
@@ -484,10 +491,8 @@ public void onComplete() {
verify(payloadSubscription).cancel();
verify(beforeFinally).cancel();
- assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER));
+ assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER));
assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue()));
-
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] fromOnNext={0} payloadTerminal={1}")
@@ -570,7 +575,6 @@ private void cancelFromTerminal() {
} else {
verify(beforeFinally).onError(payloadTerminal.cause());
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
From ab4bdb4e4f7df3ffe45f4dad5ff9b4fc59c4510a Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Mon, 19 Aug 2024 18:01:19 -0600
Subject: [PATCH 11/12] Fix style
---
.../io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
index d1ad89af6f..c0df8c2699 100644
--- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
+++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
@@ -77,7 +77,6 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
From 3a0cbe65e2e63039aaa8d921aa8533cf5f5c1952 Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Thu, 22 Aug 2024 09:23:46 -0600
Subject: [PATCH 12/12] Update
servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
Co-authored-by: Idel Pivnitskiy
---
.../io/servicetalk/http/utils/BeforeFinallyHttpOperator.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index dc3d445452..d8c30ad2c3 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -155,7 +155,7 @@ public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE);
- if ((previous == IDLE || previous == PROCESSING_PAYLOAD)) {
+ if (previous == IDLE || previous == PROCESSING_PAYLOAD) {
beforeFinally.cancel();
}
} finally {