diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 747f5bc9e04..8539b784b92 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -25,6 +25,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.net.HostAndPort; +import io.vertx.core.net.impl.MessageWrite; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.core.spi.metrics.HttpClientMetrics; import io.vertx.core.spi.tracing.SpanKind; @@ -548,9 +549,17 @@ void handleException(Throwable exception) { public Future writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) { priority(priority); PromiseInternal promise = context.promise(); - conn.context.emit(null, v -> { - writeHeaders(request, buf, end, priority, connect, promise); + messageQueue.write(new MessageWrite() { + @Override + public void write() { + writeHeaders(request, buf, end, priority, connect, promise); + } + @Override + public void cancel(Throwable cause) { + promise.fail(cause); + } }); + return promise.future(); } diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index a0e79239e52..838fdc8f5c3 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -38,7 +38,7 @@ abstract class VertxHttp2Stream { private static final MultiMap EMPTY = new Http2HeadersAdaptor(EmptyHttp2Headers.INSTANCE); - private final OutboundMessageQueue messageQueue; + protected final OutboundMessageQueue messageQueue; protected final C conn; protected final VertxInternal vertx; protected final ContextInternal context; diff --git a/src/test/java/io/vertx/core/http/HttpTest.java b/src/test/java/io/vertx/core/http/HttpTest.java index 0b16f72c07a..1be752e980f 100644 --- a/src/test/java/io/vertx/core/http/HttpTest.java +++ b/src/test/java/io/vertx/core/http/HttpTest.java @@ -6604,19 +6604,10 @@ private void testDnsClientSideLoadBalancing(boolean enabled) throws Exception { } @Test - public void testConcurrentWrites() throws Exception { - waitFor(1); - AtomicReference received = new AtomicReference<>(); - server.requestHandler(req -> req.body() - .onSuccess(buffer -> { - received.set(buffer.toString()); - req.response().end(); - })); - startServer(testAddress); - client.close(); - client = vertx.createHttpClient(createBaseClientOptions()); - client.request(requestOptions) - .compose(req -> req.setChunked(true).sendHead().compose(v -> { + public void testConcurrentWrites1() throws Exception { + testConcurrentWrites(req -> req + .sendHead() + .compose(v -> { AtomicBoolean latch = new AtomicBoolean(false); new Thread(() -> { req.write("msg1"); @@ -6628,7 +6619,43 @@ public void testConcurrentWrites() throws Exception { req.write("msg2"); req.end(); return req.response(); - })) + })); + } + + @Test + public void testConcurrentWrites2() throws Exception { + testConcurrentWrites(req -> { + AtomicBoolean latch = new AtomicBoolean(false); + new Thread(() -> { + req.sendHead(); + latch.set(true); // Release Event-loop thread + }).start(); + // Active wait for the event to be published + while (!latch.get()) { + } + req.write("msg1"); + req.write("msg2"); + req.end(); + return req.response(); + }); + } + + private void testConcurrentWrites(Function> action) throws Exception { + waitFor(1); + AtomicReference received = new AtomicReference<>(); + server.requestHandler(req -> req.body() + .onSuccess(buffer -> { + received.set(buffer.toString()); + req.response().end(); + })); + startServer(testAddress); + client.close(); + client = vertx.createHttpClient(createBaseClientOptions()); + client.request(requestOptions) + .compose(req -> { + req.setChunked(true); + return action.apply(req); + }) .onComplete(onSuccess(resp -> complete())); await(); assertEquals("msg1msg2", received.get());