Skip to content

Commit

Permalink
The HTTP client connection handle buffer writes from a non vertx thre…
Browse files Browse the repository at this point in the history
…ad and will re-schedule any message sent by the event-loop thread to preserve ordering. However this is not applied to the initial message is sent and therefore it is possible to have the initial message written after the subsequent HTTP messages.

Apply the queueing pattern to the initial message written by the HTTP client connection to avoi this problem.
  • Loading branch information
vietj committed Jan 12, 2024
1 parent a35dd09 commit cf50ac4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 17 deletions.
13 changes: 11 additions & 2 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.impl.MessageWrite;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.tracing.SpanKind;
Expand Down Expand Up @@ -548,9 +549,17 @@ void handleException(Throwable exception) {
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
priority(priority);
PromiseInternal<Void> promise = context.promise();
conn.context.emit(null, v -> {
writeHeaders(request, buf, end, priority, connect, promise);
messageQueue.write(new MessageWrite() {
@Override
public void write() {
writeHeaders(request, buf, end, priority, connect, promise);
}
@Override
public void cancel(Throwable cause) {
promise.fail(cause);
}
});

return promise.future();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {

private static final MultiMap EMPTY = new Http2HeadersAdaptor(EmptyHttp2Headers.INSTANCE);

private final OutboundMessageQueue<MessageWrite> messageQueue;
protected final OutboundMessageQueue<MessageWrite> messageQueue;
protected final C conn;
protected final VertxInternal vertx;
protected final ContextInternal context;
Expand Down
55 changes: 41 additions & 14 deletions src/test/java/io/vertx/core/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6604,19 +6604,10 @@ private void testDnsClientSideLoadBalancing(boolean enabled) throws Exception {
}

@Test
public void testConcurrentWrites() throws Exception {
waitFor(1);
AtomicReference<String> received = new AtomicReference<>();
server.requestHandler(req -> req.body()
.onSuccess(buffer -> {
received.set(buffer.toString());
req.response().end();
}));
startServer(testAddress);
client.close();
client = vertx.createHttpClient(createBaseClientOptions());
client.request(requestOptions)
.compose(req -> req.setChunked(true).sendHead().compose(v -> {
public void testConcurrentWrites1() throws Exception {
testConcurrentWrites(req -> req
.sendHead()
.compose(v -> {
AtomicBoolean latch = new AtomicBoolean(false);
new Thread(() -> {
req.write("msg1");
Expand All @@ -6628,7 +6619,43 @@ public void testConcurrentWrites() throws Exception {
req.write("msg2");
req.end();
return req.response();
}))
}));
}

@Test
public void testConcurrentWrites2() throws Exception {
testConcurrentWrites(req -> {
AtomicBoolean latch = new AtomicBoolean(false);
new Thread(() -> {
req.sendHead();
latch.set(true); // Release Event-loop thread
}).start();
// Active wait for the event to be published
while (!latch.get()) {
}
req.write("msg1");
req.write("msg2");
req.end();
return req.response();
});
}

private void testConcurrentWrites(Function<HttpClientRequest, Future<HttpClientResponse>> action) throws Exception {
waitFor(1);
AtomicReference<String> received = new AtomicReference<>();
server.requestHandler(req -> req.body()
.onSuccess(buffer -> {
received.set(buffer.toString());
req.response().end();
}));
startServer(testAddress);
client.close();
client = vertx.createHttpClient(createBaseClientOptions());
client.request(requestOptions)
.compose(req -> {
req.setChunked(true);
return action.apply(req);
})
.onComplete(onSuccess(resp -> complete()));
await();
assertEquals("msg1msg2", received.get());
Expand Down

0 comments on commit cf50ac4

Please sign in to comment.