Skip to content

Commit

Permalink
Pooled HTTP client connection cannot directly create requests
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jan 16, 2024
1 parent fe6b72d commit a290ad1
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl>
private final HttpVersion version;
private final long lowWaterMark;
private final long highWaterMark;
private final boolean pooled;

private Deque<Stream> requests = new ArrayDeque<>();
private Deque<Stream> responses = new ArrayDeque<>();
Expand Down Expand Up @@ -116,7 +117,8 @@ public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl>
SocketAddress server,
HostAndPort authority,
ContextInternal context,
ClientMetrics metrics) {
ClientMetrics metrics,
boolean pooled) {
super(context, chctx);
this.client = client;
this.options = client.options();
Expand All @@ -130,6 +132,7 @@ public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl>
this.lowWaterMark = chctx.channel().config().getWriteBufferLowWaterMark();
this.keepAliveTimeout = options.getKeepAliveTimeout();
this.expirationTimestamp = expirationTimestampOf(keepAliveTimeout);
this.pooled = pooled;
}

@Override
Expand Down Expand Up @@ -169,6 +172,11 @@ public synchronized long activeStreams() {
return requests.isEmpty() && responses.isEmpty() ? 0 : 1;
}

@Override
public boolean pooled() {
return pooled;
}

/**
* @return a raw {@code NetSocket} - for internal use - must be called from event-loop
*/
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private final HttpClientBase client;
private final ClientMetrics metrics;
private final HostAndPort authority;
private final boolean pooled;
private Handler<Void> evictionHandler = DEFAULT_EVICTION_HANDLER;
private Handler<Long> concurrencyChangeHandler = DEFAULT_CONCURRENCY_CHANGE_HANDLER;
private long expirationTimestamp;
Expand All @@ -52,18 +53,24 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
ContextInternal context,
HostAndPort authority,
VertxHttp2ConnectionHandler connHandler,
ClientMetrics metrics) {
ClientMetrics metrics, boolean pooled) {
super(context, connHandler);
this.metrics = metrics;
this.client = client;
this.authority = authority;
this.pooled = pooled;
}

@Override
public HostAndPort authority() {
return authority;
}

@Override
public boolean pooled() {
return pooled;
}

@Override
public Http2ClientConnection evictionHandler(Handler<Void> handler) {
evictionHandler = handler;
Expand Down Expand Up @@ -674,7 +681,8 @@ public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2Conn
ContextInternal context,
boolean upgrade,
Object socketMetric,
HostAndPort authority) {
HostAndPort authority,
boolean pooled) {
HttpClientOptions options = client.options();
HttpClientMetrics met = client.metrics();
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>()
Expand All @@ -683,7 +691,7 @@ public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2Conn
.gracefulShutdownTimeoutMillis(0) // So client close tests don't hang 30 seconds - make this configurable later but requires HTTP/1 impl
.initialSettings(client.options().getInitialSettings())
.connectionFactory(connHandler -> {
Http2ClientConnection conn = new Http2ClientConnection(client, context, authority, connHandler, metrics);
Http2ClientConnection conn = new Http2ClientConnection(client, context, authority, connHandler, metrics, pooled);
if (metrics != null) {
Object m = socketMetric;
conn.metric(m);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public long concurrency() {
return upgradeProcessed ? current.concurrency() : 1L;
}

@Override
public boolean pooled() {
return current.pooled();
}

@Override
public long activeStreams() {
return current.concurrency();
Expand Down Expand Up @@ -358,7 +363,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {

// Now we need to upgrade this to an HTTP2
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.getContext(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority());
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.getContext(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority(), upgradingConnection.pooled());
upgradingConnection.channel().pipeline().addLast(handler);
handler.connectFuture().addListener(future -> {
if (!future.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class HttpChannelConnector {
private final HttpVersion version;
private final HostAndPort authority;
private final SocketAddress server;
private final boolean pooled;

public HttpChannelConnector(HttpClientBase client,
NetClientInternal netClient,
Expand All @@ -67,7 +68,8 @@ public HttpChannelConnector(HttpClientBase client,
boolean ssl,
boolean useAlpn,
HostAndPort authority,
SocketAddress server) {
SocketAddress server,
boolean pooled) {
this.client = client;
this.netClient = netClient;
this.metrics = metrics;
Expand All @@ -79,6 +81,7 @@ public HttpChannelConnector(HttpClientBase client,
this.version = version;
this.authority = authority;
this.server = server;
this.pooled = pooled;
}

public SocketAddress server() {
Expand Down Expand Up @@ -209,7 +212,7 @@ private void http1xConnected(HttpVersion version,
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
HttpClientMetrics met = client.metrics();
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, authority, context, this.metrics);
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, authority, context, metrics, pooled);
if (met != null) {
conn.metric(socketMetric);
met.endpointConnected(metrics);
Expand Down Expand Up @@ -256,7 +259,7 @@ private void http2Connected(ContextInternal context,
PromiseInternal<HttpClientConnectionInternal> promise) {
VertxHttp2ConnectionHandler<Http2ClientConnection> clientHandler;
try {
clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, context, false, metric, authority);
clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, context, false, metric, authority, pooled);
ch.pipeline().addLast("handler", clientHandler);
ch.flush();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public interface HttpClientConnectionInternal extends HttpClientConnection {
*/
long activeStreams();

/**
* @return whether the connection is pooled
*/
boolean pooled();

/**
* @return the connection channel
*/
Expand All @@ -82,6 +87,9 @@ public interface HttpClientConnectionInternal extends HttpClientConnection {
* @return a future notified with the created request
*/
default Future<HttpClientRequest> createRequest(ContextInternal context, RequestOptions options) {
if (pooled()) {
return context.failedFuture("HTTP requests cannot be directly created from pool HTTP client request, use the pool instead");
}
return createStream(context).map(stream -> {
HttpClientRequestImpl request = new HttpClientRequestImpl(stream);
if (options != null) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private EndpointProvider<EndpointKey, SharedClientHttpStreamEndpoint> httpEndpoi
key = new EndpointKey(key.ssl, key.sslOptions, proxyOptions, server, key.authority);
proxyOptions = null;
}
HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, metrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server);
HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, metrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, true);
return new SharedClientHttpStreamEndpoint(
HttpClientImpl.this,
metrics,
Expand Down Expand Up @@ -287,7 +287,8 @@ public Future<HttpClientConnection> connect(HttpConnectOptions connect) {
useSSL,
useAlpn,
authority,
server);
server,
false);
return (Future) connector.httpConnect(vertx.getOrCreateContext());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions, Prom
EndpointProvider<EndpointKey, WebSocketEndpoint> provider = (key_, dispose) -> {
int maxPoolSize = options.getMaxConnections();
ClientMetrics metrics = WebSocketClientImpl.this.metrics != null ? WebSocketClientImpl.this.metrics.createEndpointMetrics(key_.server, maxPoolSize) : null;
HttpChannelConnector connector = new HttpChannelConnector(WebSocketClientImpl.this, netClient, sslOptions, key_.proxyOptions, metrics, HttpVersion.HTTP_1_1, key_.ssl, false, key_.authority, key_.server);
HttpChannelConnector connector = new HttpChannelConnector(WebSocketClientImpl.this, netClient, sslOptions, key_.proxyOptions, metrics, HttpVersion.HTTP_1_1, key_.ssl, false, key_.authority, key_.server, false);
return new WebSocketEndpoint(null, maxPoolSize, connector, dispose);
};
webSocketCM
Expand Down

0 comments on commit a290ad1

Please sign in to comment.