From 372f108e7f5763aa5fe39b8f7b19b3ce163b924e Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Thu, 22 Aug 2024 09:58:22 -0600
Subject: [PATCH] http-utils: cleanup the BeforeFinallyHttpOperator state
(#3042)
Motivation:
The BeforeFinallyHttpOperator has a few shortcomings
- It doesn't honor the contract of calling the callbacks only once in the case of multiple subscribes.
- It won't honor a cancel if in the PROCESSING_PAYLOAD state, which has some weird callback lifetime questions.
Modifications:
- If we receive a cancellation on the Single before the message body has been subscribed, that counts as a cancel. Once the message body is subscribed ownership of the callbacks are fully transferred to that subscription.
- Only the first body subscribe gets ownership of the callbacks.
Co-authored-by: Idel Pivnitskiy
---
.../http/utils/BeforeFinallyHttpOperator.java | 45 +++++------
.../utils/BeforeFinallyHttpOperatorTest.java | 81 +++++++++++++++----
2 files changed, 89 insertions(+), 37 deletions(-)
diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
index 0537523836..d8c30ad2c3 100644
--- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
+++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/BeforeFinallyHttpOperator.java
@@ -44,7 +44,13 @@
* Request/Response cycle. One needs to consider and coordinate between the multitude of outcomes: cancel/success/error
* across both sources.
* This operator ensures that the provided callback is triggered just once whenever the sources reach a terminal
- * state across both sources.
+ * state across both sources. An important question is when the ownership of the callback is transferred from the
+ * {@link Single} source and the payload {@link Publisher}. In this case ownership is transferred the first time the
+ * payload is subscribed to. This means that if a cancellation of the response {@link Single} occurs after the response
+ * has been emitted but before the message body has been subscribed to, the callback will observe a cancel. However, if
+ * the message payload has been subscribed to, the cancellation of the {@link Single} will have no effect and the result
+ * is dictated by the terminal event of the payload body. If the body is subscribed to multiple times, only the first
+ * subscribe will receive ownership of the terminal events.
*
* Example usage tracking the begin and end of a request:
*
@@ -110,9 +116,10 @@ public SingleSource.Subscriber super StreamingHttpResponse> apply(
}
private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber {
+
private static final int IDLE = 0;
private static final int PROCESSING_PAYLOAD = 1;
- private static final int TERMINATED = -1;
+ private static final int RESPONSE_COMPLETE = -1;
private static final AtomicIntegerFieldUpdater stateUpdater =
newUpdater(ResponseCompletionSubscriber.class, "state");
private static final SingleSource.Subscriber NOOP_SUBSCRIBER =
@@ -147,7 +154,8 @@ public void onError(final Throwable t) {
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE);
+ if (previous == IDLE || previous == PROCESSING_PAYLOAD) {
beforeFinally.cancel();
}
} finally {
@@ -163,13 +171,18 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
sendNullResponse();
} else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
subscriber.onSuccess(response.transformMessageBody(payload ->
- payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber,
- beforeFinally, discardEventsAfterCancel))
+ payload.liftSync(messageBodySubscriber ->
+ // Only the first subscriber needs to be wrapped. Followup subscribers will
+ // most likely fail because duplicate subscriptions to message bodies are not allowed.
+ stateUpdater.compareAndSet(this,
+ PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber(
+ messageBodySubscriber, beforeFinally, discardEventsAfterCancel) :
+ messageBodySubscriber)
));
} else {
// Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
// cancelled.
- assert state == TERMINATED;
+ assert state == RESPONSE_COMPLETE;
// The request has been cancelled, but we still received a response. We need to discard the response
// body or risk leaking hot resources which are commonly attached to a message body.
toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
@@ -185,7 +198,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
@Override
public void onError(final Throwable t) {
try {
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onError(t);
} else if (discardEventsAfterCancel) {
return;
@@ -200,7 +213,7 @@ public void onError(final Throwable t) {
private void sendNullResponse() {
try {
// Since, we are not giving out a response, no subscriber will arrive for the payload Publisher.
- if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
+ if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onComplete();
} else if (discardEventsAfterCancel) {
return;
@@ -424,21 +437,7 @@ public void onComplete() {
}
private int setTerminalState() {
- for (;;) {
- final int state = this.state;
- if (state == TERMINATED) {
- // We already cancelled and have to discard further events
- return state;
- }
- if (state == PROCESSING_PAYLOAD) {
- if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) {
- return state;
- }
- } else if (stateUpdater.compareAndSet(this, state, TERMINATED)) {
- // re-entry, but we can terminate because this is a final event:
- return state;
- }
- }
+ return stateUpdater.getAndSet(this, TERMINATED);
}
private void cancel0(final boolean propagateCancel) {
diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
index b168a19df4..c0df8c2699 100644
--- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
+++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/BeforeFinallyHttpOperatorTest.java
@@ -36,6 +36,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -76,7 +77,6 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -91,6 +91,11 @@ class BeforeFinallyHttpOperatorTest {
@Mock
private TerminalSignalConsumer beforeFinally;
+ @AfterEach
+ void ensureOnlyOneSignal() {
+ verifyNoMoreInteractions(beforeFinally);
+ }
+
@SuppressWarnings("unused")
private static Stream booleanTerminalNotification() {
return Stream.of(Arguments.of(false, TerminalNotification.complete()),
@@ -111,7 +116,6 @@ void nullAsSuccess(boolean discardEventsAfterCancel) {
verify(beforeFinally).onComplete();
subscriber.verifyNullResponseReceived();
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -176,7 +180,6 @@ void cancelBeforeOnSuccess(boolean discardEventsAfterCancel) {
Exception ex = assertThrows(Exception.class, () -> subscriber.response.payloadBody().toFuture().get());
assertThat(ex.getCause(), instanceOf(CancellationException.class));
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -199,7 +202,6 @@ void cancelBeforeOnSuccessNull(boolean discardEventsAfterCancel) {
} else {
subscriber.verifyNullResponseReceived();
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -222,7 +224,6 @@ void cancelBeforeOnError(boolean discardEventsAfterCancel) {
} else {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -242,7 +243,9 @@ void cancelAfterOnSuccess(boolean discardEventsAfterCancel) {
subscriber.verifyResponseReceived();
subscriber.cancellable.cancel();
- verifyNoInteractions(beforeFinally);
+
+ // We should get a cancel because we haven't subscribed to the payload body.
+ verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}
@@ -263,14 +266,13 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
subscriber.cancellable.cancel();
- verifyNoMoreInteractions(beforeFinally);
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
@MethodSource("booleanTerminalNotification")
- void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
+ void cancelBeforeMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
TestPublisher payload = new TestPublisher.Builder().disableAutoOnSubscribe().build();
TestSubscription payloadSubscription = new TestSubscription();
TestPublisherSubscriber payloadSubscriber = new TestPublisherSubscriber<>();
@@ -289,7 +291,7 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
subscriber.verifyResponseReceived();
subscriber.cancellable.cancel();
- verifyNoInteractions(beforeFinally);
+ verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
@@ -302,6 +304,58 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
+ payload.onNext(EMPTY_BUFFER);
+ if (payloadTerminal.cause() == null) {
+ payload.onComplete();
+ } else {
+ payload.onError(payloadTerminal.cause());
+ }
+
+ assertThat("Unexpected payload body items",
+ payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
+ if (payloadTerminal.cause() == null) {
+ payloadSubscriber.awaitOnComplete();
+ } else {
+ assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
+ }
+ }
+
+ @ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
+ @MethodSource("booleanTerminalNotification")
+ void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
+ TestPublisher payload = new TestPublisher.Builder().disableAutoOnSubscribe().build();
+ TestSubscription payloadSubscription = new TestSubscription();
+ TestPublisherSubscriber payloadSubscriber = new TestPublisherSubscriber<>();
+
+ LegacyTestSingle responseSingle = new LegacyTestSingle<>(true);
+ final ResponseSubscriber subscriber = new ResponseSubscriber();
+ toSource(responseSingle
+ .liftSync(new BeforeFinallyHttpOperator(beforeFinally, discardEventsAfterCancel)))
+ .subscribe(subscriber);
+ assertThat("onSubscribe not called.", subscriber.cancellable, is(notNullValue()));
+
+ responseSingle.onSuccess(reqRespFactory.ok().payloadBody(payload));
+
+ verifyNoInteractions(beforeFinally);
+ responseSingle.verifyNotCancelled();
+ subscriber.verifyResponseReceived();
+
+ assert subscriber.response != null;
+ toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber);
+ payload.onSubscribe(payloadSubscription);
+ payloadSubscriber.awaitSubscription().request(MAX_VALUE);
+
+ // We unconditionally cancel and let the original single handle the cancel post terminate
+ subscriber.cancellable.cancel();
+ // The ownership of `beforeFinally` has been transferred to the message body subscription.
+ verify(beforeFinally, Mockito.never()).cancel();
+ responseSingle.verifyCancelled();
+
+ assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
+ payloadSubscriber.awaitSubscription().cancel();
+ assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
+ verify(beforeFinally).cancel();
+
payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
payload.onComplete();
@@ -359,8 +413,10 @@ void cancelAfterOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNo
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
+ verify(beforeFinally).onComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
+ verify(beforeFinally).onError(DELIBERATE_EXCEPTION);
}
assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
@@ -410,7 +466,6 @@ public void onNext(@Nullable final Buffer buffer) {
subscription.cancel(); // intentionally cancel two times to make sure it's idempotent
verify(payloadSubscription, Mockito.never()).cancel();
verifyNoMoreInteractions(beforeFinally);
-
payload.onNext(EMPTY_BUFFER);
}
verify(payloadSubscription, Mockito.never()).cancel();
@@ -437,8 +492,6 @@ public void onComplete() {
assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER));
assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue()));
-
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] fromOnNext={0} payloadTerminal={1}")
@@ -521,7 +574,6 @@ private void cancelFromTerminal() {
} else {
verify(beforeFinally).onError(payloadTerminal.cause());
}
- verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
@@ -589,7 +641,8 @@ void resubscribeToPayloadBody(boolean discardEventsAfterCancel, boolean payloadE
toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber2);
payloadSubscriber2.awaitSubscription().request(MAX_VALUE);
assertThat(payloadSubscriber2.awaitOnError(), is(instanceOf(DuplicateSubscribeException.class)));
- verify(beforeFinally).onError(any(DuplicateSubscribeException.class));
+ // second subscribe shouldn't interact.
+ verifyNoMoreInteractions(beforeFinally);
}
@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")