From 786ae9411bc4ee677c1f235731c7db6d28ccd22c Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 16 Jan 2024 15:35:04 +0100 Subject: [PATCH] Ensure and test the max number of concurrent streams which can be created concurrently with a single connection. --- .../vertx/core/http/HttpClientConnection.java | 18 +++++++-- .../http/impl/Http1xClientConnection.java | 18 ++++++--- .../vertx/core/http/impl/HttpClientBase.java | 4 +- .../impl/HttpClientConnectionInternal.java | 10 ----- .../vertx/core/http/impl/HttpClientImpl.java | 5 +-- .../core/http/Http2ClientConnectionTest.java | 3 +- .../core/http/HttpClientConnectionTest.java | 37 ++++++++++++++++++- 7 files changed, 67 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/vertx/core/http/HttpClientConnection.java b/src/main/java/io/vertx/core/http/HttpClientConnection.java index 1fd213fe665..560c7886941 100644 --- a/src/main/java/io/vertx/core/http/HttpClientConnection.java +++ b/src/main/java/io/vertx/core/http/HttpClientConnection.java @@ -21,20 +21,30 @@ @VertxGen public interface HttpClientConnection extends HttpConnection { + /** + * @return the number of active request/response (streams) + */ + long activeStreams(); + + /** + * @return the max number of active streams this connection can handle concurrently + */ + long concurrency(); + /** * Like {@link #createRequest(RequestOptions)} but without options. */ Future createRequest(); /** - * Create an HTTP request initialized with the specified request {@code options} + * Create an HTTP request (stream) initialized with the specified request {@code options}. * - * This enqueues a request in the client connection queue, the resulting future is notified when the connection can satisfy - * the request. + * No more than {@link #concurrency()} streams can be handled concurrently, when the number of {@link #activeStreams()} has + * reached {@link #concurrency()} the future is failed. * * Pooled HTTP connection will return an error, since requests should be made against the pool instead the connection itself. * - * @return a future notified with the created request + * @return a future notified with the created stream */ Future createRequest(RequestOptions options); diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 6553f767231..324ffac4ee3 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -1242,17 +1242,23 @@ public Future createStream(ContextInternal context) { private void createStream(ContextInternal context, Promise promise) { EventLoop eventLoop = context.nettyEventLoop(); if (eventLoop.inEventLoop()) { + Throwable err; synchronized (this) { if (!closed) { - StreamImpl stream = new StreamImpl(context, this, promise, seq++); - requests.add(stream); - if (requests.size() == 1) { - stream.promise.complete(stream); + if (requests.size() < concurrency()) { + StreamImpl stream = new StreamImpl(context, this, promise, seq++); + requests.add(stream); + if (requests.size() == 1) { + stream.promise.complete(stream); + } + return; } - return; + err = new VertxException("Pipelining limit exceeded"); + } else { + err = HttpUtils.CONNECTION_CLOSED_EXCEPTION; } } - promise.fail(HttpUtils.CONNECTION_CLOSED_EXCEPTION); + promise.fail(err); } else { eventLoop.execute(() -> { createStream(context, promise); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientBase.java b/src/main/java/io/vertx/core/http/impl/HttpClientBase.java index f88c3107711..1d6bdf2f840 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientBase.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientBase.java @@ -135,8 +135,8 @@ protected ProxyOptions computeProxyOptions(ProxyOptions proxyOptions, SocketAddr return proxyOptions; } - protected ClientSSLOptions sslOptions(RequestOptions requestOptions) { - ClientSSLOptions sslOptions = requestOptions.getSslOptions(); + protected ClientSSLOptions sslOptions(HttpConnectOptions connectOptions) { + ClientSSLOptions sslOptions = connectOptions.getSslOptions(); if (sslOptions != null) { sslOptions = new ClientSSLOptions(sslOptions); } else { diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java b/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java index aace5ad93ac..00143dcfd5c 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java @@ -55,16 +55,6 @@ public interface HttpClientConnectionInternal extends HttpClientConnection { */ HttpClientConnectionInternal concurrencyChangeHandler(Handler handler); - /** - * @return the connection concurrency - */ - long concurrency(); - - /** - * @return the number of active streams - */ - long activeStreams(); - /** * @return whether the connection is pooled */ diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index d0b234f2b97..8a837ef01e2 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -264,10 +264,7 @@ public Future connect(HttpConnectOptions connect) { throw new IllegalArgumentException("Only socket address are currently supported"); } HostAndPort authority = HostAndPort.create(host, port); - ClientSSLOptions sslOptions = connect.getSslOptions(); - if (sslOptions == null) { - sslOptions = options.getSslOptions(); - } + ClientSSLOptions sslOptions = sslOptions(connect); ProxyOptions proxyOptions = computeProxyOptions(connect.getProxyOptions(), server); ClientMetrics clientMetrics = metrics != null ? metrics.createEndpointMetrics(server, 1) : null; Boolean ssl = connect.isSsl(); diff --git a/src/test/java/io/vertx/core/http/Http2ClientConnectionTest.java b/src/test/java/io/vertx/core/http/Http2ClientConnectionTest.java index a83cd3e86d2..c5c809db972 100644 --- a/src/test/java/io/vertx/core/http/Http2ClientConnectionTest.java +++ b/src/test/java/io/vertx/core/http/Http2ClientConnectionTest.java @@ -14,7 +14,8 @@ public class Http2ClientConnectionTest extends HttpClientConnectionTest { @Override protected HttpServerOptions createBaseServerOptions() { - return Http2TestBase.createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST); + return Http2TestBase.createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST) + .setInitialSettings(new Http2Settings().setMaxConcurrentStreams(10)); } @Override diff --git a/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java b/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java index abd17fe5827..613c5b117f3 100644 --- a/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java +++ b/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java @@ -11,15 +11,16 @@ package io.vertx.core.http; import io.netty.buffer.Unpooled; +import io.vertx.core.Future; import io.vertx.core.MultiMap; import io.vertx.core.http.impl.HttpClientConnectionInternal; import io.vertx.core.http.impl.HttpClientInternal; import io.vertx.core.http.impl.HttpRequestHead; import io.vertx.core.impl.ContextInternal; -import io.vertx.core.net.HostAndPort; import org.junit.Test; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; public abstract class HttpClientConnectionTest extends HttpTestBase { @@ -110,4 +111,38 @@ public void testConnectionClose() throws Exception { })); await(); } + + @Test + public void testConcurrencyLimit() throws Exception { + server.requestHandler(req -> { + }); + startServer(testAddress); + client.connect(new HttpConnectOptions().setServer(testAddress).setHost(requestOptions.getHost()).setPort(requestOptions.getPort())).onComplete(onSuccess(conn -> { + long concurrency = ((HttpClientConnectionInternal) conn).concurrency(); + createRequestRecursively(conn, 0, (err, num) -> { + assertEquals(concurrency, (long)num); + testComplete(); + }); + })); + await(); + } + + private void createRequestRecursively(HttpClientConnection conn, long num, BiConsumer callback) { + Future fut = conn.createRequest(new RequestOptions()); + fut.onComplete(ar1 -> { + if (ar1.succeeded()) { + HttpClientRequest req = ar1.result(); + req.setChunked(true); + req.write("Hello").onComplete(ar2 -> { + if (ar2.succeeded()) { + createRequestRecursively(conn, num + 1, callback); + } else { + callback.accept(ar2.cause(), num); + } + }); + } else { + callback.accept(ar1.cause(), num); + } + }); + } }