Skip to content

Commit

Permalink
proxy: fix proxy panic caused by broken backend
Browse files Browse the repository at this point in the history
Close outgoing request body to fix proxy panic.

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov committed Mar 22, 2024
1 parent 94ad68f commit 13dac9e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
6 changes: 3 additions & 3 deletions proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type context struct {
originalRequest *http.Request
originalResponse *http.Response
outgoingHost string
outgoingDebugRequest *http.Request
outgoingRequest *http.Request
executionCounter int
startServe time.Time
metrics *filterMetrics
Expand Down Expand Up @@ -192,9 +192,9 @@ func (c *context) shunted() bool {
return c.servedWithResponse
}

func (c *context) setResponse(r *http.Response, preserveOriginal bool) {
func (c *context) setResponse(r *http.Response) {
c.response = r
if preserveOriginal {
if c.proxy.flags.PreserveOriginal() {
c.originalResponse = cloneResponseMetadata(r)
}
}
Expand Down
57 changes: 31 additions & 26 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,9 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
return &e
}

// creates an outgoing http request to be forwarded to the route endpoint
// Creates and stores into context an outgoing http request to be forwarded to the route endpoint
// based on the augmented incoming request
func (p *Proxy) mapRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Request, routing.Metrics, error) {
func (p *Proxy) mapRequest(ctx *context, requestContext stdlibcontext.Context) (routing.Metrics, error) {
var endpointMetrics routing.Metrics
r := ctx.request
rt := ctx.route
Expand Down Expand Up @@ -584,7 +584,7 @@ func (p *Proxy) mapRequest(ctx *context, requestContext stdlibcontext.Context) (

rr, err := http.NewRequestWithContext(requestContext, r.Method, u.String(), body)
if err != nil {
return nil, nil, err
return nil, err
}

rr.ContentLength = r.ContentLength
Expand Down Expand Up @@ -615,7 +615,9 @@ func (p *Proxy) mapRequest(ctx *context, requestContext stdlibcontext.Context) (
rr = forwardToProxy(r, rr)
}

return rr, endpointMetrics, nil
ctx.outgoingRequest = rr

return endpointMetrics, nil
}

type proxyUrlContextKey struct{}
Expand Down Expand Up @@ -919,10 +921,11 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) {
}

func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) {
req, endpointMetrics, err := p.mapRequest(ctx, requestContext)
endpointMetrics, err := p.mapRequest(ctx, requestContext)
if err != nil {
return nil, &proxyError{err: fmt.Errorf("could not map backend request: %w", err)}
}
req := ctx.outgoingRequest

if res, ok := p.rejectBackend(ctx, req); ok {
return res, nil
Expand Down Expand Up @@ -1191,19 +1194,18 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) {
return err
}

ctx.setResponse(loopCTX.response, p.flags.PreserveOriginal())
ctx.setResponse(loopCTX.response)
ctx.proxySpan = loopCTX.proxySpan
} else if p.flags.Debug() {
debugReq, _, err := p.mapRequest(ctx, ctx.request.Context())
_, err := p.mapRequest(ctx, ctx.request.Context())
if err != nil {
perr := &proxyError{err: err}
p.makeErrorResponse(ctx, perr)
p.applyFiltersOnError(ctx, processedFilters)
return perr
}

ctx.outgoingDebugRequest = debugReq
ctx.setResponse(&http.Response{Header: make(http.Header)}, p.flags.PreserveOriginal())
ctx.setResponse(&http.Response{Header: make(http.Header)})
} else {

done, allow := p.checkBreaker(ctx)
Expand Down Expand Up @@ -1269,7 +1271,7 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) {
done(rsp.StatusCode < http.StatusInternalServerError)
}

ctx.setResponse(rsp, p.flags.PreserveOriginal())
ctx.setResponse(rsp)
p.metrics.MeasureBackend(ctx.route.Id, backendStart)
p.metrics.MeasureBackendHost(ctx.route.Host, backendStart)
}
Expand All @@ -1291,7 +1293,7 @@ func (p *Proxy) serveResponse(ctx *context) {
dbgResponse(ctx.responseWriter, &debugInfo{
route: &ctx.route.Route,
incoming: ctx.originalRequest,
outgoing: ctx.outgoingDebugRequest,
outgoing: ctx.outgoingRequest,
response: ctx.response,
})

Expand Down Expand Up @@ -1360,7 +1362,7 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
if p.flags.Debug() {
di := &debugInfo{
incoming: ctx.originalRequest,
outgoing: ctx.outgoingDebugRequest,
outgoing: ctx.outgoingRequest,
response: ctx.response,
err: err,
}
Expand Down Expand Up @@ -1516,11 +1518,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {

logging.LogAccess(entry, additionalData)
}

// This flush is required in I/O error
if !ctx.successfulUpgrade {
lw.Flush()
}
}()

if p.flags.patchPath() {
Expand All @@ -1540,15 +1537,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx.initialSpan = span
ctx.parentSpan = span

defer func() {
if ctx.response != nil && ctx.response.Body != nil {
err := ctx.response.Body.Close()
if err != nil {
ctx.Logger().Errorf("error during closing the response body: %v", err)
}
}
}()

err := p.do(ctx, span)

// writeTimeout() filter
Expand All @@ -1575,9 +1563,26 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

// This flush is required in I/O error
if !ctx.successfulUpgrade {
ctx.responseWriter.Flush()
}

if ctx.cancelBackendContext != nil {
ctx.cancelBackendContext()
}

if ctx.outgoingRequest != nil && ctx.outgoingRequest.Body != nil {
if err := ctx.outgoingRequest.Body.Close(); err != nil {
ctx.Logger().Errorf("Failed to close outgoing request body: %v", err)
}
}

if ctx.response != nil && ctx.response.Body != nil {
if err := ctx.response.Body.Close(); err != nil {
ctx.Logger().Errorf("Failed to close response body: %v", err)
}
}
}

// Close causes the proxy to stop closing idle
Expand Down

0 comments on commit 13dac9e

Please sign in to comment.