Skip to content

Commit

Permalink
Handle race condition between sending GOAWAY and HEADERS (#457)
Browse files Browse the repository at this point in the history
Motivation:

A potential race exists between a server sending a GOAWAY frame and a
client opening a new stream (i.e., sending a HEADERS frame before
receiving an already sent GOAWAY frame). If there are no open streams
when the server sends the GOAWAY frame, the connection state on the
server transitions to being fully quiesced, which throws a connection
error on receipt of a HEADERS frame.

Modifications:

- Adjust the connection state machine on the server to throw a stream
error (instead of a connection error), and consequently, send a
RST_STREAM frame to the client.

Result:

This condition will be treated as a stream-level error instead.
  • Loading branch information
clintonpi authored Sep 6, 2024
1 parent 9f6d865 commit 376312d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ extension HTTP2ConnectionStateMachine {
return .init(result: .connectionError(underlyingError: NIOHTTP2Errors.missingPreface(), type: .protocolError), effect: nil)

case .fullyQuiesced:
return .init(result: .connectionError(underlyingError: NIOHTTP2Errors.ioOnClosedConnection(), type: .protocolError), effect: nil)
return .init(result: .streamError(streamID: streamID, underlyingError: NIOHTTP2Errors.streamError(streamID: streamID, baseError: NIOHTTP2Errors.createdStreamAfterGoaway()), type: .refusedStream), effect: nil)

case .modifying:
preconditionFailure("Must not be left in modifying state")
Expand Down Expand Up @@ -1042,7 +1042,10 @@ extension HTTP2ConnectionStateMachine {
return .init(result: .connectionError(underlyingError: NIOHTTP2Errors.missingPreface(), type: .protocolError), effect: nil)

case .fullyQuiesced:
return .init(result: .connectionError(underlyingError: NIOHTTP2Errors.ioOnClosedConnection(), type: .protocolError), effect: nil)
// We allow RST_STREAM frames to be sent because when a server receives a HEADERS frame in this state (say, a client sends
// a HEADERS frame before receiving the GOAWAY frame that the server has already sent), it throws a stream
// error which causes the emission of a RST_STREAM frame. This RST_STREAM frame needs to be passed on successfully.
return .init(result: .succeed, effect: nil)

case .modifying:
preconditionFailure("Must not be left in modifying state")
Expand Down
24 changes: 22 additions & 2 deletions Tests/NIOHTTP2Tests/ConnectionStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,24 @@ class ConnectionStateMachineTests: XCTestCase {
assertIgnored(temporaryServer.receiveHeaders(streamID: streamSeven, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: true))
}

func testClientInitiatesNewStreamBeforeReceivingAlreadySentGoaway() {
let streamOne = HTTP2StreamID(1)

self.exchangePreamble()

var temporaryServer = self.server!
var temporaryClient = self.client!

assertSucceeds(temporaryServer.sendGoaway(lastStreamID: .maxID))
assertSucceeds(temporaryClient.sendHeaders(streamID: streamOne, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: false))
assertSucceeds(temporaryClient.receiveGoaway(lastStreamID: .maxID))

// The server should throw a stream error (and as a result, respond with a RST_STREAM frame) when it receives the headers.
assertStreamError(type: .refusedStream, temporaryServer.receiveHeaders(streamID: streamOne, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: false))
assertSucceeds(temporaryServer.sendRstStream(streamID: streamOne, reason: .refusedStream))
assertSucceeds(temporaryClient.receiveRstStream(streamID: streamOne, reason: .refusedStream))
}

func testHeadersOnClosedStreamAfterClientGoaway() {
let streamOne = HTTP2StreamID(1)
let streamTwo = HTTP2StreamID(2)
Expand Down Expand Up @@ -1053,12 +1071,11 @@ class ConnectionStateMachineTests: XCTestCase {

// Stream specific things don't work.
assertConnectionError(type: .protocolError, self.client.sendHeaders(streamID: streamOne, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: true))
assertConnectionError(type: .protocolError, self.server.receiveHeaders(streamID: streamOne, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: true))
assertStreamError(type: .refusedStream, self.server.receiveHeaders(streamID: streamOne, headers: ConnectionStateMachineTests.requestHeaders, isEndStreamSet: true))
assertConnectionError(type: .protocolError, self.client.sendData(streamID: streamOne, contentLength: 15, flowControlledBytes: 15, isEndStreamSet: false))
assertConnectionError(type: .protocolError, self.server.receiveData(streamID: streamOne, contentLength: 15, flowControlledBytes: 15, isEndStreamSet: false))
assertConnectionError(type: .protocolError, self.client.sendWindowUpdate(streamID: streamOne, windowIncrement: 1024))
assertConnectionError(type: .protocolError, self.server.receiveWindowUpdate(streamID: streamOne, windowIncrement: 1024))
assertConnectionError(type: .protocolError, self.client.sendRstStream(streamID: streamOne, reason: .noError))
assertConnectionError(type: .protocolError, self.server.receiveRstStream(streamID: streamOne, reason: .noError))
assertConnectionError(type: .protocolError, self.server.sendPushPromise(originalStreamID: streamOne, childStreamID: streamTwo, headers: ConnectionStateMachineTests.requestHeaders))
assertConnectionError(type: .protocolError, self.client.receivePushPromise(originalStreamID: streamOne, childStreamID: streamTwo, headers: ConnectionStateMachineTests.requestHeaders))
Expand All @@ -1073,6 +1090,9 @@ class ConnectionStateMachineTests: XCTestCase {
assertSucceeds(self.client.sendGoaway(lastStreamID: .rootStream))
assertSucceeds(self.server.receiveGoaway(lastStreamID: .rootStream))

// Sending RST_STREAM is cool too.
assertSucceeds(self.client.sendRstStream(streamID: streamOne, reason: .noError))

// PINGing is cool too.
assertSucceeds(self.client.sendPing())
assertSucceeds(self.server.receivePing(ackFlagSet: false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,59 @@ class SimpleClientServerInlineStreamMultiplexerTests: XCTestCase {
stream.writeAndFlush(headerPayload, promise: promise)
}

func testSuccessfullyReceiveAndSendPingEvenWhenConnectionIsFullyQuiesced() throws {
func testOpenStreamBeforeReceivingAlreadySentGoAway() throws {
let serverHandler = InboundFramePayloadRecorder()
try self.basicHTTP2Connection() { channel in
return channel.pipeline.addHandler(serverHandler)
}

let clientHandler = InboundFramePayloadRecorder()
let childChannelPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
let multiplexer = try self.clientChannel.pipeline.handler(type: NIOHTTP2Handler.self).wait().multiplexer.wait()
multiplexer.createStreamChannel(promise: childChannelPromise) { channel in
return channel.pipeline.addHandler(clientHandler)
}
self.clientChannel.embeddedEventLoop.run()
let childChannel = try childChannelPromise.futureResult.wait()

// Server sends GOAWAY frame.
let goAwayFrame = HTTP2Frame(streamID: .rootStream, payload: .goAway(lastStreamID: .maxID, errorCode: .noError, opaqueData: nil))
serverChannel.writeAndFlush(goAwayFrame, promise: nil)

// Client sends headers.
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")])
let reqFramePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
childChannel.writeAndFlush(reqFramePayload, promise: nil)

self.interactInMemory(self.clientChannel, self.serverChannel, expectError: true) { error in
if let error = error as? NIOHTTP2Errors.StreamError {
XCTAssert(error.baseError is NIOHTTP2Errors.CreatedStreamAfterGoaway)
} else {
XCTFail("Expected error to be of type StreamError, got error of type \(type(of: error)).")
}
}

// Client receives GOAWAY and RST_STREAM frames.
try self.clientChannel.assertReceivedFrame().assertGoAwayFrame(lastStreamID: .maxID, errorCode: 0, opaqueData: nil)
clientHandler.receivedFrames.assertFramePayloadsMatch([HTTP2Frame.FramePayload.rstStream(.refusedStream)])

// No frames left.
self.clientChannel.assertNoFramesReceived()
self.serverChannel.assertNoFramesReceived()

// The stream closes with an error.
self.clientChannel.embeddedEventLoop.run()
XCTAssertThrowsError(try childChannel.closeFuture.wait())

XCTAssertNoThrow(try self.clientChannel.finish())
XCTAssertNoThrow(try self.serverChannel.finish())
}

func testSuccessfullyReceiveAndSendPingEvenWhenConnectionIsFullyQuiesced() throws {
let serverHandler = InboundFramePayloadRecorder()
try self.basicHTTP2Connection() { channel in
return channel.pipeline.addHandler(serverHandler)
}
// Fully quiesce the connection on the server.
let goAwayFrame = HTTP2Frame(streamID: .rootStream, payload: .goAway(lastStreamID: .rootStream, errorCode: .noError, opaqueData: nil))
serverChannel.writeAndFlush(goAwayFrame, promise: nil)
Expand Down
38 changes: 35 additions & 3 deletions Tests/NIOHTTP2Tests/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ struct NoFrameReceived: Error { }
extension XCTestCase {
/// Have two `EmbeddedChannel` objects send and receive data from each other until
/// they make no forward progress.
func interactInMemory(_ first: EmbeddedChannel, _ second: EmbeddedChannel, file: StaticString = #filePath, line: UInt = #line) {
func interactInMemory(
_ first: EmbeddedChannel,
_ second: EmbeddedChannel,
expectError: Bool = false,
file: StaticString = #filePath,
line: UInt = #line,
_ errorHandler: (_ error: any Error) -> Void = { _ in }
) {
var operated: Bool
var encounteredError = false

func readBytesFromChannel(_ channel: EmbeddedChannel) -> ByteBuffer? {
guard let data = try? assertNoThrowWithValue(channel.readOutbound(as: IOData.self)) else {
Expand All @@ -54,13 +62,37 @@ extension XCTestCase {

if let data = readBytesFromChannel(first) {
operated = true
XCTAssertNoThrow(try second.writeInbound(data), file: (file), line: line)

if expectError {
do {
try second.writeInbound(data)
} catch {
encounteredError = true
errorHandler(error)
}
} else {
XCTAssertNoThrow(try second.writeInbound(data), file: (file), line: line)
}
}
if let data = readBytesFromChannel(second) {
operated = true
XCTAssertNoThrow(try first.writeInbound(data), file: (file), line: line)

if expectError {
do {
try first.writeInbound(data)
} catch {
encounteredError = true
errorHandler(error)
}
} else {
XCTAssertNoThrow(try first.writeInbound(data), file: (file), line: line)
}
}
} while operated

if expectError && !encounteredError {
XCTFail("Expected an error but didn't encounter one", file: (file), line: line)
}
}

/// Have two `NIOAsyncTestingChannel` objects send and receive data from each other until
Expand Down

0 comments on commit 376312d

Please sign in to comment.