Skip to content

Commit 1c05540

Browse files
committed
http2: block RoundTrip when the Transport hits MaxConcurrentStreams
Currently if the http2.Transport hits SettingsMaxConcurrentStreams for a server, it just makes a new TCP connection and creates the stream on the new connection. This CL updates that behavior to instead block RoundTrip until a new stream is available. I also fixed a second bug, which was necessary to make some tests pass: Previously, a stream was removed from cc.streams only if either (a) we received END_STREAM from the server, or (b) we received RST_STREAM from the server. This CL removes a stream from cc.streams if the request was cancelled (via ctx.Close, req.Cancel, or resp.Body.Close) before receiving END_STREAM or RST_STREAM from the server. Updates golang/go#13774 Updates golang/go#20985 Updates golang/go#21229 Change-Id: I660ffd724c4c513e0f1cc587b404bedb8aff80be Reviewed-on: https://go-review.googlesource.com/53250 Run-TryBot: Tom Bergan <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 167db31 commit 1c05540

File tree

2 files changed

+249
-26
lines changed

2 files changed

+249
-26
lines changed

http2/transport.go

+78-24
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ type ClientConn struct {
165165
goAwayDebug string // goAway frame's debug data, retained as a string
166166
streams map[uint32]*clientStream // client-initiated
167167
nextStreamID uint32
168+
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
168169
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
169170
bw *bufio.Writer
170171
br *bufio.Reader
@@ -217,35 +218,45 @@ type clientStream struct {
217218
resTrailer *http.Header // client's Response.Trailer
218219
}
219220

220-
// awaitRequestCancel runs in its own goroutine and waits for the user
221-
// to cancel a RoundTrip request, its context to expire, or for the
222-
// request to be done (any way it might be removed from the cc.streams
223-
// map: peer reset, successful completion, TCP connection breakage,
224-
// etc)
225-
func (cs *clientStream) awaitRequestCancel(req *http.Request) {
221+
// awaitRequestCancel waits for the user to cancel a request or for the done
222+
// channel to be signaled. A non-nil error is returned only if the request was
223+
// canceled.
224+
func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
226225
ctx := reqContext(req)
227226
if req.Cancel == nil && ctx.Done() == nil {
228-
return
227+
return nil
229228
}
230229
select {
231230
case <-req.Cancel:
232-
cs.cancelStream()
233-
cs.bufPipe.CloseWithError(errRequestCanceled)
231+
return errRequestCanceled
234232
case <-ctx.Done():
233+
return ctx.Err()
234+
case <-done:
235+
return nil
236+
}
237+
}
238+
239+
// awaitRequestCancel waits for the user to cancel a request, its context to
240+
// expire, or for the request to be done (any way it might be removed from the
241+
// cc.streams map: peer reset, successful completion, TCP connection breakage,
242+
// etc). If the request is canceled, then cs will be canceled and closed.
243+
func (cs *clientStream) awaitRequestCancel(req *http.Request) {
244+
if err := awaitRequestCancel(req, cs.done); err != nil {
235245
cs.cancelStream()
236-
cs.bufPipe.CloseWithError(ctx.Err())
237-
case <-cs.done:
246+
cs.bufPipe.CloseWithError(err)
238247
}
239248
}
240249

241250
func (cs *clientStream) cancelStream() {
242-
cs.cc.mu.Lock()
251+
cc := cs.cc
252+
cc.mu.Lock()
243253
didReset := cs.didReset
244254
cs.didReset = true
245-
cs.cc.mu.Unlock()
255+
cc.mu.Unlock()
246256

247257
if !didReset {
248-
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
258+
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
259+
cc.forgetStreamID(cs.ID)
249260
}
250261
}
251262

@@ -594,6 +605,8 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
594605
}
595606
}
596607

608+
// CanTakeNewRequest reports whether the connection can take a new request,
609+
// meaning it has not been closed or received or sent a GOAWAY.
597610
func (cc *ClientConn) CanTakeNewRequest() bool {
598611
cc.mu.Lock()
599612
defer cc.mu.Unlock()
@@ -605,8 +618,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
605618
return false
606619
}
607620
return cc.goAway == nil && !cc.closed &&
608-
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
609-
cc.nextStreamID < math.MaxInt32
621+
int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
610622
}
611623

