Skip to content

Commit

Permalink
Replace usage of InboundBuffer by InboundMessageQueue in VertxHttp2St…
Browse files Browse the repository at this point in the history
…ream
  • Loading branch information
vietj committed Jun 4, 2024
1 parent fc3c4da commit d84b366
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public boolean writeQueueFull() {

@Override
public boolean isNotWritable() {
return !messageQueue.isWritable();
return !isWritable();
}

@Override
Expand Down Expand Up @@ -552,7 +552,7 @@ void handleException(Throwable exception) {
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
priority(priority);
PromiseInternal<Void> promise = context.promise();
messageQueue.write(new MessageWrite() {
write(new MessageWrite() {
@Override
public void write() {
writeHeaders(request, buf, end, priority, connect, promise);
Expand All @@ -562,7 +562,6 @@ public void cancel(Throwable cause) {
promise.fail(cause);
}
});

return promise.future();
}

Expand Down
72 changes: 40 additions & 32 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.InboundMessageQueue;
import io.vertx.core.net.impl.OutboundMessageQueue;
import io.vertx.core.net.impl.MessageWrite;
import io.vertx.core.streams.impl.InboundBuffer;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
Expand All @@ -38,7 +38,8 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {

private static final MultiMap EMPTY = new Http2HeadersAdaptor(EmptyHttp2Headers.INSTANCE);

protected final OutboundMessageQueue<MessageWrite> messageQueue;
private final OutboundMessageQueue<MessageWrite> outboundQueue;
private final InboundMessageQueue<Object> inboundQueue;
protected final C conn;
protected final VertxInternal vertx;
protected final ContextInternal context;
Expand All @@ -47,7 +48,6 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
// Client context
private boolean writable;
private StreamPriority priority;
private final InboundBuffer<Object> pending;
private long bytesRead;
private long bytesWritten;
protected boolean isConnect;
Expand All @@ -57,11 +57,29 @@ abstract class VertxHttp2Stream<C extends Http2ConnectionBase> {
this.conn = conn;
this.vertx = conn.vertx();
this.context = context;
this.pending = new InboundBuffer<>(context, 5);
this.inboundQueue = new InboundMessageQueue<>(conn.channel().eventLoop(), context) {
@Override
protected void handleMessage(Object item) {
if (item instanceof MultiMap) {
handleEnd((MultiMap) item);
} else {
Buffer data = (Buffer) item;
int len = data.length();
conn.getContext().emit(null, v -> {
if (stream.state().remoteSideOpen()) {
// Handle the HTTP upgrade case
// buffers are received by HTTP/1 and not accounted by HTTP/2
conn.consumeCredits(stream, len);
}
});
handleData(data);
}
}
};
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
this.isConnect = false;
this.writable = true;
this.messageQueue = new OutboundMessageQueue<>(conn.getContext().nettyEventLoop()) {
this.outboundQueue = new OutboundMessageQueue<>(conn.getContext().nettyEventLoop()) {
// TODO implement stop drain to optimize flushes ?
@Override
public boolean test(MessageWrite msg) {
Expand All @@ -85,24 +103,6 @@ protected void writeQueueDrained() {
context.emit(VertxHttp2Stream.this, VertxHttp2Stream::handleWriteQueueDrained);
}
};
pending.handler(item -> {
if (item instanceof MultiMap) {
handleEnd((MultiMap) item);
} else {
Buffer data = (Buffer) item;
int len = data.length();
conn.getContext().emit(null, v -> {
if (stream.state().remoteSideOpen()) {
// Handle the HTTP upgrade case
// buffers are received by HTTP/1 and not accounted by HTTP/2
conn.consumeCredits(this.stream, len);
}
});
handleData(data);
}
});
pending.exceptionHandler(context::reportException);
pending.resume();
}

void init(Http2Stream stream) {
Expand All @@ -116,7 +116,7 @@ void init(Http2Stream stream) {
void onClose() {
conn.flushBytesWritten();
context.execute(ex -> handleClose());
messageQueue.close();
outboundQueue.close();
}

void onException(Throwable cause) {
Expand Down Expand Up @@ -147,13 +147,13 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
void onData(Buffer data) {
bytesRead += data.length();
conn.reportBytesRead(data.length());
context.execute(data, pending::write);
inboundQueue.write(data);
}

void onWritabilityChanged() {
writable = !writable;
if (writable) {
messageQueue.drain();
outboundQueue.drain();
}
}

Expand All @@ -163,7 +163,7 @@ void onEnd() {

void onEnd(MultiMap trailers) {
conn.flushBytesRead();
context.emit(trailers, pending::write);
inboundQueue.write(trailers);
}

public int id() {
Expand All @@ -178,16 +178,24 @@ long bytesRead() {
return bytesRead;
}

public boolean isWritable() {
return outboundQueue.isWritable();
}

public void write(MessageWrite write) {
outboundQueue.write(write);
}

public void doPause() {
pending.pause();
inboundQueue.pause();
}

public void doFetch(long amount) {
pending.fetch(amount);
inboundQueue.fetch(amount);
}

public boolean isNotWritable() {
return !messageQueue.isWritable();
return !outboundQueue.isWritable();
}

public final Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
Expand Down Expand Up @@ -223,7 +231,7 @@ final void writeHeaders(Http2Headers headers, boolean first, boolean end, boolea
eventLoop.execute(() -> doWriteHeaders(headers, end, checkFlush, promise));
}
} else {
messageQueue.write(new MessageWrite() {
outboundQueue.write(new MessageWrite() {
@Override
public void write() {
doWriteHeaders(headers, end, checkFlush, promise);
Expand Down Expand Up @@ -251,7 +259,7 @@ private void writePriorityFrame(StreamPriority priority) {
}

final void writeData(ByteBuf chunk, boolean end, Promise<Void> promise) {
messageQueue.write(new MessageWrite() {
outboundQueue.write(new MessageWrite() {
@Override
public void write() {
doWriteData(chunk, end, promise);
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/io/vertx/core/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3424,9 +3424,7 @@ public void start(Promise<Void> startPromise) {
req.response().end();
}));
req.pause();
System.out.println("pause " + this);
vertx.setTimer(10, id -> {
System.out.println("resume " + this);
req.resume();
});
}).listen(testAddress)
Expand Down

0 comments on commit d84b366

Please sign in to comment.