Skip to content

Commit

Permalink
Merge upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
corretto-github-robot committed Aug 8, 2024
2 parents 938108b + 3acdebe commit 360c1bd
Show file tree
Hide file tree
Showing 18 changed files with 428 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,11 @@ private Http2Connection(HttpConnection connection,
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
boolean opened = initialStream.registerStream(1, true);
if (debug.on() && !opened) {
debug.log("Initial stream was cancelled - but connection is maintained: " +
"reset frame will need to be sent later");
}
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
// Upgrading:
Expand All @@ -338,6 +342,11 @@ private Http2Connection(HttpConnection connection,
this.initial = initial;
connectFlows(connection);
sendConnectionPreface();
if (!opened) {
debug.log("ensure reset frame is sent to cancel initial stream");
initialStream.sendCancelStreamFrame();
}

}

// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
Expand Down Expand Up @@ -847,7 +856,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
pushStream.registerStream(promisedStreamid, true);
parent.incoming_pushPromise(pushReq, pushStream);
}

Expand All @@ -872,14 +881,15 @@ private void handleConnectionFrame(Http2Frame frame)
}
}

void resetStream(int streamid, int code) throws IOException {
void resetStream(int streamid, int code) {
try {
if (connection.channel().isOpen()) {
// no need to try & send a reset frame if the
// connection channel is already closed.
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
markStream(streamid, code);
ResetFrame frame = new ResetFrame(streamid, code);
sendFrame(frame);
} else if (debug.on()) {
Expand All @@ -892,6 +902,11 @@ void resetStream(int streamid, int code) throws IOException {
}
}

private void markStream(int streamid, int code) {
Stream<?> s = streams.get(streamid);
if (s != null) s.markStream(code);
}

// reduce count of streams by 1 if stream still exists
synchronized void decrementStreamsCount(int streamid) {
Stream<?> s = streams.get(streamid);
Expand Down Expand Up @@ -1193,12 +1208,19 @@ private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
assert stream.streamid == 0;
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
// set outgoing window here. This allows thread sending
// body to proceed.
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
if (stream.registerStream(streamid, false)) {
// set outgoing window here. This allows thread sending
// body to proceed.
nextstreamid += 2;
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
} else {
stream.cancelImpl(new IOException("Request cancelled"));
if (finalStream() && streams.isEmpty()) {
close();
}
return null;
}
}

private final Object sendlock = new Object();
Expand All @@ -1212,7 +1234,9 @@ void sendFrame(Http2Frame frame) {
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
publisher.enqueue(encodeHeaders(oh, stream));
if (stream != null) {
publisher.enqueue(encodeHeaders(oh, stream));
}
} else {
publisher.enqueue(encodeFrame(frame));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -246,7 +246,7 @@ public final void handle() {
}
@Override
public final void abort(IOException error) {
debug().log(() -> "abort: " + error);
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
pause(); // pause, then signal
signalError(error); // should not be resumed after abort (not checked)
}
Expand Down Expand Up @@ -724,10 +724,12 @@ public final void request(long n) {
@Override
public final void cancel() {
pauseReadEvent();
if (debug.on()) debug.log("Read subscription cancelled");
if (Log.channel()) {
Log.logChannel("Read subscription cancelled for channel {0}",
channelDescr());
}
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}

Expand All @@ -748,6 +750,7 @@ final void handleError() {
}

final void signalError(Throwable error) {
if (debug.on()) debug.log("signal read error: " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
}
Expand Down Expand Up @@ -808,6 +811,7 @@ final void read() {
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
Expand All @@ -831,6 +835,7 @@ final void read() {
// anyway.
pauseReadEvent();
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
Expand All @@ -850,6 +855,7 @@ final void read() {
// waiting for this event to terminate.
// So resume the read event and return now...
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop after onNext: ");
return;
} else {
Expand All @@ -861,6 +867,7 @@ final void read() {
// readable again.
demand.increase(1);
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
}
Expand All @@ -879,6 +886,7 @@ final void read() {
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no demand");
break;
}
Expand Down Expand Up @@ -946,6 +954,7 @@ protected final void signalEvent() {

@Override
protected final void signalError(Throwable error) {
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
sub.signalError(error);
}

Expand Down
Loading

0 comments on commit 360c1bd

Please sign in to comment.