From bbc1f33b10ebf41a99734ae16548ebe1113012f9 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 12 Jan 2024 23:28:41 +0100 Subject: [PATCH] Implementation of an HTTP client connection API. --- .../http/HttpConnectOptionsConverter.java | 79 +++++ .../core/http/RequestOptionsConverter.java | 46 --- .../java/io/vertx/core/http/HttpClient.java | 10 + .../vertx/core/http/HttpClientConnection.java | 41 +++ .../io/vertx/core/http/HttpClientRequest.java | 2 +- .../vertx/core/http/HttpConnectOptions.java | 283 ++++++++++++++++++ .../io/vertx/core/http/HttpConnection.java | 2 +- .../io/vertx/core/http/RequestOptions.java | 188 ++---------- .../core/http/impl/CleanableHttpClient.java | 4 +- .../http/impl/Http1xClientConnection.java | 13 +- .../core/http/impl/Http2ClientConnection.java | 9 +- .../impl/Http2UpgradeClientConnection.java | 19 +- .../core/http/impl/HttpChannelConnector.java | 16 +- .../vertx/core/http/impl/HttpClientBase.java | 17 -- ...java => HttpClientConnectionInternal.java} | 35 ++- .../vertx/core/http/impl/HttpClientImpl.java | 140 +++++---- .../core/http/impl/HttpClientInternal.java | 17 -- .../core/http/impl/HttpClientRequestImpl.java | 47 ++- .../impl/HttpClientRequestPushPromise.java | 2 +- .../core/http/impl/HttpClientStream.java | 13 +- .../impl/SharedClientHttpStreamEndpoint.java | 36 +-- .../StatisticsGatheringHttpClientStream.java | 2 +- .../core/http/impl/WebSocketEndpoint.java | 10 +- .../core/spi/metrics/HttpClientMetrics.java | 2 +- .../core/http/Http1xClientConnectionTest.java | 22 +- .../io/vertx/core/http/Http2ClientTest.java | 4 +- .../core/http/HttpClientConnectionTest.java | 18 +- .../java/io/vertx/core/http/HttpTest.java | 2 +- 28 files changed, 675 insertions(+), 404 deletions(-) create mode 100644 src/main/generated/io/vertx/core/http/HttpConnectOptionsConverter.java create mode 100644 src/main/java/io/vertx/core/http/HttpClientConnection.java create mode 100644 src/main/java/io/vertx/core/http/HttpConnectOptions.java rename src/main/java/io/vertx/core/http/impl/{HttpClientConnection.java => HttpClientConnectionInternal.java} (67%) diff --git a/src/main/generated/io/vertx/core/http/HttpConnectOptionsConverter.java b/src/main/generated/io/vertx/core/http/HttpConnectOptionsConverter.java new file mode 100644 index 00000000000..c56de00ae53 --- /dev/null +++ b/src/main/generated/io/vertx/core/http/HttpConnectOptionsConverter.java @@ -0,0 +1,79 @@ +package io.vertx.core.http; + +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.impl.JsonUtil; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Base64; + +/** + * Converter and mapper for {@link io.vertx.core.http.HttpConnectOptions}. + * NOTE: This class has been automatically generated from the {@link io.vertx.core.http.HttpConnectOptions} original class using Vert.x codegen. + */ +public class HttpConnectOptionsConverter { + + + private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER; + private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER; + + static void fromJson(Iterable> json, HttpConnectOptions obj) { + for (java.util.Map.Entry member : json) { + switch (member.getKey()) { + case "proxyOptions": + if (member.getValue() instanceof JsonObject) { + obj.setProxyOptions(new io.vertx.core.net.ProxyOptions((io.vertx.core.json.JsonObject)member.getValue())); + } + break; + case "host": + if (member.getValue() instanceof String) { + obj.setHost((String)member.getValue()); + } + break; + case "port": + if (member.getValue() instanceof Number) { + obj.setPort(((Number)member.getValue()).intValue()); + } + break; + case "ssl": + if (member.getValue() instanceof Boolean) { + obj.setSsl((Boolean)member.getValue()); + } + break; + case "sslOptions": + if (member.getValue() instanceof JsonObject) { + obj.setSslOptions(new io.vertx.core.net.ClientSSLOptions((io.vertx.core.json.JsonObject)member.getValue())); + } + break; + case "connectTimeout": + if (member.getValue() instanceof Number) { + obj.setConnectTimeout(((Number)member.getValue()).longValue()); + } + break; + } + } + } + + static void toJson(HttpConnectOptions obj, JsonObject json) { + toJson(obj, json.getMap()); + } + + static void toJson(HttpConnectOptions obj, java.util.Map json) { + if (obj.getProxyOptions() != null) { + json.put("proxyOptions", obj.getProxyOptions().toJson()); + } + if (obj.getHost() != null) { + json.put("host", obj.getHost()); + } + if (obj.getPort() != null) { + json.put("port", obj.getPort()); + } + if (obj.isSsl() != null) { + json.put("ssl", obj.isSsl()); + } + if (obj.getSslOptions() != null) { + json.put("sslOptions", obj.getSslOptions().toJson()); + } + json.put("connectTimeout", obj.getConnectTimeout()); + } +} diff --git a/src/main/generated/io/vertx/core/http/RequestOptionsConverter.java b/src/main/generated/io/vertx/core/http/RequestOptionsConverter.java index 680d317d4fb..136f974371f 100644 --- a/src/main/generated/io/vertx/core/http/RequestOptionsConverter.java +++ b/src/main/generated/io/vertx/core/http/RequestOptionsConverter.java @@ -20,31 +20,6 @@ public class RequestOptionsConverter { static void fromJson(Iterable> json, RequestOptions obj) { for (java.util.Map.Entry member : json) { switch (member.getKey()) { - case "proxyOptions": - if (member.getValue() instanceof JsonObject) { - obj.setProxyOptions(new io.vertx.core.net.ProxyOptions((io.vertx.core.json.JsonObject)member.getValue())); - } - break; - case "host": - if (member.getValue() instanceof String) { - obj.setHost((String)member.getValue()); - } - break; - case "port": - if (member.getValue() instanceof Number) { - obj.setPort(((Number)member.getValue()).intValue()); - } - break; - case "ssl": - if (member.getValue() instanceof Boolean) { - obj.setSsl((Boolean)member.getValue()); - } - break; - case "sslOptions": - if (member.getValue() instanceof JsonObject) { - obj.setSslOptions(new io.vertx.core.net.ClientSSLOptions((io.vertx.core.json.JsonObject)member.getValue())); - } - break; case "uri": if (member.getValue() instanceof String) { obj.setURI((String)member.getValue()); @@ -60,11 +35,6 @@ static void fromJson(Iterable> json, Request obj.setTimeout(((Number)member.getValue()).longValue()); } break; - case "connectTimeout": - if (member.getValue() instanceof Number) { - obj.setConnectTimeout(((Number)member.getValue()).longValue()); - } - break; case "idleTimeout": if (member.getValue() instanceof Number) { obj.setIdleTimeout(((Number)member.getValue()).longValue()); @@ -89,21 +59,6 @@ static void toJson(RequestOptions obj, JsonObject json) { } static void toJson(RequestOptions obj, java.util.Map json) { - if (obj.getProxyOptions() != null) { - json.put("proxyOptions", obj.getProxyOptions().toJson()); - } - if (obj.getHost() != null) { - json.put("host", obj.getHost()); - } - if (obj.getPort() != null) { - json.put("port", obj.getPort()); - } - if (obj.isSsl() != null) { - json.put("ssl", obj.isSsl()); - } - if (obj.getSslOptions() != null) { - json.put("sslOptions", obj.getSslOptions().toJson()); - } if (obj.getURI() != null) { json.put("uri", obj.getURI()); } @@ -111,7 +66,6 @@ static void toJson(RequestOptions obj, java.util.Map json) { json.put("followRedirects", obj.getFollowRedirects()); } json.put("timeout", obj.getTimeout()); - json.put("connectTimeout", obj.getConnectTimeout()); json.put("idleTimeout", obj.getIdleTimeout()); if (obj.getTraceOperation() != null) { json.put("traceOperation", obj.getTraceOperation()); diff --git a/src/main/java/io/vertx/core/http/HttpClient.java b/src/main/java/io/vertx/core/http/HttpClient.java index fc3bd9aaaaa..1b53897a33a 100644 --- a/src/main/java/io/vertx/core/http/HttpClient.java +++ b/src/main/java/io/vertx/core/http/HttpClient.java @@ -14,6 +14,8 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.net.ClientSSLOptions; +import io.vertx.core.net.HostAndPort; +import io.vertx.core.net.SocketAddress; import java.util.concurrent.TimeUnit; @@ -145,4 +147,12 @@ default Future updateSSLOptions(ClientSSLOptions options) { * @return a future signaling the update success */ Future updateSSLOptions(ClientSSLOptions options, boolean force); + + /** + * Connect to a remote HTTP server. + * + * @param options the server connect options + */ + Future connect(HttpConnectOptions options); + } diff --git a/src/main/java/io/vertx/core/http/HttpClientConnection.java b/src/main/java/io/vertx/core/http/HttpClientConnection.java new file mode 100644 index 00000000000..602a205cf87 --- /dev/null +++ b/src/main/java/io/vertx/core/http/HttpClientConnection.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Future; + +/** + * Represents an HTTP client connection. + * + * @author Julien Viet + */ +@VertxGen +public interface HttpClientConnection extends HttpConnection { + + /** + * Like {@link #createRequest(RequestOptions)} but with null options. + */ + Future createRequest(); + + /** + * Create an HTTP request initialized with the specified request {@code options} + * + * This enqueues a request in the client connection queue, the resulting future is notified when the connection can satisfy + * the request. + * + * Pooled HTTP connection will return an error, since requests should be made against the pool instead the connection itself. + * + * @return a future notified with the created request + */ + Future createRequest(RequestOptions options); + +} diff --git a/src/main/java/io/vertx/core/http/HttpClientRequest.java b/src/main/java/io/vertx/core/http/HttpClientRequest.java index d68a6c2df78..5ebc644b11b 100644 --- a/src/main/java/io/vertx/core/http/HttpClientRequest.java +++ b/src/main/java/io/vertx/core/http/HttpClientRequest.java @@ -478,7 +478,7 @@ default boolean reset() { * @return the {@link HttpConnection} associated with this request */ @CacheReturn - HttpConnection connection(); + HttpClientConnection connection(); /** * Write an HTTP/2 frame to the request, allowing to extend the HTTP/2 protocol.

diff --git a/src/main/java/io/vertx/core/http/HttpConnectOptions.java b/src/main/java/io/vertx/core/http/HttpConnectOptions.java new file mode 100644 index 00000000000..822689ee4c0 --- /dev/null +++ b/src/main/java/io/vertx/core/http/HttpConnectOptions.java @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.http; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.json.annotations.JsonGen; +import io.vertx.core.VertxException; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.Address; +import io.vertx.core.net.ClientSSLOptions; +import io.vertx.core.net.ProxyOptions; +import io.vertx.core.net.SocketAddress; + +import java.net.MalformedURLException; +import java.net.URL; + +/** + * Options describing how an {@link HttpClient} will connect to a server. + * + * @author Julien Viet + */ +@DataObject +@JsonGen(publicConverter = false) +public class HttpConnectOptions { + + /** + * The default value for proxy options = {@code null} + */ + public static final ProxyOptions DEFAULT_PROXY_OPTIONS = null; + + /** + * The default value for server method = {@code null} + */ + public static final SocketAddress DEFAULT_SERVER = null; + + /** + * The default value for host name = {@code null} + */ + public static final String DEFAULT_HOST = null; + + /** + * The default value for port = {@code null} + */ + public static final Integer DEFAULT_PORT = null; + + /** + * The default value for SSL = {@code null} + */ + public static final Boolean DEFAULT_SSL = null; + + /** + * The default connect timeout = {@code -1L} (disabled) + */ + public static final long DEFAULT_CONNECT_TIMEOUT = -1L; + + private ProxyOptions proxyOptions; + private Address server; + private String host; + private Integer port; + private Boolean ssl; + private ClientSSLOptions sslOptions;; + private long connectTimeout; + + /** + * Default constructor + */ + public HttpConnectOptions() { + init(); + } + + /** + * Copy constructor + * + * @param other the options to copy + */ + public HttpConnectOptions(HttpConnectOptions other) { + init(); + setProxyOptions(other.proxyOptions); + setServer(other.server); + setHost(other.host); + setPort(other.port); + setSsl(other.ssl); + sslOptions = other.sslOptions != null ? new ClientSSLOptions(other.sslOptions) : null; + setConnectTimeout(other.connectTimeout); + } + + /** + * Create options from JSON + * + * @param json the JSON + */ + public HttpConnectOptions(JsonObject json) { + init(); + HttpConnectOptionsConverter.fromJson(json, this); + JsonObject server = json.getJsonObject("server"); + if (server != null) { + this.server = SocketAddress.fromJson(server); + } + } + + protected void init() { + proxyOptions = DEFAULT_PROXY_OPTIONS; + server = DEFAULT_SERVER; + host = DEFAULT_HOST; + port = DEFAULT_PORT; + ssl = DEFAULT_SSL; + sslOptions = null; + connectTimeout = DEFAULT_CONNECT_TIMEOUT; + } + + /** + * Get the proxy options override for connections + * + * @return proxy options override + */ + public ProxyOptions getProxyOptions() { + return proxyOptions; + } + + /** + * Override the {@link HttpClientOptions#setProxyOptions(ProxyOptions)} proxy options + * for connections. + * + * @param proxyOptions proxy options override object + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setProxyOptions(ProxyOptions proxyOptions) { + this.proxyOptions = proxyOptions; + return this; + } + + /** + * Get the server address to be used by the client request. + * + * @return the server address + */ + public Address getServer() { + return server; + } + + /** + * Set the server address to be used by the client request. + * + *

When the server address is {@code null}, the address will be resolved after the {@code host} + * property by the Vert.x resolver. + * + *

Use this when you want to connect to a specific server address without name resolution. + * + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setServer(Address server) { + this.server = server; + return this; + } + + /** + * Get the host name to be used by the client request. + * + * @return the host name + */ + public String getHost() { + return host; + } + + /** + * Set the host name to be used by the client request. + * + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setHost(String host) { + this.host = host; + return this; + } + + /** + * Get the port to be used by the client request. + * + * @return the port + */ + public Integer getPort() { + return port; + } + + /** + * Set the port to be used by the client request. + * + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setPort(Integer port) { + this.port = port; + return this; + } + + /** + * @return is SSL/TLS enabled? + */ + public Boolean isSsl() { + return ssl; + } + + /** + * Set whether SSL/TLS is enabled. + * + * @param ssl true if enabled + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setSsl(Boolean ssl) { + this.ssl = ssl; + return this; + } + + /** + * @return the SSL options + */ + public ClientSSLOptions getSslOptions() { + return sslOptions; + } + + /** + * Set the SSL options to use. + *

+ * When none is provided, the client SSL options will be used instead. + * @param sslOptions the SSL options to use + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setSslOptions(ClientSSLOptions sslOptions) { + this.sslOptions = sslOptions; + return this; + } + + /** + * @return the amount of time after which, if the request is not obtained from the client within the timeout period, + * the {@code Future} obtained from the client is failed with a {@link java.util.concurrent.TimeoutException} + */ + public long getConnectTimeout() { + return connectTimeout; + } + + /** + * Sets the amount of time after which, if the request is not obtained from the client within the timeout period, + * the {@code Future} obtained from the client is failed with a {@link java.util.concurrent.TimeoutException}. + * + * Note this is not related to the TCP {@link HttpClientOptions#setConnectTimeout(int)} option, when a request is made against + * a pooled HTTP client, the timeout applies to the duration to obtain a connection from the pool to serve the request, the timeout + * might fire because the server does not respond in time or the pool is too busy to serve a request. + * + * @param timeout the amount of time in milliseconds. + * @return a reference to this, so the API can be used fluently + */ + public HttpConnectOptions setConnectTimeout(long timeout) { + this.connectTimeout = timeout; + return this; + } + + private URL parseUrl(String surl) { + // Note - parsing a URL this way is slower than specifying host, port and relativeURI + try { + return new URL(surl); + } catch (MalformedURLException e) { + throw new VertxException("Invalid url: " + surl, e); + } + } + + public JsonObject toJson() { + JsonObject json = new JsonObject(); + HttpConnectOptionsConverter.toJson(this, json); + Address serverAddr = this.server; + if (serverAddr instanceof SocketAddress) { + SocketAddress socketAddr = (SocketAddress) serverAddr; + json.put("server", socketAddr.toJson()); + } + return json; + } +} diff --git a/src/main/java/io/vertx/core/http/HttpConnection.java b/src/main/java/io/vertx/core/http/HttpConnection.java index 8a11625b287..9b1d97a0bb4 100644 --- a/src/main/java/io/vertx/core/http/HttpConnection.java +++ b/src/main/java/io/vertx/core/http/HttpConnection.java @@ -27,7 +27,7 @@ /** * Represents an HTTP connection. *

- * HTTP/1.x connection provides an limited implementation, the following methods are implemented: + * HTTP/1.x connection provides a limited implementation, the following methods are implemented: *

    *
  • {@link #close}
  • *
  • {@link #closeHandler}
  • diff --git a/src/main/java/io/vertx/core/http/RequestOptions.java b/src/main/java/io/vertx/core/http/RequestOptions.java index a7de893d99b..315a70f8188 100644 --- a/src/main/java/io/vertx/core/http/RequestOptions.java +++ b/src/main/java/io/vertx/core/http/RequestOptions.java @@ -34,12 +34,7 @@ */ @DataObject @JsonGen(publicConverter = false) -public class RequestOptions { - - /** - * The default value for proxy options = {@code null} - */ - public static final ProxyOptions DEFAULT_PROXY_OPTIONS = null; +public class RequestOptions extends HttpConnectOptions { /** * The default value for server method = {@code null} @@ -91,18 +86,11 @@ public class RequestOptions { */ public static final long DEFAULT_IDLE_TIMEOUT = -1L; - private ProxyOptions proxyOptions; - private Address server; private HttpMethod method; - private String host; - private Integer port; - private Boolean ssl; - private ClientSSLOptions sslOptions;; private String uri; private MultiMap headers; private boolean followRedirects; private long timeout; - private long connectTimeout; private long idleTimeout; private String traceOperation; @@ -110,19 +98,7 @@ public class RequestOptions { * Default constructor */ public RequestOptions() { - proxyOptions = DEFAULT_PROXY_OPTIONS; - server = DEFAULT_SERVER; - method = DEFAULT_HTTP_METHOD; - host = DEFAULT_HOST; - port = DEFAULT_PORT; - ssl = DEFAULT_SSL; - sslOptions = null; - uri = DEFAULT_URI; - followRedirects = DEFAULT_FOLLOW_REDIRECTS; - timeout = DEFAULT_TIMEOUT; - connectTimeout = DEFAULT_CONNECT_TIMEOUT; - idleTimeout = DEFAULT_IDLE_TIMEOUT; - traceOperation = null; + super(); } /** @@ -131,17 +107,10 @@ public RequestOptions() { * @param other the options to copy */ public RequestOptions(RequestOptions other) { - setProxyOptions(other.proxyOptions); - setServer(other.server); - setMethod(other.method); - setHost(other.host); - setPort(other.port); - setSsl(other.ssl); - sslOptions = other.sslOptions != null ? new ClientSSLOptions(other.sslOptions) : null; + super(other); setURI(other.uri); setFollowRedirects(other.followRedirects); setIdleTimeout(other.idleTimeout); - setConnectTimeout(other.connectTimeout); setTimeout(other.timeout); if (other.headers != null) { setHeaders(MultiMap.caseInsensitiveMultiMap().setAll(other.headers)); @@ -155,16 +124,12 @@ public RequestOptions(RequestOptions other) { * @param json the JSON */ public RequestOptions(JsonObject json) { - this(); + super(json); RequestOptionsConverter.fromJson(json, this); String method = json.getString("method"); if (method != null) { setMethod(HttpMethod.valueOf(method)); } - JsonObject server = json.getJsonObject("server"); - if (server != null) { - this.server = SocketAddress.fromJson(server); - } JsonObject headers = json.getJsonObject("headers"); if (headers != null) { for (Map.Entry entry : headers) { @@ -182,48 +147,24 @@ public RequestOptions(JsonObject json) { } } - /** - * Get the proxy options override for connections - * - * @return proxy options override - */ - public ProxyOptions getProxyOptions() { - return proxyOptions; + @Override + protected void init() { + super.init(); + method = DEFAULT_HTTP_METHOD; + uri = DEFAULT_URI; + followRedirects = DEFAULT_FOLLOW_REDIRECTS; + timeout = DEFAULT_TIMEOUT; + idleTimeout = DEFAULT_IDLE_TIMEOUT; + traceOperation = null; } - /** - * Override the {@link HttpClientOptions#setProxyOptions(ProxyOptions)} proxy options - * for connections. - * - * @param proxyOptions proxy options override object - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setProxyOptions(ProxyOptions proxyOptions) { - this.proxyOptions = proxyOptions; + super.setProxyOptions(proxyOptions); return this; } - /** - * Get the server address to be used by the client request. - * - * @return the server address - */ - public Address getServer() { - return server; - } - - /** - * Set the server address to be used by the client request. - * - *

    When the server address is {@code null}, the address will be resolved after the {@code host} - * property by the Vert.x resolver. - * - *

    Use this when you want to connect to a specific server address without name resolution. - * - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setServer(Address server) { - this.server = server; + super.setServer(server); return this; } @@ -248,78 +189,23 @@ public RequestOptions setMethod(HttpMethod method) { return this; } - /** - * Get the host name to be used by the client request. - * - * @return the host name - */ - public String getHost() { - return host; - } - - /** - * Set the host name to be used by the client request. - * - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setHost(String host) { - this.host = host; + super.setHost(host); return this; } - /** - * Get the port to be used by the client request. - * - * @return the port - */ - public Integer getPort() { - return port; - } - - /** - * Set the port to be used by the client request. - * - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setPort(Integer port) { - this.port = port; + super.setPort(port); return this; } - /** - * @return is SSL/TLS enabled? - */ - public Boolean isSsl() { - return ssl; - } - - /** - * Set whether SSL/TLS is enabled. - * - * @param ssl true if enabled - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setSsl(Boolean ssl) { - this.ssl = ssl; + super.setSsl(ssl); return this; } - /** - * @return the SSL options - */ - public ClientSSLOptions getSslOptions() { - return sslOptions; - } - - /** - * Set the SSL options to use. - *

    - * When none is provided, the client SSL options will be used instead. - * @param sslOptions the SSL options to use - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setSslOptions(ClientSSLOptions sslOptions) { - this.sslOptions = sslOptions; + super.setSslOptions(sslOptions); return this; } @@ -384,27 +270,8 @@ public RequestOptions setTimeout(long timeout) { return this; } - /** - * @return the amount of time after which, if the request is not obtained from the client within the timeout period, - * the {@code Future} obtained from the client is failed with a {@link java.util.concurrent.TimeoutException} - */ - public long getConnectTimeout() { - return connectTimeout; - } - - /** - * Sets the amount of time after which, if the request is not obtained from the client within the timeout period, - * the {@code Future} obtained from the client is failed with a {@link java.util.concurrent.TimeoutException}. - * - * Note this is not related to the TCP {@link HttpClientOptions#setConnectTimeout(int)} option, when a request is made against - * a pooled HTTP client, the timeout applies to the duration to obtain a connection from the pool to serve the request, the timeout - * might fire because the server does not respond in time or the pool is too busy to serve a request. - * - * @param timeout the amount of time in milliseconds. - * @return a reference to this, so the API can be used fluently - */ public RequestOptions setConnectTimeout(long timeout) { - this.connectTimeout = timeout; + super.setConnectTimeout(timeout); return this; } @@ -483,10 +350,10 @@ public RequestOptions setAbsoluteURI(URL url) { default: throw new IllegalArgumentException(); } - this.uri = relativeUri; - this.port = port; - this.ssl = ssl; - this.host = url.getHost(); + setURI(relativeUri); + setPort(port); + setSsl(ssl); + setHost(url.getHost()); return this; } @@ -633,16 +500,11 @@ public RequestOptions setTraceOperation(String op) { } public JsonObject toJson() { - JsonObject json = new JsonObject(); + JsonObject json = super.toJson(); RequestOptionsConverter.toJson(this, json); if (method != null) { json.put("method", method.name()); } - Address serverAddr = this.server; - if (serverAddr instanceof SocketAddress) { - SocketAddress socketAddr = (SocketAddress) serverAddr; - json.put("server", socketAddr.toJson()); - } if (this.headers != null) { JsonObject headers = new JsonObject(); for (String name : this.headers.names()) { diff --git a/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java b/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java index 476a275f5c8..63fcb511f77 100644 --- a/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java +++ b/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java @@ -116,7 +116,7 @@ public void close(Promise completion) { } @Override - public Future connect(SocketAddress server, HostAndPort peer) { - return delegate.connect(server, peer); + public Future connect(HttpConnectOptions options) { + return delegate.connect(options); } } diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index bf0de1c24a3..bffdcf5d229 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -71,7 +71,7 @@ * * @author Tim Fox */ -public class Http1xClientConnection extends Http1xConnectionBase implements HttpClientConnection { +public class Http1xClientConnection extends Http1xConnectionBase implements HttpClientConnectionInternal { private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class); @@ -138,7 +138,7 @@ public HostAndPort authority() { } @Override - public HttpClientConnection evictionHandler(Handler handler) { + public HttpClientConnectionInternal evictionHandler(Handler handler) { evictionHandler = handler; return this; } @@ -154,7 +154,7 @@ protected void handleEvent(Object evt) { } @Override - public HttpClientConnection concurrencyChangeHandler(Handler handler) { + public HttpClientConnectionInternal concurrencyChangeHandler(Handler handler) { // Never changes return this; } @@ -553,7 +553,7 @@ public HttpVersion version() { } @Override - public HttpClientConnection connection() { + public HttpClientConnectionInternal connection() { return conn; } @@ -1224,11 +1224,6 @@ protected void handleException(Throwable e) { } } - @Override - public Future createRequest(ContextInternal context) { - return ((HttpClientImpl)client).createRequest(this, context); - } - @Override public Future createStream(ContextInternal context) { PromiseInternal promise = context.promise(); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index c3dc2287b00..23d3294f44e 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -38,7 +38,7 @@ /** * @author Julien Viet */ -class Http2ClientConnection extends Http2ConnectionBase implements HttpClientConnection { +class Http2ClientConnection extends Http2ConnectionBase implements HttpClientConnectionInternal { private final HttpClientBase client; private final ClientMetrics metrics; @@ -157,11 +157,6 @@ public Future createStream(ContextInternal context) { } } - @Override - public Future createRequest(ContextInternal context) { - return ((HttpClientImpl)client).createRequest(this, context); - } - private StreamImpl createStream2(ContextInternal context) { return new StreamImpl(this, context, false); } @@ -661,7 +656,7 @@ public void reset(Throwable cause) { } @Override - public HttpClientConnection connection() { + public HttpClientConnectionInternal connection() { return conn; } } diff --git a/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java index 1ac47f62926..c371166f032 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java @@ -45,14 +45,14 @@ * A connection that attempts to perform a protocol upgrade to H2C. The connection might use HTTP/1 or H2C * depending on the initial server response. */ -public class Http2UpgradeClientConnection implements HttpClientConnection { +public class Http2UpgradeClientConnection implements HttpClientConnectionInternal { private static final Object SEND_BUFFERED_MESSAGES = new Object(); private static final Logger log = LoggerFactory.getLogger(Http2UpgradeClientConnection.class); private HttpClientBase client; - private HttpClientConnection current; + private HttpClientConnectionInternal current; private boolean upgradeProcessed; private Handler closeHandler; @@ -69,7 +69,7 @@ public class Http2UpgradeClientConnection implements HttpClientConnection { this.current = connection; } - public HttpClientConnection unwrap() { + public HttpClientConnectionInternal unwrap() { return current; } @@ -144,7 +144,7 @@ public HttpVersion version() { } @Override - public HttpClientConnection connection() { + public HttpClientConnectionInternal connection() { return connection; } @@ -301,7 +301,7 @@ private static class UpgradingStream implements HttpClientStream { } @Override - public HttpClientConnection connection() { + public HttpClientConnectionInternal connection() { return upgradedConnection; } @@ -794,11 +794,6 @@ public Future createStream(ContextInternal context) { } } - @Override - public Future createRequest(ContextInternal context) { - return ((HttpClientImpl)client).createRequest(this, context); - } - @Override public ContextInternal getContext() { return current.getContext(); @@ -863,7 +858,7 @@ public HttpConnection exceptionHandler(Handler handler) { } @Override - public HttpClientConnection evictionHandler(Handler handler) { + public HttpClientConnectionInternal evictionHandler(Handler handler) { if (current instanceof Http1xClientConnection) { evictionHandler = handler; } @@ -872,7 +867,7 @@ public HttpClientConnection evictionHandler(Handler handler) { } @Override - public HttpClientConnection concurrencyChangeHandler(Handler handler) { + public HttpClientConnectionInternal concurrencyChangeHandler(Handler handler) { if (current instanceof Http1xClientConnection) { concurrencyChangeHandler = handler; } diff --git a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index 54d5d4b24df..9b40e12aec7 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -107,10 +107,10 @@ private void connect(ContextInternal context, Promise promise) { netClient.connectInternal(connectOptions, promise, context); } - public Future wrap(ContextInternal context, NetSocket so_) { + public Future wrap(ContextInternal context, NetSocket so_) { NetSocketImpl so = (NetSocketImpl) so_; Object metric = so.metric(); - PromiseInternal promise = context.promise(); + PromiseInternal promise = context.promise(); // Remove all un-necessary handlers ChannelPipeline pipeline = so.channelHandlerContext().pipeline(); @@ -158,12 +158,12 @@ public Future wrap(ContextInternal context, NetSocket so_) return promise.future(); } - public Future httpConnect(ContextInternal context) { + public Future httpConnect(ContextInternal context) { Promise promise = context.promise(); Future future = promise.future(); // We perform the compose operation before calling connect to be sure that the composition happens // before the promise is completed by the connect operation - Future ret = future.compose(so -> wrap(context, so)); + Future ret = future.compose(so -> wrap(context, so)); connect(context, promise); return ret; } @@ -205,7 +205,7 @@ private void http1xConnected(HttpVersion version, ContextInternal context, Object socketMetric, Channel ch, - Promise future) { + Promise future) { boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade(); VertxHandler clientHandler = VertxHandler.create(chctx -> { HttpClientMetrics met = client.metrics(); @@ -229,7 +229,7 @@ private void http1xConnected(HttpVersion version, HttpClientStream stream = ar.result(); stream.headHandler(resp -> { Http2UpgradeClientConnection connection = (Http2UpgradeClientConnection) stream.connection(); - HttpClientConnection unwrap = connection.unwrap(); + HttpClientConnectionInternal unwrap = connection.unwrap(); future.tryComplete(unwrap); }); stream.exceptionHandler(future::tryFail); @@ -253,7 +253,7 @@ private void http1xConnected(HttpVersion version, private void http2Connected(ContextInternal context, Object metric, Channel ch, - PromiseInternal promise) { + PromiseInternal promise) { VertxHttp2ConnectionHandler clientHandler; try { clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, context, false, metric, authority); @@ -266,7 +266,7 @@ private void http2Connected(ContextInternal context, clientHandler.connectFuture().addListener(promise); } - private void connectFailed(Channel ch, Throwable t, Promise future) { + private void connectFailed(Channel ch, Throwable t, Promise future) { if (ch != null) { try { ch.close(); diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientBase.java b/src/main/java/io/vertx/core/http/impl/HttpClientBase.java index f7e0eb3c437..f88c3107711 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientBase.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientBase.java @@ -149,23 +149,6 @@ HttpClientMetrics metrics() { return metrics; } - /** - * Connect to a server. - */ - public Future connect(SocketAddress server) { - return connect(server, null); - } - - /** - * Connect to a server. - */ - public Future connect(SocketAddress server, HostAndPort peer) { - ContextInternal context = vertx.getOrCreateContext(); - HttpChannelConnector connector = new HttpChannelConnector(this, netClient, defaultSslOptions, null, null, options.getProtocolVersion(), options.isSsl(), options.isUseAlpn(), peer, server); - return connector.httpConnect(context); - } - - protected void doShutdown(Promise p) { netClient.shutdown(closeTimeout, closeTimeoutUnit).onComplete(p); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java b/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java similarity index 67% rename from src/main/java/io/vertx/core/http/impl/HttpClientConnection.java rename to src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java index 6614daac91b..2f256ea7a98 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java @@ -15,8 +15,9 @@ import io.netty.channel.ChannelHandlerContext; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.http.HttpClientConnection; import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpConnection; +import io.vertx.core.http.RequestOptions; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -25,9 +26,9 @@ /** * @author Julien Viet */ -public interface HttpClientConnection extends HttpConnection { +public interface HttpClientConnectionInternal extends HttpClientConnection { - Logger log = LoggerFactory.getLogger(HttpClientConnection.class); + Logger log = LoggerFactory.getLogger(HttpClientConnectionInternal.class); Handler DEFAULT_EVICTION_HANDLER = v -> { log.warn("Connection evicted"); @@ -43,7 +44,7 @@ public interface HttpClientConnection extends HttpConnection { * @param handler the handler * @return a reference to this, so the API can be used fluently */ - HttpClientConnection evictionHandler(Handler handler); + HttpClientConnectionInternal evictionHandler(Handler handler); /** * Set a {@code handler} called when the connection concurrency changes. @@ -52,7 +53,7 @@ public interface HttpClientConnection extends HttpConnection { * @param handler the handler * @return a reference to this, so the API can be used fluently */ - HttpClientConnection concurrencyChangeHandler(Handler handler); + HttpClientConnectionInternal concurrencyChangeHandler(Handler handler); /** * @return the connection concurrency @@ -78,9 +79,29 @@ public interface HttpClientConnection extends HttpConnection { * Create an HTTP stream. * * @param context the stream context - * @return a future notified with the created stream + * @return a future notified with the created request */ - Future createRequest(ContextInternal context); + default Future createRequest(ContextInternal context, RequestOptions options) { + return createStream(context).map(stream -> { + HttpClientRequestImpl request = new HttpClientRequestImpl(stream); + if (options != null) { + request.init(options); + } + return request; + }); + } + + @Override + default Future createRequest() { + ContextInternal ctx = getContext().owner().getOrCreateContext(); + return createRequest(ctx, null); + } + + @Override + default Future createRequest(RequestOptions options) { + ContextInternal ctx = getContext().owner().getOrCreateContext(); + return createRequest(ctx, options); + } /** * Create an HTTP stream. diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 6a581e72c1a..2b3cd39eb2a 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -17,13 +17,11 @@ import io.vertx.core.Promise; import io.vertx.core.http.*; import io.vertx.core.impl.*; -import io.vertx.core.loadbalancing.LoadBalancer; import io.vertx.core.net.*; import io.vertx.core.net.impl.endpoint.EndpointManager; import io.vertx.core.net.impl.endpoint.EndpointProvider; import io.vertx.core.net.impl.pool.*; import io.vertx.core.spi.resolver.endpoint.EndpointRequest; -import io.vertx.core.net.impl.resolver.EndpointResolverImpl; import io.vertx.core.spi.resolver.endpoint.EndpointLookup; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.metrics.MetricsProvider; @@ -46,7 +44,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal, MetricsProvider { // Pattern to check we are not dealing with an absoluate URI - private static final Pattern ABS_URI_START_PATTERN = Pattern.compile("^\\p{Alpha}[\\p{Alpha}\\p{Digit}+.\\-]*:"); + static final Pattern ABS_URI_START_PATTERN = Pattern.compile("^\\p{Alpha}[\\p{Alpha}\\p{Digit}+.\\-]*:"); private static final Function> DEFAULT_HANDLER = resp -> { try { @@ -234,6 +232,65 @@ Handler connectionHandler() { return connectionHandler; } + @Override + public Future connect(HttpConnectOptions connect) { + Address addr = connect.getServer(); + Integer port = connect.getPort(); + String host = connect.getHost(); + SocketAddress server; + if (addr == null) { + if (port == null) { + port = options.getDefaultPort(); + } + if (host == null) { + host = options.getDefaultHost(); + } + server = SocketAddress.inetSocketAddress(port, host); + } else if (addr instanceof SocketAddress) { + server = (SocketAddress) addr; + if (port == null) { + port = connect.getPort(); + } + if (host == null) { + host = connect.getHost(); + } + if (port == null) { + port = server.port(); + } + if (host == null) { + host = server.host(); + } + } else { + throw new IllegalArgumentException("Only socket address are currently supported"); + } + HostAndPort authority = HostAndPort.create(host, port); + ClientSSLOptions sslOptions = connect.getSslOptions(); + if (sslOptions == null) { + sslOptions = options.getSslOptions(); + } + ProxyOptions proxyOptions = computeProxyOptions(connect.getProxyOptions(), server); + ClientMetrics clientMetrics = metrics != null ? metrics.createEndpointMetrics(server, 1) : null; + Boolean ssl = connect.isSsl(); + boolean useSSL = ssl != null ? ssl : this.options.isSsl(); + boolean useAlpn = options.isUseAlpn(); + if (!useAlpn && useSSL && this.options.getProtocolVersion() == HttpVersion.HTTP_2) { + return vertx.getOrCreateContext().failedFuture("Must enable ALPN when using H2"); + } + checkClosed(); + HttpChannelConnector connector = new HttpChannelConnector( + this, + netClient, + sslOptions, + proxyOptions, + clientMetrics, + options.getProtocolVersion(), + useSSL, + useAlpn, + authority, + server); + return (Future) connector.httpConnect(vertx.getOrCreateContext()); + } + @Override public Future request(RequestOptions request) { Address addr = request.getServer(); @@ -298,9 +355,9 @@ private Future doRequest(Address server, Integer port, String // should we do that here ? it might create issues with address resolver that resolves this later if (host != null && port != null) { String peerHost = host; - if (peerHost.endsWith(".")) { - peerHost = peerHost.substring(0, peerHost.length() - 1); - } +// if (peerHost.endsWith(".")) { +// peerHost = peerHost.substring(0, peerHost.length() - 1); +// } authority = HostAndPort.create(peerHost, port); } else { authority = null; @@ -313,7 +370,7 @@ private Future doRequest( HttpMethod method, HostAndPort authority, Address server, - Boolean useSSL, + boolean useSSL, String requestURI, MultiMap headers, String traceOperation, @@ -333,7 +390,7 @@ private Future doRequest( ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address); EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, address, authority != null ? authority : HostAndPort.create(address.host(), address.port())); return httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> { - Future> fut2 = endpoint.requestConnection(connCtx, connectTimeout); + Future> fut2 = endpoint.requestConnection(connCtx, connectTimeout); if (fut2 == null) { return null; } else { @@ -343,7 +400,7 @@ private Future doRequest( endpointRequest.reportFailure(ar.cause()); } }).compose(lease -> { - HttpClientConnection conn = lease.get(); + HttpClientConnectionInternal conn = lease.get(); return conn.createStream(ctx).map(stream -> { HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest); wrapped.closeHandler(v -> lease.recycle()); @@ -357,12 +414,12 @@ private Future doRequest( ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server); EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, (SocketAddress) server, authority); future = httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> { - Future> fut = endpoint.requestConnection(connCtx, connectTimeout); + Future> fut = endpoint.requestConnection(connCtx, connectTimeout); if (fut == null) { return null; } else { return fut.compose(lease -> { - HttpClientConnection conn = lease.get(); + HttpClientConnectionInternal conn = lease.get(); return conn.createStream(ctx).map(stream -> { stream.closeHandler(v -> { lease.recycle(); @@ -379,7 +436,15 @@ private Future doRequest( return connCtx.failedFuture("Cannot resolve address " + server); } else { future.map(res -> { - return createRequest(res.stream, method, headers, requestURI, res.proxyOptions, useSSL, idleTimeout, followRedirects, traceOperation); + RequestOptions options = new RequestOptions(); + options.setMethod(method); + options.setHeaders(headers); + options.setURI(requestURI); + options.setProxyOptions(res.proxyOptions); + options.setIdleTimeout(idleTimeout); + options.setFollowRedirects(followRedirects); + options.setTraceOperation(traceOperation); + return createRequest(res.stream, options); }).onComplete(promise); return promise.future(); } @@ -394,15 +459,12 @@ public ConnectionObtainedResult(ProxyOptions proxyOptions, HttpClientStream stre } } - Future createRequest(HttpClientConnection conn, ContextInternal context) { - return conn.createStream(context).map(this::createRequest); - } - - private HttpClientRequest createRequest(HttpClientStream stream) { - HttpClientRequest request = new HttpClientRequestImpl(stream, stream.getContext().promise(), HttpMethod.GET, "/"); + HttpClientRequest createRequest(HttpClientStream stream, RequestOptions options) { + HttpClientRequestImpl request = new HttpClientRequestImpl(stream); + request.init(options); Function> rHandler = redirectHandler; if (rHandler != null) { - request.setMaxRedirects(options.getMaxRedirects()); + request.setMaxRedirects(this.options.getMaxRedirects()); request.redirectHandler(resp -> { Future fut_ = rHandler.apply(resp); if (fut_ != null) { @@ -414,44 +476,4 @@ private HttpClientRequest createRequest(HttpClientStream stream) { } return request; } - - private HttpClientRequest createRequest( - HttpClientStream stream, - HttpMethod method, - MultiMap headers, - String requestURI, - ProxyOptions proxyOptions, - Boolean useSSL, - long idleTimeout, - Boolean followRedirects, - String traceOperation) { - String u = requestURI; - HostAndPort authority = stream.connection().authority(); - if (proxyOptions != null && !useSSL && proxyOptions.getType() == ProxyType.HTTP) { - if (!ABS_URI_START_PATTERN.matcher(u).find()) { - int defaultPort = 80; - String addPort = (authority.port() != -1 && authority.port() != defaultPort) ? (":" + authority.port()) : ""; - u = (useSSL == Boolean.TRUE ? "https://" : "http://") + authority.host() + addPort + requestURI; - } - } - HttpClientRequest request = createRequest(stream); - request.setURI(u); - request.setMethod(method); - request.traceOperation(traceOperation); - request.setFollowRedirects(followRedirects == Boolean.TRUE); - if (headers != null) { - request.headers().setAll(headers); - } - if (proxyOptions != null && !useSSL && proxyOptions.getType() == ProxyType.HTTP) { - if (proxyOptions.getUsername() != null && proxyOptions.getPassword() != null) { - request.headers().add("Proxy-Authorization", "Basic " + Base64.getEncoder() - .encodeToString((proxyOptions.getUsername() + ":" + proxyOptions.getPassword()).getBytes())); - } - } - if (idleTimeout > 0L) { - // Maybe later ? - request.idleTimeout(idleTimeout); - } - return request; - } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientInternal.java b/src/main/java/io/vertx/core/http/impl/HttpClientInternal.java index c02fa541e8d..a0cab590a2c 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientInternal.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientInternal.java @@ -14,7 +14,6 @@ import io.vertx.core.Closeable; import io.vertx.core.Future; import io.vertx.core.http.*; -import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.HostAndPort; import io.vertx.core.net.SocketAddress; @@ -34,20 +33,4 @@ public interface HttpClientInternal extends HttpClient, MetricsProvider, Closeab Future closeFuture(); - /** - * Connect to a server. - * - * @param server the server address - */ - default Future connect(SocketAddress server) { - return connect(server, null); - } - - /** - * Connect to a server. - * - * @param server the server address - * @param peer the peer - */ - Future connect(SocketAddress server, HostAndPort peer); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java index 42f73a39304..d736cdac745 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java @@ -22,11 +22,16 @@ import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.net.HostAndPort; +import io.vertx.core.net.ProxyOptions; +import io.vertx.core.net.ProxyType; +import java.util.Base64; import java.util.Objects; import java.util.function.Function; import static io.vertx.core.http.HttpHeaders.CONTENT_LENGTH; +import static io.vertx.core.http.impl.HttpClientImpl.ABS_URI_START_PATTERN; /** * This class is optimised for performance when used on the same event loop that is passed to the handler with. @@ -62,9 +67,8 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http private boolean isConnect; private String traceOperation; - HttpClientRequestImpl(HttpClientStream stream, PromiseInternal responsePromise, HttpMethod method, - String requestURI) { - super(stream, responsePromise, method, requestURI); + HttpClientRequestImpl(HttpClientStream stream) { + super(stream, stream.getContext().promise(), HttpMethod.GET, "/"); this.chunked = false; this.endPromise = context.promise(); this.endFuture = endPromise.future(); @@ -78,6 +82,41 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http stream.exceptionHandler(this::handleException); } + public void init(RequestOptions options) { + MultiMap headers = options.getHeaders(); + if (headers != null) { + headers().setAll(headers); + } + HttpClientConnectionInternal conn = stream.connection(); + boolean useSSL = conn.isSsl(); + String requestURI = options.getURI(); + HttpMethod method = options.getMethod(); + String traceOperation = options.getTraceOperation(); + Boolean followRedirects = options.getFollowRedirects(); + long idleTimeout = options.getIdleTimeout(); + ProxyOptions proxyOptions = options.getProxyOptions(); + if (proxyOptions != null && !useSSL && proxyOptions.getType() == ProxyType.HTTP) { + HostAndPort authority = conn.authority(); + if (!ABS_URI_START_PATTERN.matcher(requestURI).find()) { + int defaultPort = 80; + String addPort = (authority.port() != -1 && authority.port() != defaultPort) ? (":" + authority.port()) : ""; + requestURI = "http://" + authority.host() + addPort + requestURI; + } + if (proxyOptions.getUsername() != null && proxyOptions.getPassword() != null) { + headers().add("Proxy-Authorization", "Basic " + Base64.getEncoder() + .encodeToString((proxyOptions.getUsername() + ":" + proxyOptions.getPassword()).getBytes())); + } + } + setURI(requestURI); + setMethod(method); + traceOperation(traceOperation); + setFollowRedirects(followRedirects == Boolean.TRUE); + if (idleTimeout > 0L) { + // Maybe later ? + idleTimeout(idleTimeout); + } + } + @Override void handleException(Throwable t) { t = mapException(t); @@ -279,7 +318,7 @@ private void tryComplete() { } @Override - public synchronized HttpConnection connection() { + public HttpClientConnection connection() { return stream.connection(); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java index b0c1502026a..6bbe35a1abb 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java @@ -55,7 +55,7 @@ public HttpClientRequest exceptionHandler(Handler handler) { } @Override - public HttpConnection connection() { + public HttpClientConnection connection() { return stream.connection(); } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java index aa1c825cbd9..a9c5bdedcdf 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientStream.java @@ -18,12 +18,17 @@ import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.impl.BufferInternal; -import io.vertx.core.http.HttpFrame; -import io.vertx.core.http.HttpVersion; -import io.vertx.core.http.StreamPriority; +import io.vertx.core.http.*; import io.vertx.core.impl.ContextInternal; +import io.vertx.core.net.HostAndPort; +import io.vertx.core.net.ProxyOptions; +import io.vertx.core.net.ProxyType; import io.vertx.core.streams.WriteStream; +import java.util.Base64; + +import static io.vertx.core.http.impl.HttpClientImpl.ABS_URI_START_PATTERN; + /** * @author Julien Viet */ @@ -44,7 +49,7 @@ public interface HttpClientStream extends WriteStream { */ HttpVersion version(); - HttpClientConnection connection(); + HttpClientConnectionInternal connection(); ContextInternal getContext(); Future writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect); diff --git a/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java b/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java index 2541abc3277..1afed55b5e3 100644 --- a/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java +++ b/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java @@ -33,19 +33,19 @@ /** * @author Julien Viet */ -class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase> implements PoolConnector { +class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase> implements PoolConnector { /** * LIFO pool selector. */ - private static final BiFunction, List>, PoolConnection> LIFO_SELECTOR = (waiter, connections) -> { + private static final BiFunction, List>, PoolConnection> LIFO_SELECTOR = (waiter, connections) -> { int size = connections.size(); - PoolConnection selected = null; + PoolConnection selected = null; long last = 0L; for (int i = 0; i < size; i++) { - PoolConnection pooled = connections.get(i); + PoolConnection pooled = connections.get(i); if (pooled.available() > 0) { - HttpClientConnection conn = pooled.get(); + HttpClientConnectionInternal conn = pooled.get(); if (selected == null) { selected = pooled; } else { @@ -60,7 +60,7 @@ class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase pool; + private final ConnectionPool pool; public SharedClientHttpStreamEndpoint(HttpClientImpl client, ClientMetrics metrics, @@ -71,7 +71,7 @@ public SharedClientHttpStreamEndpoint(HttpClientImpl client, Runnable dispose) { super(metrics, dispose); - ConnectionPool pool = ConnectionPool.pool(this, new int[]{http1MaxSize, http2MaxSize}, queueMaxSize) + ConnectionPool pool = ConnectionPool.pool(this, new int[]{http1MaxSize, http2MaxSize}, queueMaxSize) .connectionSelector(LIFO_SELECTOR).contextProvider(client.contextProvider()); this.client = client; @@ -80,7 +80,7 @@ public SharedClientHttpStreamEndpoint(HttpClientImpl client, } @Override - public Future> connect(ContextInternal context, Listener listener) { + public Future> connect(ContextInternal context, Listener listener) { return connector .httpConnect(context) .map(connection -> { @@ -106,28 +106,28 @@ public Future> connect(ContextInternal conte } @Override - public boolean isValid(HttpClientConnection connection) { + public boolean isValid(HttpClientConnectionInternal connection) { return connection.isValid(); } protected void checkExpired() { pool.evict(conn -> !conn.isValid(), ar -> { if (ar.succeeded()) { - List lst = ar.result(); + List lst = ar.result(); lst.forEach(HttpConnection::close); } }); } - private class Request implements PoolWaiter.Listener, Handler>> { + private class Request implements PoolWaiter.Listener, Handler>> { private final ContextInternal context; private final HttpVersion protocol; private final long timeout; - private final Promise> promise; + private final Promise> promise; private long timerID; - Request(ContextInternal context, HttpVersion protocol, long timeout, Promise> promise) { + Request(ContextInternal context, HttpVersion protocol, long timeout, Promise> promise) { this.context = context; this.protocol = protocol; this.timeout = timeout; @@ -136,12 +136,12 @@ private class Request implements PoolWaiter.Listener, Hand } @Override - public void onEnqueue(PoolWaiter waiter) { + public void onEnqueue(PoolWaiter waiter) { onConnect(waiter); } @Override - public void onConnect(PoolWaiter waiter) { + public void onConnect(PoolWaiter waiter) { if (timeout > 0L && timerID == -1L) { timerID = context.setTimer(timeout, id -> { pool.cancel(waiter, ar -> { @@ -154,7 +154,7 @@ public void onConnect(PoolWaiter waiter) { } @Override - public void handle(AsyncResult> ar) { + public void handle(AsyncResult> ar) { if (timerID >= 0) { context.owner().cancelTimer(timerID); } @@ -167,8 +167,8 @@ void acquire() { } @Override - protected Future> requestConnection2(ContextInternal ctx, long timeout) { - PromiseInternal> promise = ctx.promise(); + protected Future> requestConnection2(ContextInternal ctx, long timeout) { + PromiseInternal> promise = ctx.promise(); Request request = new Request(ctx, client.options().getProtocolVersion(), timeout, promise); request.acquire(); return promise.future(); diff --git a/src/main/java/io/vertx/core/http/impl/StatisticsGatheringHttpClientStream.java b/src/main/java/io/vertx/core/http/impl/StatisticsGatheringHttpClientStream.java index 3e540249432..e6f21279927 100644 --- a/src/main/java/io/vertx/core/http/impl/StatisticsGatheringHttpClientStream.java +++ b/src/main/java/io/vertx/core/http/impl/StatisticsGatheringHttpClientStream.java @@ -60,7 +60,7 @@ public HttpVersion version() { } @Override - public HttpClientConnection connection() { + public HttpClientConnectionInternal connection() { return delegate.connection(); } diff --git a/src/main/java/io/vertx/core/http/impl/WebSocketEndpoint.java b/src/main/java/io/vertx/core/http/impl/WebSocketEndpoint.java index fcfb42ca192..f4bc7c7b4a3 100644 --- a/src/main/java/io/vertx/core/http/impl/WebSocketEndpoint.java +++ b/src/main/java/io/vertx/core/http/impl/WebSocketEndpoint.java @@ -22,11 +22,11 @@ * * @author Julien Viet */ -class WebSocketEndpoint extends ClientHttpEndpointBase { +class WebSocketEndpoint extends ClientHttpEndpointBase { private static class Waiter { - final Promise promise; + final Promise promise; final ContextInternal context; Waiter(ContextInternal context) { @@ -59,14 +59,14 @@ private void onEvict() { tryConnect(h.context).onComplete(h.promise); } - private Future tryConnect(ContextInternal ctx) { + private Future tryConnect(ContextInternal ctx) { ContextInternal eventLoopContext; if (ctx.isEventLoopContext()) { eventLoopContext = ctx; } else { eventLoopContext = ctx.owner().createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader()); } - Future fut = connector.httpConnect(eventLoopContext); + Future fut = connector.httpConnect(eventLoopContext); return fut.map(c -> { if (incRefCount()) { c.evictionHandler(v -> onEvict()); @@ -79,7 +79,7 @@ private Future tryConnect(ContextInternal ctx) { } @Override - protected Future requestConnection2(ContextInternal ctx, long timeout) { + protected Future requestConnection2(ContextInternal ctx, long timeout) { synchronized (this) { if (inflightConnections >= maxPoolSize) { Waiter waiter = new Waiter(ctx); diff --git a/src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java b/src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java index 9168b3ee4a3..bb2c71a19aa 100644 --- a/src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java +++ b/src/main/java/io/vertx/core/spi/metrics/HttpClientMetrics.java @@ -41,7 +41,7 @@ public interface HttpClientMetrics extends TCPMetrics { * Provides metrics for a particular endpoint * * @param remoteAddress the endpoint remote address - * @param maxPoolSize the server max pool size + * @param maxPoolSize the client max pool size * @return the endpoint metric */ default ClientMetrics createEndpointMetrics(SocketAddress remoteAddress, int maxPoolSize) { diff --git a/src/test/java/io/vertx/core/http/Http1xClientConnectionTest.java b/src/test/java/io/vertx/core/http/Http1xClientConnectionTest.java index b10728dd851..cc4f821f84e 100644 --- a/src/test/java/io/vertx/core/http/Http1xClientConnectionTest.java +++ b/src/test/java/io/vertx/core/http/Http1xClientConnectionTest.java @@ -13,6 +13,7 @@ import io.netty.buffer.Unpooled; import io.vertx.core.MultiMap; import io.vertx.core.Promise; +import io.vertx.core.http.impl.HttpClientConnectionInternal; import io.vertx.core.http.impl.HttpRequestHead; import io.vertx.core.impl.ContextInternal; import org.junit.Test; @@ -27,12 +28,13 @@ public void testResetStreamBeforeSend() throws Exception { server.requestHandler(req -> { }); startServer(testAddress); - client.connect(testAddress).onComplete(onSuccess(conn -> { + client.connect(new HttpConnectOptions().setServer(testAddress)).onComplete(onSuccess(conn -> { + HttpClientConnectionInternal ci = (HttpClientConnectionInternal) conn; AtomicInteger evictions = new AtomicInteger(); - conn.evictionHandler(v -> { + ci.evictionHandler(v -> { evictions.incrementAndGet(); }); - conn + ci .createStream((ContextInternal) vertx.getOrCreateContext()) .onComplete(onSuccess(stream -> { Exception cause = new Exception(); @@ -54,12 +56,13 @@ public void testResetStreamRequestSent() throws Exception { continuation.complete(); }); startServer(testAddress); - client.connect(testAddress).onComplete(onSuccess(conn -> { + client.connect(new HttpConnectOptions().setServer(testAddress)).onComplete(onSuccess(conn -> { + HttpClientConnectionInternal ci = (HttpClientConnectionInternal) conn; AtomicInteger evictions = new AtomicInteger(); - conn.evictionHandler(v -> { + ci.evictionHandler(v -> { evictions.incrementAndGet(); }); - conn + ci .createStream((ContextInternal) vertx.getOrCreateContext()) .onComplete(onSuccess(stream -> { Exception cause = new Exception(); @@ -86,12 +89,13 @@ public void testServerConnectionClose() throws Exception { req.response().putHeader("Connection", "close").end(); }); startServer(testAddress); - client.connect(testAddress).onComplete(onSuccess(conn -> { + client.connect(new HttpConnectOptions().setServer(testAddress)).onComplete(onSuccess(conn -> { + HttpClientConnectionInternal ci = (HttpClientConnectionInternal) conn; AtomicInteger evictions = new AtomicInteger(); - conn.evictionHandler(v -> { + ci.evictionHandler(v -> { assertEquals(1, evictions.incrementAndGet()); }); - conn.createStream((ContextInternal) vertx.getOrCreateContext()).onComplete(onSuccess(stream -> { + ci.createStream((ContextInternal) vertx.getOrCreateContext()).onComplete(onSuccess(stream -> { stream.closeHandler(v -> { assertEquals(1, evictions.get()); complete(); diff --git a/src/test/java/io/vertx/core/http/Http2ClientTest.java b/src/test/java/io/vertx/core/http/Http2ClientTest.java index c7f77fea5b3..96db38a9b50 100644 --- a/src/test/java/io/vertx/core/http/Http2ClientTest.java +++ b/src/test/java/io/vertx/core/http/Http2ClientTest.java @@ -28,7 +28,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.impl.BufferInternal; import io.vertx.core.http.impl.Http2UpgradeClientConnection; -import io.vertx.core.http.impl.HttpClientConnection; +import io.vertx.core.http.impl.HttpClientConnectionInternal; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; @@ -1694,7 +1694,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers assertEquals(HttpVersion.HTTP_2, resp1.version()); client.request(requestOptions).onComplete(onSuccess(req2 -> { req2.send().onComplete(onSuccess(resp2 -> { - assertSame(((HttpClientConnection)conn).channel(), ((HttpClientConnection)resp2.request().connection()).channel()); + assertSame(((HttpClientConnectionInternal)conn).channel(), ((HttpClientConnectionInternal)resp2.request().connection()).channel()); testComplete(); })); })); diff --git a/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java b/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java index 6655a92dc45..abd17fe5827 100644 --- a/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java +++ b/src/test/java/io/vertx/core/http/HttpClientConnectionTest.java @@ -12,6 +12,7 @@ import io.netty.buffer.Unpooled; import io.vertx.core.MultiMap; +import io.vertx.core.http.impl.HttpClientConnectionInternal; import io.vertx.core.http.impl.HttpClientInternal; import io.vertx.core.http.impl.HttpRequestHead; import io.vertx.core.impl.ContextInternal; @@ -23,13 +24,11 @@ public abstract class HttpClientConnectionTest extends HttpTestBase { protected HttpClientInternal client; - protected HostAndPort peerAddress; @Override public void setUp() throws Exception { super.setUp(); this.client = (HttpClientInternal) super.client; - this.peerAddress = HostAndPort.create(requestOptions.getHost(), requestOptions.getPort()); } @Test @@ -38,8 +37,8 @@ public void testGet() throws Exception { req.response().end("Hello World"); }); startServer(testAddress); - client.connect(testAddress, peerAddress) - .compose(conn -> conn.createRequest((ContextInternal) vertx.getOrCreateContext())) + client.connect(new HttpConnectOptions().setServer(testAddress).setHost(requestOptions.getHost()).setPort(requestOptions.getPort())) + .compose(HttpClientConnection::createRequest) .compose(request -> request .send() .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) @@ -58,8 +57,8 @@ public void testStreamGet() throws Exception { req.response().end("Hello World"); }); startServer(testAddress); - client.connect(testAddress, peerAddress).onComplete(onSuccess(conn -> { - conn + client.connect(new HttpConnectOptions().setServer(testAddress).setHost(requestOptions.getHost()).setPort(requestOptions.getPort())).onComplete(onSuccess(conn -> { + ((HttpClientConnectionInternal)conn) .createStream((ContextInternal) vertx.getOrCreateContext()) .onComplete(onSuccess(stream -> { stream.writeHead(new HttpRequestHead( @@ -87,13 +86,14 @@ public void testConnectionClose() throws Exception { req.connection().close(); }); startServer(testAddress); - client.connect(testAddress, peerAddress).onComplete(onSuccess(conn -> { + client.connect(new HttpConnectOptions().setServer(testAddress).setHost(requestOptions.getHost()).setPort(requestOptions.getPort())).onComplete(onSuccess(conn -> { + HttpClientConnectionInternal ci = ((HttpClientConnectionInternal)conn); AtomicInteger evictions = new AtomicInteger(); - conn.evictionHandler(v -> { + ci.evictionHandler(v -> { assertEquals(1, evictions.incrementAndGet()); complete(); }); - conn.createStream((ContextInternal) vertx.getOrCreateContext()).onComplete(onSuccess(stream -> { + ci.createStream((ContextInternal) vertx.getOrCreateContext()).onComplete(onSuccess(stream -> { stream.writeHead(new HttpRequestHead( HttpMethod.GET, "/", MultiMap.caseInsensitiveMultiMap(), DEFAULT_HTTP_HOST_AND_PORT, "", null), false, Unpooled.EMPTY_BUFFER, true, new StreamPriority(), false); stream.headHandler(resp -> { diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 1be752e980f..975e410192a 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -4174,7 +4174,7 @@ class MockReq implements HttpClientRequest { public HttpClientRequest pushHandler(Handler handler) { throw new UnsupportedOperationException(); } public boolean reset(long code) { return false; } public boolean reset(long code, Throwable cause) { return false; } - public HttpConnection connection() { throw new UnsupportedOperationException(); } + public HttpClientConnection connection() { throw new UnsupportedOperationException(); } public Future writeCustomFrame(int type, int flags, Buffer payload) { throw new UnsupportedOperationException(); } public boolean writeQueueFull() { throw new UnsupportedOperationException(); } public StreamPriority getStreamPriority() { return null; }