diff --git a/Sources/NIOHTTP2/HTTP2StreamChannel.swift b/Sources/NIOHTTP2/HTTP2StreamChannel.swift index 3f7aa8ce..4003635f 100644 --- a/Sources/NIOHTTP2/HTTP2StreamChannel.swift +++ b/Sources/NIOHTTP2/HTTP2StreamChannel.swift @@ -630,26 +630,15 @@ private extension HTTP2StreamChannel { while self.pendingReads.count > 0 { let frame = self.pendingReads.removeFirst() - let endStream: Bool let dataLength: Int? switch frame.payload { case .data(let data): - endStream = data.endStream dataLength = data.data.readableBytes - case .headers(let headers): - endStream = headers.endStream - dataLength = nil default: - endStream = false dataLength = nil } - // We've seen end stream: close the window manager to avoid emitting extraneous WINDOW_UPDATE frames. - if endStream { - self.windowManager.closed = true - } - self.pipeline.fireChannelRead(NIOAny(frame)) if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) { @@ -712,6 +701,14 @@ internal extension HTTP2StreamChannel { // actually delivered into the pipeline. if case .data(let dataPayload) = frame.payload { self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes) + + // No further window update frames should be sent. + if dataPayload.endStream { + self.windowManager.closed = true + } + } else if case .headers(let headersPayload) = frame.payload, headersPayload.endStream { + // No further window update frames should be sent. + self.windowManager.closed = true } self.pendingReads.append(message) } diff --git a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift index 2fcf15c5..a702a081 100644 --- a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests+XCTest.swift @@ -76,6 +76,7 @@ extension HTTP2FramePayloadStreamMultiplexerTests { ("testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer", testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer), ("testReadWhenUsingAutoreadOnChildChannel", testReadWhenUsingAutoreadOnChildChannel), ("testWindowUpdateIsNotEmittedAfterStreamIsClosed", testWindowUpdateIsNotEmittedAfterStreamIsClosed), + ("testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame", testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame), ] } } diff --git a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift index 7dfff8db..ac62b438 100644 --- a/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift @@ -1848,4 +1848,54 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase { // the stream has closed we don't expect to read anything out. XCTAssertNil(try self.channel.readOutbound(as: HTTP2Frame.self)) } + + func testWindowUpdateIsNotEmittedAfterStreamIsClosedEvenOnLaterFrame() throws { + let targetWindowSize = 128 + let multiplexer = HTTP2StreamMultiplexer(mode: .client, + channel: self.channel, + targetWindowSize: targetWindowSize) { channel in + return channel.eventLoop.makeSucceededFuture(()) + } + XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait()) + + // We need to activate the underlying channel here. + XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait()) + + // Write a headers frame. + let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")]) + let headersFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: headers))) + self.channel.pipeline.fireChannelRead(NIOAny(headersFrame)) + + // Activate the stream. + self.activateStream(1) + + // Send a window updated event. + var windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: 128, outboundWindowSize: nil) + self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated) + self.channel.pipeline.fireChannelReadComplete() + + // The inbound window size should now be our target: 128. Write enough bytes to consume the + // inbound window as two frames: a 127-byte frame, followed by a 1-byte with END_STREAM set. + let bytes = ByteBuffer(repeating: 0, count: targetWindowSize) + let firstData = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes.getSlice(at: bytes.readerIndex, length: targetWindowSize - 1)!))) + let secondData = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(bytes.getSlice(at: bytes.readerIndex, length: 1)!), endStream: true)) + let firstDataFrame = HTTP2Frame(streamID: 1, payload: firstData) + let secondDataFrame = HTTP2Frame(streamID: 1, payload: secondData) + + self.channel.pipeline.fireChannelRead(NIOAny(firstDataFrame)) + windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: 1, outboundWindowSize: nil) + self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated) + + self.channel.pipeline.fireChannelRead(NIOAny(secondDataFrame)) + // This is nil here for a reason: it reflects what would actually be sent in the real code. Relevantly, the nil currently + // does not actually propagate into the handler, which matters a lot. + windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: nil, outboundWindowSize: nil) + self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated) + + self.channel.pipeline.fireChannelReadComplete() + + // We've consumed the inbound window: normally we'd expect a WINDOW_UPDATE frame but since + // the stream has closed we don't expect to read anything out. + XCTAssertNil(try self.channel.readOutbound(as: HTTP2Frame.self)) + } }