Skip to content

Commit 71af9c7

Browse files
authored
Crash fix: HTTPClientRequestHandler can deal with failing writes (#558)
1 parent 16aed40 commit 71af9c7

File tree

3 files changed

+106
-22
lines changed

3 files changed

+106
-22
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+45-22
Original file line numberDiff line numberDiff line change
@@ -139,30 +139,13 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
139139
// MARK: Run Actions
140140

141141
private func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
142-
// NOTE: We can bang the request in the following actions, since the `HTTPRequestStateMachine`
143-
// ensures, that actions that require a request are only called, if the request is
144-
// still present. The request is only nilled as a response to a state machine action
145-
// (.failRequest or .succeedRequest).
146-
147142
switch action {
148143
case .sendRequestHead(let head, let startBody):
149-
if startBody {
150-
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
151-
self.request!.requestHeadSent()
152-
self.request!.resumeRequestBodyStream()
153-
} else {
154-
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
155-
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
156-
context.flush()
157-
158-
self.request!.requestHeadSent()
159-
160-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
161-
self.runTimeoutAction(timeoutAction, context: context)
162-
}
163-
}
144+
self.sendRequestHead(head, startBody: startBody, context: context)
164145

165146
case .pauseRequestBodyStream:
147+
// We can force unwrap the request here, as we have just validated in the state machine,
148+
// that the request is neither failed nor finished yet
166149
self.request!.pauseRequestBodyStream()
167150

168151
case .sendBodyPart(let data):
@@ -182,18 +165,29 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
182165
break
183166

184167
case .resumeRequestBodyStream:
168+
// We can force unwrap the request here, as we have just validated in the state machine,
169+
// that the request is neither failed nor finished yet
185170
self.request!.resumeRequestBodyStream()
186171

187172
case .forwardResponseHead(let head, pauseRequestBodyStream: let pauseRequestBodyStream):
173+
// We can force unwrap the request here, as we have just validated in the state machine,
174+
// that the request is neither failed nor finished yet
188175
self.request!.receiveResponseHead(head)
189-
if pauseRequestBodyStream {
190-
self.request!.pauseRequestBodyStream()
176+
if pauseRequestBodyStream, let request = self.request {
177+
// The above response head forward might lead the request to mark itself as
178+
// cancelled, which in turn might pop the request of the handler. For this reason we
179+
// must check if the request is still present here.
180+
request.pauseRequestBodyStream()
191181
}
192182

193183
case .forwardResponseBodyParts(let parts):
184+
// We can force unwrap the request here, as we have just validated in the state machine,
185+
// that the request is neither failed nor finished yet
194186
self.request!.receiveResponseBodyParts(parts)
195187

196188
case .failRequest(let error, _):
189+
// We can force unwrap the request here, as we have just validated in the state machine,
190+
// that the request object is still present.
197191
self.request!.fail(error)
198192
self.request = nil
199193
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
@@ -204,13 +198,42 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
204198
self.runFinalAction(.close, context: context)
205199

206200
case .succeedRequest(let finalAction, let finalParts):
201+
// We can force unwrap the request here, as we have just validated in the state machine,
202+
// that the request object is still present.
207203
self.request!.succeedRequest(finalParts)
208204
self.request = nil
209205
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
210206
self.runFinalAction(finalAction, context: context)
211207
}
212208
}
213209

210+
private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
211+
if startBody {
212+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
213+
214+
// The above write might trigger an error, which may lead to a call to `errorCaught`,
215+
// which in turn, may fail the request and pop it from the handler. For this reason
216+
// we must check if the request is still present here.
217+
guard let request = self.request else { return }
218+
request.requestHeadSent()
219+
request.resumeRequestBodyStream()
220+
} else {
221+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
222+
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
223+
context.flush()
224+
225+
// The above write might trigger an error, which may lead to a call to `errorCaught`,
226+
// which in turn, may fail the request and pop it from the handler. For this reason
227+
// we must check if the request is still present here.
228+
guard let request = self.request else { return }
229+
request.requestHeadSent()
230+
231+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
232+
self.runTimeoutAction(timeoutAction, context: context)
233+
}
234+
}
235+
}
236+
214237
private func runFinalAction(_ action: HTTPRequestStateMachine.Action.FinalStreamAction, context: ChannelHandlerContext) {
215238
switch action {
216239
case .close:

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ extension HTTP2ClientRequestHandlerTests {
2929
("testWriteBackpressure", testWriteBackpressure),
3030
("testIdleReadTimeout", testIdleReadTimeout),
3131
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
32+
("testWriteHTTPHeadFails", testWriteHTTPHeadFails),
3233
]
3334
}
3435
}

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift

+60
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,64 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
285285
// therefore advancing the time should not trigger a crash
286286
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250))
287287
}
288+
289+
func testWriteHTTPHeadFails() {
290+
struct WriteError: Error, Equatable {}
291+
292+
class FailWriteHandler: ChannelOutboundHandler {
293+
typealias OutboundIn = HTTPClientRequestPart
294+
typealias OutboundOut = HTTPClientRequestPart
295+
296+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
297+
let error = WriteError()
298+
promise?.fail(error)
299+
context.fireErrorCaught(error)
300+
}
301+
}
302+
303+
let bodies: [HTTPClient.Body?] = [
304+
.none,
305+
.some(.byteBuffer(ByteBuffer(string: "hello world"))),
306+
]
307+
308+
for body in bodies {
309+
let embeddedEventLoop = EmbeddedEventLoop()
310+
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embeddedEventLoop)
311+
let embedded = EmbeddedChannel(handlers: [FailWriteHandler(), requestHandler], loop: embeddedEventLoop)
312+
313+
let logger = Logger(label: "test")
314+
315+
var maybeRequest: HTTPClient.Request?
316+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: body))
317+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
318+
319+
let delegate = ResponseAccumulator(request: request)
320+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
321+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
322+
request: request,
323+
eventLoopPreference: .delegate(on: embedded.eventLoop),
324+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
325+
redirectHandler: nil,
326+
connectionDeadline: .now() + .seconds(30),
327+
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
328+
delegate: delegate
329+
))
330+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
331+
332+
embedded.isWritable = false
333+
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
334+
embedded.write(requestBag, promise: nil)
335+
336+
// the handler only writes once the channel is writable
337+
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
338+
embedded.isWritable = true
339+
embedded.pipeline.fireChannelWritabilityChanged()
340+
341+
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
342+
XCTAssertEqual($0 as? WriteError, WriteError())
343+
}
344+
345+
XCTAssertEqual(embedded.isActive, false)
346+
}
347+
}
288348
}

0 commit comments

Comments
 (0)