Skip to content

Commit

Permalink
Replace the HTTP2 client stream outbound backpressure workaround with…
Browse files Browse the repository at this point in the history
… the use of the outbound write queue
  • Loading branch information
vietj committed Jan 12, 2024
1 parent cf50ac4 commit 80dde74
Showing 1 changed file with 7 additions and 28 deletions.
35 changes: 7 additions & 28 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,11 @@ static abstract class Stream extends VertxHttp2Stream<Http2ClientConnection> {
protected Handler<Throwable> exceptionHandler;
protected Handler<HttpClientPush> pushHandler;
protected Handler<Void> closeHandler;
protected long writeWindow;
protected final long windowSize;

Stream(Http2ClientConnection conn, ContextInternal context, boolean push) {
super(conn, context);

this.push = push;
this.windowSize = conn.getWindowSize();
}

void onContinue() {
Expand Down Expand Up @@ -450,8 +447,8 @@ public boolean writeQueueFull() {
}

@Override
public synchronized boolean isNotWritable() {
return writeWindow > windowSize;
public boolean isNotWritable() {
return !messageQueue.isWritable();
}

@Override
Expand Down Expand Up @@ -510,6 +507,10 @@ void handleReset(long errorCode) {

@Override
void handleWriteQueueDrained() {
Handler<Void> handler = drainHandler;
if (handler != null) {
context.dispatch(null, handler);
}
}

@Override
Expand Down Expand Up @@ -634,29 +635,7 @@ private void createStream(HttpRequestHead head, Http2Headers headers) throws Htt
public Future<Void> writeBuffer(ByteBuf buf, boolean end) {
Promise<Void> promise = context.promise();
writeData(buf, end, promise);
Future<Void> fut = promise.future();
if (buf != null) {
int size = buf.readableBytes();
synchronized (this) {
writeWindow += size;
}
fut = fut.andThen(ar -> {
Handler<Void> drainHandler;
synchronized (this) {
boolean full = writeWindow > windowSize;
writeWindow -= size;
if (full && writeWindow <= windowSize) {
drainHandler = this.drainHandler;
} else {
drainHandler = null;
}
}
if (drainHandler != null) {
drainHandler.handle(null);
}
});
}
return fut;
return promise.future();
}

@Override
Expand Down

0 comments on commit 80dde74

Please sign in to comment.