Skip to content

Commit

Permalink
Ensure and test the max number of concurrent streams which can be cre…
Browse files Browse the repository at this point in the history
…ated concurrently with a single connection.
  • Loading branch information
vietj committed Jan 16, 2024
1 parent 44e16cc commit 786ae94
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 28 deletions.
18 changes: 14 additions & 4 deletions src/main/java/io/vertx/core/http/HttpClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,30 @@
@VertxGen
public interface HttpClientConnection extends HttpConnection {

/**
* @return the number of active request/response (streams)
*/
long activeStreams();

/**
* @return the max number of active streams this connection can handle concurrently
*/
long concurrency();

/**
* Like {@link #createRequest(RequestOptions)} but without options.
*/
Future<HttpClientRequest> createRequest();

/**
* Create an HTTP request initialized with the specified request {@code options}
* Create an HTTP request (stream) initialized with the specified request {@code options}.
*
* This enqueues a request in the client connection queue, the resulting future is notified when the connection can satisfy
* the request.
* No more than {@link #concurrency()} streams can be handled concurrently, when the number of {@link #activeStreams()} has
* reached {@link #concurrency()} the future is failed.
*
* Pooled HTTP connection will return an error, since requests should be made against the pool instead the connection itself.
*
* @return a future notified with the created request
* @return a future notified with the created stream
*/
Future<HttpClientRequest> createRequest(RequestOptions options);

Expand Down
18 changes: 12 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1242,17 +1242,23 @@ public Future<HttpClientStream> createStream(ContextInternal context) {
private void createStream(ContextInternal context, Promise<HttpClientStream> promise) {
EventLoop eventLoop = context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
Throwable err;
synchronized (this) {
if (!closed) {
StreamImpl stream = new StreamImpl(context, this, promise, seq++);
requests.add(stream);
if (requests.size() == 1) {
stream.promise.complete(stream);
if (requests.size() < concurrency()) {
StreamImpl stream = new StreamImpl(context, this, promise, seq++);
requests.add(stream);
if (requests.size() == 1) {
stream.promise.complete(stream);
}
return;
}
return;
err = new VertxException("Pipelining limit exceeded");
} else {
err = HttpUtils.CONNECTION_CLOSED_EXCEPTION;
}
}
promise.fail(HttpUtils.CONNECTION_CLOSED_EXCEPTION);
promise.fail(err);
} else {
eventLoop.execute(() -> {
createStream(context, promise);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ protected ProxyOptions computeProxyOptions(ProxyOptions proxyOptions, SocketAddr
return proxyOptions;
}

protected ClientSSLOptions sslOptions(RequestOptions requestOptions) {
ClientSSLOptions sslOptions = requestOptions.getSslOptions();
protected ClientSSLOptions sslOptions(HttpConnectOptions connectOptions) {
ClientSSLOptions sslOptions = connectOptions.getSslOptions();
if (sslOptions != null) {
sslOptions = new ClientSSLOptions(sslOptions);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ public interface HttpClientConnectionInternal extends HttpClientConnection {
*/
HttpClientConnectionInternal concurrencyChangeHandler(Handler<Long> handler);

/**
* @return the connection concurrency
*/
long concurrency();

/**
* @return the number of active streams
*/
long activeStreams();

/**
* @return whether the connection is pooled
*/
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,7 @@ public Future<HttpClientConnection> connect(HttpConnectOptions connect) {
throw new IllegalArgumentException("Only socket address are currently supported");
}
HostAndPort authority = HostAndPort.create(host, port);
ClientSSLOptions sslOptions = connect.getSslOptions();
if (sslOptions == null) {
sslOptions = options.getSslOptions();
}
ClientSSLOptions sslOptions = sslOptions(connect);
ProxyOptions proxyOptions = computeProxyOptions(connect.getProxyOptions(), server);
ClientMetrics clientMetrics = metrics != null ? metrics.createEndpointMetrics(server, 1) : null;
Boolean ssl = connect.isSsl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public class Http2ClientConnectionTest extends HttpClientConnectionTest {

@Override
protected HttpServerOptions createBaseServerOptions() {
return Http2TestBase.createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST);
return Http2TestBase.createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST)
.setInitialSettings(new Http2Settings().setMaxConcurrentStreams(10));
}

@Override
Expand Down
37 changes: 36 additions & 1 deletion src/test/java/io/vertx/core/http/HttpClientConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
package io.vertx.core.http;

import io.netty.buffer.Unpooled;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.http.impl.HttpClientConnectionInternal;
import io.vertx.core.http.impl.HttpClientInternal;
import io.vertx.core.http.impl.HttpRequestHead;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.HostAndPort;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public abstract class HttpClientConnectionTest extends HttpTestBase {

Expand Down Expand Up @@ -110,4 +111,38 @@ public void testConnectionClose() throws Exception {
}));
await();
}

@Test
public void testConcurrencyLimit() throws Exception {
server.requestHandler(req -> {
});
startServer(testAddress);
client.connect(new HttpConnectOptions().setServer(testAddress).setHost(requestOptions.getHost()).setPort(requestOptions.getPort())).onComplete(onSuccess(conn -> {
long concurrency = ((HttpClientConnectionInternal) conn).concurrency();
createRequestRecursively(conn, 0, (err, num) -> {
assertEquals(concurrency, (long)num);
testComplete();
});
}));
await();
}

private void createRequestRecursively(HttpClientConnection conn, long num, BiConsumer<Throwable, Long> callback) {
Future<HttpClientRequest> fut = conn.createRequest(new RequestOptions());
fut.onComplete(ar1 -> {
if (ar1.succeeded()) {
HttpClientRequest req = ar1.result();
req.setChunked(true);
req.write("Hello").onComplete(ar2 -> {
if (ar2.succeeded()) {
createRequestRecursively(conn, num + 1, callback);
} else {
callback.accept(ar2.cause(), num);
}
});
} else {
callback.accept(ar1.cause(), num);
}
});
}
}

0 comments on commit 786ae94

Please sign in to comment.