612624
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -752,10 +764,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
752764
hasTrailers := trailers != ""
753765

754766
cc.mu.Lock()
755-
cc.lastActive = time.Now()
756-
if cc.closed || !cc.canTakeNewRequestLocked() {
767+
if err := cc.awaitOpenSlotForRequest(req); err != nil {
757768
cc.mu.Unlock()
758-
return nil, errClientConnUnusable
769+
return nil, err
759770
}
760771

761772
body := req.Body
@@ -869,31 +880,31 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
869880
case re := <-readLoopResCh:
870881
return handleReadLoopResponse(re)
871882
case <-respHeaderTimer:
872-
cc.forgetStreamID(cs.ID)
873883
if !hasBody || bodyWritten {
874884
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
875885
} else {
876886
bodyWriter.cancel()
877887
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
878888
}
889+
cc.forgetStreamID(cs.ID)
879890
return nil, errTimeout
880891
case <-ctx.Done():
881-
cc.forgetStreamID(cs.ID)
882892
if !hasBody || bodyWritten {
883893
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
884894
} else {
885895
bodyWriter.cancel()
886896
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
887897
}
898+
cc.forgetStreamID(cs.ID)
888899
return nil, ctx.Err()
889900
case <-req.Cancel:
890-
cc.forgetStreamID(cs.ID)
891901
if !hasBody || bodyWritten {
892902
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
893903
} else {
894904
bodyWriter.cancel()
895905
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
896906
}
907+
cc.forgetStreamID(cs.ID)
897908
return nil, errRequestCanceled
898909
case <-cs.peerReset:
899910
// processResetStream already removed the
@@ -920,6 +931,45 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
920931
}
921932
}
922933

934+
// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
935+
// Must hold cc.mu.
936+
func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
937+
var waitingForConn chan struct{}
938+
var waitingForConnErr error // guarded by cc.mu
939+
for {
940+
cc.lastActive = time.Now()
941+
if cc.closed || !cc.canTakeNewRequestLocked() {
942+
return errClientConnUnusable
943+
}
944+
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
945+
if waitingForConn != nil {
946+
close(waitingForConn)
947+
}
948+
return nil
949+
}
950+
// Unfortunately, we cannot wait on a condition variable and channel at
951+
// the same time, so instead, we spin up a goroutine to check if the
952+
// request is canceled while we wait for a slot to open in the connection.
953+
if waitingForConn == nil {
954+
waitingForConn = make(chan struct{})
955+
go func() {
956+
if err := awaitRequestCancel(req, waitingForConn); err != nil {
957+
cc.mu.Lock()
958+
waitingForConnErr = err
959+
cc.cond.Broadcast()
960+
cc.mu.Unlock()
961+
}
962+
}()
963+
}
964+
cc.pendingRequests++
965+
cc.cond.Wait()
966+
cc.pendingRequests--
967+
if waitingForConnErr != nil {
968+
return waitingForConnErr
969+
}
970+
}
971+
}
972+
923973
// requires cc.wmu be held
924974
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
925975
first := true // first frame written (HEADERS is first, then CONTINUATION)
@@ -1279,7 +1329,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
12791329
cc.idleTimer.Reset(cc.idleTimeout)
12801330
}
12811331
close(cs.done)
1282-
cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
1332+
// Wake up checkResetOrDone via clientStream.awaitFlowControl and
1333+
// wake up RoundTrip if there is a pending request.
1334+
cc.cond.Broadcast()
12831335
}
12841336
return cs
12851337
}
@@ -1378,8 +1430,9 @@ func (rl *clientConnReadLoop) run() error {
13781430
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
13791431
}
13801432
if se, ok := err.(StreamError); ok {
1381-
if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
1433+
if cs := cc.streamByID(se.StreamID, false); cs != nil {
13821434
cs.cc.writeStreamReset(cs.ID, se.Code, err)
1435+
cs.cc.forgetStreamID(cs.ID)
13831436
if se.Cause == nil {
13841437
se.Cause = cc.fr.errDetail
13851438
}
@@ -1701,6 +1754,7 @@ func (b transportResponseBody) Close() error {
17011754
}
17021755

17031756
cs.bufPipe.BreakWithError(errClosedResponseBody)
1757+
cc.forgetStreamID(cs.ID)
17041758
return nil
17051759
}
17061760

0 commit comments

Comments
 (0)