Skip to content

Commit

Permalink
Emit metrics for reserved headers in HTTP. Hide actual changes under …
Browse files Browse the repository at this point in the history
…feature flag.
  • Loading branch information
biosvs committed May 10, 2024
1 parent f7b3c8e commit cb2af3f
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 55 deletions.
44 changes: 30 additions & 14 deletions transport/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/bufferpool"
"go.uber.org/yarpc/internal/iopool"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/pkg/errors"
"go.uber.org/yarpc/yarpcerrors"
"go.uber.org/zap"
Expand All @@ -48,21 +49,29 @@ func popHeader(h http.Header, n string) string {

// handler adapts a transport.Handler into a handler for net/http.
type handler struct {
router transport.Router
tracer opentracing.Tracer
grabHeaders map[string]struct{}
bothResponseError bool
logger *zap.Logger
router transport.Router
tracer opentracing.Tracer
grabHeaders map[string]struct{}
bothResponseError bool
logger *zap.Logger
reservedHeaderMetric *observability.ReservedHeaderMetrics
}

func (h handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responseWriter := newResponseWriter(w)
responseWriter := newResponseWriter(w, h.reservedHeaderMetric.With(req.Header.Get(CallerHeader), req.Header.Get(ServiceHeader)))

service := popHeader(req.Header, ServiceHeader)
procedure := popHeader(req.Header, ProcedureHeader)
bothResponseError := popHeader(req.Header, AcceptsBothResponseErrorHeader) == AcceptTrue

// add response header to echo accepted rpc-service
responseWriter.AddSystemHeader(ServiceHeader, service)
status := yarpcerrors.FromError(errors.WrapHandlerError(h.callHandler(responseWriter, req, service, procedure), service, procedure))

err := h.callHandler(responseWriter, req, service, procedure)
if err == nil && responseWriter.err != nil {
err = responseWriter.err
}
status := yarpcerrors.FromError(errors.WrapHandlerError(err, service, procedure))
if status == nil {
responseWriter.Close(http.StatusOK)
return
Expand Down Expand Up @@ -103,8 +112,10 @@ func (h handler) callHandler(responseWriter *responseWriter, req *http.Request,
return yarpcerrors.Newf(yarpcerrors.CodeNotFound, "request method was %s but only %s is allowed", req.Method, http.MethodPost)
}

caller := popHeader(req.Header, CallerHeader)

treq := &transport.Request{
Caller: popHeader(req.Header, CallerHeader),
Caller: caller,
Service: service,
Procedure: procedure,
Encoding: transport.Encoding(popHeader(req.Header, EncodingHeader)),
Expand All @@ -113,7 +124,7 @@ func (h handler) callHandler(responseWriter *responseWriter, req *http.Request,
RoutingKey: popHeader(req.Header, RoutingKeyHeader),
RoutingDelegate: popHeader(req.Header, RoutingDelegateHeader),
CallerProcedure: popHeader(req.Header, CallerProcedureHeader),
Headers: applicationHeaders.FromHTTPHeaders(req.Header, transport.Headers{}),
Headers: applicationHeaders.FromHTTPHeaders(req.Header, transport.Headers{}, h.reservedHeaderMetric.With(caller, service)),
Body: req.Body,
BodySize: int(req.ContentLength),
}
Expand Down Expand Up @@ -250,13 +261,18 @@ var (

// responseWriter adapts a http.ResponseWriter into a transport.ResponseWriter.
type responseWriter struct {
w http.ResponseWriter
buffer *bufferpool.Buffer
w http.ResponseWriter
buffer *bufferpool.Buffer
err error
edgeMetrics observability.ReservedHeaderEdgeMetrics
}

func newResponseWriter(w http.ResponseWriter) *responseWriter {
func newResponseWriter(w http.ResponseWriter, edgeMetrics observability.ReservedHeaderEdgeMetrics) *responseWriter {
w.Header().Set(ApplicationStatusHeader, ApplicationSuccessStatus)
return &responseWriter{w: w}
return &responseWriter{
w: w,
edgeMetrics: edgeMetrics,
}
}

func (rw *responseWriter) Write(s []byte) (int, error) {
Expand All @@ -267,7 +283,7 @@ func (rw *responseWriter) Write(s []byte) (int, error) {
}

func (rw *responseWriter) AddHeaders(h transport.Headers) {
applicationHeaders.ToHTTPHeaders(h, rw.w.Header())
_, rw.err = applicationHeaders.ToHTTPHeaders(h, rw.w.Header(), rw.edgeMetrics)
}

func (rw *responseWriter) SetApplicationError() {
Expand Down
3 changes: 2 additions & 1 deletion transport/http/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/api/transport/transporttest"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/internal/routertest"
"go.uber.org/yarpc/yarpcerrors"
)
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestResponseWriter(t *testing.T) {
appErrCode := yarpcerrors.CodeAborted

recorder := httptest.NewRecorder()
writer := newResponseWriter(recorder)
writer := newResponseWriter(recorder, observability.ReservedHeaderEdgeMetrics{})

headers := transport.HeadersFromMap(map[string]string{
"foo": "bar",
Expand Down
51 changes: 41 additions & 10 deletions transport/http/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,75 @@ import (
"strings"

"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/yarpcerrors"
)

// headerConverter converts HTTP headers to and from transport headers.
type headerMapper struct{ Prefix string }

var (
applicationHeaders = headerMapper{ApplicationHeaderPrefix}

// enforceHeaderRules is a feature flag for a more strict error handling rules.
// If true and isReservedHeaderPrefix also true, an error will be returned for
// attempt to set such header; header will be stripped for incoming requests and receiving responses.
// See https://github.com/yarpc/yarpc-go/issues/2265 for more details.
enforceHeaderRules = false
)

// toHTTPHeaders converts application headers into transport headers.
// isReservedHeaderPrefix checks header name by prefix match.
func isReservedHeaderPrefix(header string) bool {
return strings.HasPrefix(strings.ToLower(header), "rpc-") || strings.HasPrefix(strings.ToLower(header), "$rpc$-")
}

// ToHTTPHeaders converts application headers into transport headers.
//
// Headers are read from 'from' and written to 'to'. The final header collection
// is returned.
//
// If 'to' is nil, a new map will be assigned.
func (hm headerMapper) ToHTTPHeaders(from transport.Headers, to http.Header) http.Header {
func (hm headerMapper) ToHTTPHeaders(from transport.Headers, to http.Header, edgeMetrics observability.ReservedHeaderEdgeMetrics) (http.Header, error) {
if to == nil {
to = make(http.Header, from.Len())
}

for k, v := range from.Items() {
if isReservedHeaderPrefix(k) {
edgeMetrics.IncError()
if enforceHeaderRules {
return nil, yarpcerrors.InternalErrorf("cannot use reserved header in application headers: %s", k)
}
}

to.Add(hm.Prefix+k, v)
}
return to

return to, nil
}

// fromHTTPHeaders converts HTTP headers to application headers.
// FromHTTPHeaders converts HTTP headers to application headers.
//
// Headers are read from 'from' and written to 'to'. The final header collection
// is returned.
//
// If 'to' is nil, a new map will be assigned.
func (hm headerMapper) FromHTTPHeaders(from http.Header, to transport.Headers) transport.Headers {
func (hm headerMapper) FromHTTPHeaders(from http.Header, to transport.Headers, edgeMetrics observability.ReservedHeaderEdgeMetrics) transport.Headers {
prefixLen := len(hm.Prefix)

for k := range from {
if strings.HasPrefix(k, hm.Prefix) {
key := k[prefixLen:]
to = to.With(key, from.Get(k))
if !strings.HasPrefix(k, hm.Prefix) {
continue
}

key := k[prefixLen:]

if isReservedHeaderPrefix(key) {
edgeMetrics.IncStripped()
if enforceHeaderRules {
continue
}
}

to = to.With(key, from.Get(k))
// Note: undefined behavior for multiple occurrences of the same header
}
return to
Expand Down
Loading

0 comments on commit cb2af3f

Please sign in to comment.