Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-utils: cleanup the BeforeFinallyHttpOperator state #3042

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@
* Request/Response cycle. One needs to consider and coordinate between the multitude of outcomes: cancel/success/error
* across both sources.</p>
* <p>This operator ensures that the provided callback is triggered just once whenever the sources reach a terminal
* state across both sources.</p>
* state across both sources. An important question is when the ownership of the callback is transferred from the
* {@link Single} source and the payload {@link Publisher}. In this case ownership is transferred the first time the
* payload is subscribed to. This means that if a cancellation of the response {@link Single} occurs after the response
* has been emitted but before the message body has been subscribed to, the callback will observe a cancel. However, if
* the message payload has been subscribed to, the cancellation of the {@link Single} will have no effect and the result
* is dictated by the terminal event of the payload body. If the body is subscribed to multiple times, only the first
* subscribe will receive ownership of the terminal events.</p>
*
* Example usage tracking the begin and end of a request:
*
Expand Down Expand Up @@ -110,9 +116,10 @@ public SingleSource.Subscriber<? super StreamingHttpResponse> apply(
}

private static final class ResponseCompletionSubscriber implements SingleSource.Subscriber<StreamingHttpResponse> {

private static final int IDLE = 0;
private static final int PROCESSING_PAYLOAD = 1;
private static final int TERMINATED = -1;
private static final int RESPONSE_COMPLETE = -1;
private static final AtomicIntegerFieldUpdater<ResponseCompletionSubscriber> stateUpdater =
newUpdater(ResponseCompletionSubscriber.class, "state");
private static final SingleSource.Subscriber<StreamingHttpResponse> NOOP_SUBSCRIBER =
Expand Down Expand Up @@ -147,7 +154,8 @@ public void onError(final Throwable t) {
public void onSubscribe(final Cancellable cancellable) {
subscriber.onSubscribe(() -> {
try {
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
final int previous = stateUpdater.getAndSet(this, RESPONSE_COMPLETE);
if (previous == IDLE || previous == PROCESSING_PAYLOAD) {
beforeFinally.cancel();
}
} finally {
Expand All @@ -163,13 +171,18 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
sendNullResponse();
} else if (stateUpdater.compareAndSet(this, IDLE, PROCESSING_PAYLOAD)) {
subscriber.onSuccess(response.transformMessageBody(payload ->
payload.liftSync(messageBodySubscriber -> new MessageBodySubscriber(messageBodySubscriber,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We satisfy the callback requirements but we don't proactively drain the message body. Doing so will be hairy. I personally think it should be up to the receiver of the message to properly dispose of it if they no longer want it, or pass it along until something in the pipeline cares.

Opinions welcome.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you think we may need to drain message body here?

Copy link
Contributor Author

@bryce-anderson bryce-anderson Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I certainly wasn't clear. I mean we win the race to send it but what happens if we receive a losing cancel call before the response body has been subscribed to: should we try to drain the message body at that point or is it out of our hands? I think the latter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's out of our hands. For now, we will rely on operators that generate cancel, like the timeout filter, to clean it up

beforeFinally, discardEventsAfterCancel))
payload.liftSync(messageBodySubscriber ->
// Only the first subscriber needs to be wrapped. Followup subscribers will
// most likely fail because duplicate subscriptions to message bodies are not allowed.
stateUpdater.compareAndSet(this,
PROCESSING_PAYLOAD, RESPONSE_COMPLETE) ? new MessageBodySubscriber(
messageBodySubscriber, beforeFinally, discardEventsAfterCancel) :
messageBodySubscriber)
));
} else {
// Invoking a terminal method multiple times is not allowed by the RS spec, so we assume we have been
// cancelled.
assert state == TERMINATED;
assert state == RESPONSE_COMPLETE;
// The request has been cancelled, but we still received a response. We need to discard the response
// body or risk leaking hot resources which are commonly attached to a message body.
toSource(response.messageBody()).subscribe(CancelImmediatelySubscriber.INSTANCE);
Expand All @@ -185,7 +198,7 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
@Override
public void onError(final Throwable t) {
try {
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onError(t);
} else if (discardEventsAfterCancel) {
return;
Expand All @@ -200,7 +213,7 @@ public void onError(final Throwable t) {
private void sendNullResponse() {
try {
// Since, we are not giving out a response, no subscriber will arrive for the payload Publisher.
if (stateUpdater.compareAndSet(this, IDLE, TERMINATED)) {
if (stateUpdater.compareAndSet(this, IDLE, RESPONSE_COMPLETE)) {
beforeFinally.onComplete();
} else if (discardEventsAfterCancel) {
return;
Expand Down Expand Up @@ -424,21 +437,7 @@ public void onComplete() {
}

private int setTerminalState() {
for (;;) {
final int state = this.state;
if (state == TERMINATED) {
// We already cancelled and have to discard further events
return state;
}
if (state == PROCESSING_PAYLOAD) {
if (stateUpdater.compareAndSet(this, PROCESSING_PAYLOAD, TERMINATED)) {
return state;
}
} else if (stateUpdater.compareAndSet(this, state, TERMINATED)) {
// re-entry, but we can terminate because this is a final event:
return state;
}
}
return stateUpdater.getAndSet(this, TERMINATED);
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
}

private void cancel0(final boolean propagateCancel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -76,7 +77,6 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -91,6 +91,11 @@ class BeforeFinallyHttpOperatorTest {
@Mock
private TerminalSignalConsumer beforeFinally;

@AfterEach
void ensureOnlyOneSignal() {
verifyNoMoreInteractions(beforeFinally);
}

@SuppressWarnings("unused")
private static Stream<Arguments> booleanTerminalNotification() {
return Stream.of(Arguments.of(false, TerminalNotification.complete()),
Expand All @@ -111,7 +116,6 @@ void nullAsSuccess(boolean discardEventsAfterCancel) {
verify(beforeFinally).onComplete();

subscriber.verifyNullResponseReceived();
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down Expand Up @@ -176,7 +180,6 @@ void cancelBeforeOnSuccess(boolean discardEventsAfterCancel) {
Exception ex = assertThrows(Exception.class, () -> subscriber.response.payloadBody().toFuture().get());
assertThat(ex.getCause(), instanceOf(CancellationException.class));
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -199,7 +202,6 @@ void cancelBeforeOnSuccessNull(boolean discardEventsAfterCancel) {
} else {
subscriber.verifyNullResponseReceived();
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -222,7 +224,6 @@ void cancelBeforeOnError(boolean discardEventsAfterCancel) {
} else {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand All @@ -242,7 +243,9 @@ void cancelAfterOnSuccess(boolean discardEventsAfterCancel) {
subscriber.verifyResponseReceived();

subscriber.cancellable.cancel();
verifyNoInteractions(beforeFinally);

// We should get a cancel because we haven't subscribed to the payload body.
verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}
Expand All @@ -263,14 +266,13 @@ void cancelAfterOnError(boolean discardEventsAfterCancel) {
assertThat("onError not called.", subscriber.error, is(DELIBERATE_EXCEPTION));

subscriber.cancellable.cancel();
verifyNoMoreInteractions(beforeFinally);
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
@MethodSource("booleanTerminalNotification")
void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
void cancelBeforeMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
TestPublisher<Buffer> payload = new TestPublisher.Builder<Buffer>().disableAutoOnSubscribe().build();
TestSubscription payloadSubscription = new TestSubscription();
TestPublisherSubscriber<Buffer> payloadSubscriber = new TestPublisherSubscriber<>();
Expand All @@ -289,7 +291,7 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
subscriber.verifyResponseReceived();

subscriber.cancellable.cancel();
verifyNoInteractions(beforeFinally);
verify(beforeFinally).cancel();
// We unconditionally cancel and let the original single handle the cancel post terminate
responseSingle.verifyCancelled();

Expand All @@ -302,6 +304,58 @@ void cancelBeforeOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalN
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));

payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
payload.onComplete();
} else {
payload.onError(payloadTerminal.cause());
}

assertThat("Unexpected payload body items",
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0} payloadTerminal={1}")
@MethodSource("booleanTerminalNotification")
void cancelAfterMessageBodySubscribe(boolean discardEventsAfterCancel, TerminalNotification payloadTerminal) {
TestPublisher<Buffer> payload = new TestPublisher.Builder<Buffer>().disableAutoOnSubscribe().build();
TestSubscription payloadSubscription = new TestSubscription();
TestPublisherSubscriber<Buffer> payloadSubscriber = new TestPublisherSubscriber<>();

LegacyTestSingle<StreamingHttpResponse> responseSingle = new LegacyTestSingle<>(true);
final ResponseSubscriber subscriber = new ResponseSubscriber();
toSource(responseSingle
.liftSync(new BeforeFinallyHttpOperator(beforeFinally, discardEventsAfterCancel)))
.subscribe(subscriber);
assertThat("onSubscribe not called.", subscriber.cancellable, is(notNullValue()));

responseSingle.onSuccess(reqRespFactory.ok().payloadBody(payload));

verifyNoInteractions(beforeFinally);
responseSingle.verifyNotCancelled();
subscriber.verifyResponseReceived();

assert subscriber.response != null;
toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber);
payload.onSubscribe(payloadSubscription);
payloadSubscriber.awaitSubscription().request(MAX_VALUE);

// We unconditionally cancel and let the original single handle the cancel post terminate
subscriber.cancellable.cancel();
// The ownership of `beforeFinally` has been transferred to the message body subscription.
verify(beforeFinally, Mockito.never()).cancel();
responseSingle.verifyCancelled();

assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
payloadSubscriber.awaitSubscription().cancel();
assertThat("Payload was not cancelled", payloadSubscription.isCancelled(), is(true));
verify(beforeFinally).cancel();

payload.onNext(EMPTY_BUFFER);
if (payloadTerminal.cause() == null) {
payload.onComplete();
Expand Down Expand Up @@ -359,8 +413,10 @@ void cancelAfterOnNextThenTerminate(boolean discardEventsAfterCancel, TerminalNo
payloadSubscriber.pollAllOnNext(), contains(EMPTY_BUFFER));
if (payloadTerminal.cause() == null) {
payloadSubscriber.awaitOnComplete();
verify(beforeFinally).onComplete();
} else {
assertThat(payloadSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
verify(beforeFinally).onError(DELIBERATE_EXCEPTION);
}

assertThat("Payload was prematurely cancelled", payloadSubscription.isCancelled(), is(false));
Expand Down Expand Up @@ -410,7 +466,6 @@ public void onNext(@Nullable final Buffer buffer) {
subscription.cancel(); // intentionally cancel two times to make sure it's idempotent
verify(payloadSubscription, Mockito.never()).cancel();
verifyNoMoreInteractions(beforeFinally);

payload.onNext(EMPTY_BUFFER);
}
verify(payloadSubscription, Mockito.never()).cancel();
Expand All @@ -437,8 +492,6 @@ public void onComplete() {

assertThat("Unexpected payload body items", receivedPayload, contains(EMPTY_BUFFER, EMPTY_BUFFER));
assertThat("Unexpected payload body termination", subscriberTerminal.get(), is(nullValue()));

verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] fromOnNext={0} payloadTerminal={1}")
Expand Down Expand Up @@ -521,7 +574,6 @@ private void cancelFromTerminal() {
} else {
verify(beforeFinally).onError(payloadTerminal.cause());
}
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down Expand Up @@ -589,7 +641,8 @@ void resubscribeToPayloadBody(boolean discardEventsAfterCancel, boolean payloadE
toSource(subscriber.response.payloadBody()).subscribe(payloadSubscriber2);
payloadSubscriber2.awaitSubscription().request(MAX_VALUE);
assertThat(payloadSubscriber2.awaitOnError(), is(instanceOf(DuplicateSubscribeException.class)));
verify(beforeFinally).onError(any(DuplicateSubscribeException.class));
// second subscribe shouldn't interact.
verifyNoMoreInteractions(beforeFinally);
}

@ParameterizedTest(name = "{displayName} [{index}] discardEventsAfterCancel={0}")
Expand Down
Loading