Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTPCLIENT-2310: Async Connect exec handler incorrectly pipes CONNECT requests through the main request protocol chain #516

Merged
merged 1 commit into from
Dec 2, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.HttpRoute;
Expand All @@ -53,6 +56,7 @@
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
Expand All @@ -62,8 +66,13 @@
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.util.Args;
Expand Down Expand Up @@ -255,20 +264,20 @@ public void cancelled() {
if (LOG.isDebugEnabled()) {
LOG.debug("{} create tunnel", exchangeId);
}
createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() {
createTunnel(state, proxy, target, scope, new AsyncExecCallback() {

@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
return asyncExecCallback.handleResponse(response, entityDetails);
}

@Override
public void handleInformationResponse(
final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void handleInformationResponse(
final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}

@Override
public void completed() {
Expand Down Expand Up @@ -302,6 +311,7 @@ public void completed() {

@Override
public void failed(final Exception cause) {
execRuntime.markConnectionNonReusable();
asyncExecCallback.failed(cause);
}

Expand Down Expand Up @@ -370,30 +380,75 @@ private void createTunnel(
final HttpHost proxy,
final HttpHost nextHop,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {

final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext clientContext = scope.clientContext;
final AsyncExecRuntime execRuntime = scope.execRuntime;
final String exchangeId = scope.exchangeId;

final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();

if (authCacheKeeper != null) {
authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
}

final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
connect.setVersion(HttpVersion.HTTP_1_1);
final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {

private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();

proxyHttpProcessor.process(connect, null, clientContext);
authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
@Override
public void releaseResources() {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
entityConsumer.releaseResources();
}
}

chain.proceed(connect, null, scope, new AsyncExecCallback() {
@Override
public void failed(final Exception cause) {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
entityConsumer.releaseResources();
}
asyncExecCallback.failed(cause);
}

@Override
public void cancel() {
failed(new InterruptedIOException());
}

@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
public void produceRequest(final RequestChannel requestChannel,
final HttpContext httpContext) throws HttpException, IOException {
final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
connect.setVersion(HttpVersion.HTTP_1_1);

proxyHttpProcessor.process(connect, null, clientContext);
authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);

requestChannel.sendRequest(connect, null, clientContext);
}

@Override
public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
}

@Override
public int available() {
return 0;
}

@Override
public void consumeInformation(final HttpResponse httpResponse,
final HttpContext httpContext) throws HttpException, IOException {
}

@Override
public void consumeResponse(final HttpResponse response,
final EntityDetails entityDetails,
final HttpContext httpContext) throws HttpException, IOException {
clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
proxyHttpProcessor.process(response, entityDetails, clientContext);

Expand All @@ -404,31 +459,56 @@ public AsyncDataConsumer handleResponse(

if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
state.challenged = true;
return null;
}
state.challenged = false;
if (status >= HttpStatus.SC_REDIRECTION) {
state.tunnelRefused = true;
return asyncExecCallback.handleResponse(response, entityDetails);
} else {
state.challenged = false;
if (status >= HttpStatus.SC_REDIRECTION) {
state.tunnelRefused = true;
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
} else if (status == HttpStatus.SC_OK) {
asyncExecCallback.completed();
} else {
throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
}
}
return null;
}

@Override
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
if (entityConsumer != null) {
entityConsumer.updateCapacity(capacityChannel);
} else {
capacityChannel.update(Integer.MAX_VALUE);
}
}

@Override
public void completed() {
asyncExecCallback.completed();
public void consume(final ByteBuffer src) throws IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
if (entityConsumer != null) {
entityConsumer.consume(src);
}
}

@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
entityConsumer.streamEnd(trailers);
}
asyncExecCallback.completed();
}

});
};

if (LOG.isDebugEnabled()) {
operation.setDependency(execRuntime.execute(
exchangeId,
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
clientContext));
} else {
operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
}

}

Expand Down