Skip to content

Commit

Permalink
http-utils: cleanup the BeforeFinallyHttpOperator state (#3042)
Browse files Browse the repository at this point in the history
Motivation:

The BeforeFinallyHttpOperator has a few shortcomings

- It doesn't honor the contract of calling the callbacks only once in the case of multiple subscribes.
- It won't honor a cancel if in the PROCESSING_PAYLOAD state, which has some weird callback lifetime questions.

Modifications:

- If we receive a cancellation on the Single before the message body has been subscribed, that counts as a cancel. Once the message body is subscribed ownership of the callbacks are fully transferred to that subscription.
- Only the first body subscribe gets ownership of the callbacks.

Co-authored-by: Idel Pivnitskiy <[email protected]>
  • Loading branch information
bryce-anderson and idelpivnitskiy authored Aug 22, 2024
1 parent 44a5b80 commit 372f108
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@
* Request/Response cycle. One needs to consider and coordinate between the multitude of outcomes: cancel/success/error
* across both sources.</p>
* <p>This operator ensures that the provided callback is triggered just once whenever the sources reach a terminal
* state across both sources.</p>
* state across both sources. An important question is when the ownership of the callback is transferred from the
* {@link Single} source and the payload {@link Publisher}. In this case ownership is transferred the first time the
* payload is subscribed to. This means that if a cancellation of the response {@link Single} occurs after the response
* has been emitted but before the message body has been subscribed to, the callback will observe a cancel. However, if
* the message payload has been subscribed to, the cancellation of the {@link Single} will have no effect and the result
* is dictated by the terminal event of the payload body. If the body is subscribed to multiple times, only the first
* subscribe will receive ownership of the terminal events.</p>
*
* Example usage tracking the begin and end of a request:
*
Expand Down Expand Up @@ -110,9 +116,10 @@ public SingleSource.Subscriber<? super StreamingHttpResponse> apply(
}

private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber<StreamingHttpResponse> {

private static final int IDLE = 0;
private static final int PROCESSING_PAYLOAD = 1;
private static final int TERMINATED = -1;
private static final int RESPONSE_COMPLETE = -1;
private static final AtomicIntegerFieldUpdater<ResponseCompletionSubscriber> stateUpdater =
newUpdater(ResponseCompletionSubscriber.class, "state");
private static final SingleSource.Subscriber<StreamingHttpResponse> NOOP_SUBSCRIBER =
Expand Down Expand Up @@ -147,7 +154,8 @@ public void onError(final Throwable t) {
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE);
if (previous == IDLE || previous == PROCESSING_PAYLOAD) {
beforeFinally.cancel();
}
} finally {
Expand All @@ -163,13 +171,18 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
sendNullResponse();
} else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
subscriber.onSuccess(response.transformMessageBody(payload ->
payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber,
beforeFinally, discardEventsAfterCancel))
payload.liftSync(messageBodySubscriber ->
// Only the first subscriber needs to be wrapped. Followup subscribers will
// most likely fail because duplicate subscriptions to message bodies are not allowed.
stateUpdater.compareAndSet(this,
PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber(
messageBodySubscriber, beforeFinally, discardEventsAfterCancel) :
messageBodySubscriber)
));
} else {
// Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
// cancelled.
assert state == TERMINATED;
assert state == RESPONSE_COMPLETE;
// The request has been cancelled, but we still received a response. We need to discard the response
// body or risk leaking hot resources which are commonly attached to a message body.
toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
Expand All @@ -185,7 +198,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
@Override
public void onError(final Throwable t) {
try {
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onError(t);
} else if (discardEventsAfterCancel) {
return;
Expand All @@ -200,7 +213,7 @@ public void onError(final Throwable t) {
private void sendNullResponse() {
try {
// Since, we are not giving out a response, no subscriber will arrive for the payload Publisher.
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onComplete();
} else if (discardEventsAfterCancel) {
return;
Expand Down Expand Up @@ -424,21 +437,7 @@ public void onComplete() {
}

private int setTerminalState() {
for (;;) {
final int state = this.state;
if (state == TERMINATED) {
// We already cancelled and have to discard further events
return state;
}
if (state == PROCESSING_PAYLOAD) {
if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) {
return state;
}
} else if (stateUpdater.compareAndSet(this, state, TERMINATED)) {
// re-entry, but we can terminate because this is a final event:
return state;
}
}
return stateUpdater.getAndSet(this, TERMINATED);
}

private void cancel0(final boolean propagateCancel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -76,7 +77,6 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -91,6 +91,11 @@ class BeforeFinallyHttpOperatorTest {
@Mock
private TerminalSignalConsumer beforeFinally;

@AfterEach
void ensureOnlyOneSignal() {
verifyNoMoreInteractions(beforeFinally);
}

@SuppressWarnings("unused")
private static Stream<Arguments> booleanTerminalNotification() {
return Stream.of(Arguments.of(false, TerminalNotification.complete()),
Expand All @@ -111,7 +116,6 @@ void nullAsSuccess(boolean discardEventsAfterCancel) {
verify(beforeFinally).onComplete();

subscriber.verifyNullResponseReceived();
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down Expand Up @@ -176,7 +180,6 @@ void cancelBeforeOnSuccess(boolean discardEventsAfterCancel) {
Exception ex = assertThrows(Exception.class, () -> subscriber.response.payloadBody().toFuture().get());
assertThat(ex.getCause(), instanceOf(CancellationException.class));
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -199,7 +202,6 @@ void cancelBeforeOnSuccessNull(boolean discardEventsAfterCancel) {
} else {
subscriber.verifyNullResponseReceived();
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -222,7 +224,6 @@ void cancelBeforeOnError(boolean discardEventsAfterCancel) {
} else {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -242,7 +243,9 @@ void cancelAfterOnSuccess(boolean discardEventsAfterCancel) {
subscriber.verifyResponseReceived();

subscriber.cancellable.cancel();
verifyNoInteractions(beforeFinally);

// We should get a cancel because we haven't subscribed to the payload body.
verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}
Expand All @@ -263,14 +266,13 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));

subscriber.cancellable.cancel();
verifyNoMoreInteractions(beforeFinally);
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
@MethodSource("booleanTerminalNotification")
void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
void cancelBeforeMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
TestPublisher<Buffer> payload = new TestPublisher.Builder<Buffer>().disableAutoOnSubscribe().build();
TestSubscription payloadSubscription = new TestSubscription();
TestPublisherSubscriber<Buffer> payloadSubscriber = new TestPublisherSubscriber<>();
Expand All @@ -289,7 +291,7 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
subscriber.verifyResponseReceived();

subscriber.cancellable.cancel();
verifyNoInteractions(beforeFinally);
verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();

Expand All @@ -302,6 +304,58 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));

payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
payload.onComplete();
} else {
payload.onError(payloadTerminal.cause());
}

assertThat("Unexpected payload body items",
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
@MethodSource("booleanTerminalNotification")
void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
TestPublisher<Buffer> payload = new TestPublisher.Builder<Buffer>().disableAutoOnSubscribe().build();
TestSubscription payloadSubscription = new TestSubscription();
TestPublisherSubscriber<Buffer> payloadSubscriber = new TestPublisherSubscriber<>();

LegacyTestSingle<StreamingHttpResponse> responseSingle = new LegacyTestSingle<>(true);
final ResponseSubscriber subscriber = new ResponseSubscriber();
toSource(responseSingle
.liftSync(new BeforeFinallyHttpOperator(beforeFinally, discardEventsAfterCancel)))
.subscribe(subscriber);
assertThat("onSubscribe not called.", subscriber.cancellable, is(notNullValue()));

responseSingle.onSuccess(reqRespFactory.ok().payloadBody(payload));

verifyNoInteractions(beforeFinally);
responseSingle.verifyNotCancelled();
subscriber.verifyResponseReceived();

assert subscriber.response != null;
toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber);
payload.onSubscribe(payloadSubscription);
payloadSubscriber.awaitSubscription().request(MAX_VALUE);

// We unconditionally cancel and let the original single handle the cancel post terminate
subscriber.cancellable.cancel();
// The ownership of `beforeFinally` has been transferred to the message body subscription.
verify(beforeFinally, Mockito.never()).cancel();
responseSingle.verifyCancelled();

assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
verify(beforeFinally).cancel();

payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
payload.onComplete();
Expand Down Expand Up @@ -359,8 +413,10 @@ void cancelAfterOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNo
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
verify(beforeFinally).onComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
verify(beforeFinally).onError(DELIBERATE_EXCEPTION);
}

assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
Expand Down Expand Up @@ -410,7 +466,6 @@ public void onNext(@Nullable final Buffer buffer) {
subscription.cancel(); // intentionally cancel two times to make sure it's idempotent
verify(payloadSubscription, Mockito.never()).cancel();
verifyNoMoreInteractions(beforeFinally);

payload.onNext(EMPTY_BUFFER);
}
verify(payloadSubscription, Mockito.never()).cancel();
Expand All @@ -437,8 +492,6 @@ public void onComplete() {

assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER));
assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue()));

verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] fromOnNext={0} payloadTerminal={1}")
Expand Down Expand Up @@ -521,7 +574,6 @@ private void cancelFromTerminal() {
} else {
verify(beforeFinally).onError(payloadTerminal.cause());
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down Expand Up @@ -589,7 +641,8 @@ void resubscribeToPayloadBody(boolean discardEventsAfterCancel, boolean payloadE
toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber2);
payloadSubscriber2.awaitSubscription().request(MAX_VALUE);
assertThat(payloadSubscriber2.awaitOnError(), is(instanceOf(DuplicateSubscribeException.class)));
verify(beforeFinally).onError(any(DuplicateSubscribeException.class));
// second subscribe shouldn't interact.
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down

0 comments on commit 372f108

Please sign in to comment.