Skip to content

Commit

Permalink
Emit metrics for reserved headers in tChannel. Hide actual changes un…
Browse files Browse the repository at this point in the history
…der feature flag.
  • Loading branch information
biosvs committed May 7, 2024
1 parent 08690c2 commit 5f291fa
Show file tree
Hide file tree
Showing 12 changed files with 463 additions and 37 deletions.
4 changes: 3 additions & 1 deletion transport/tchannel/channel_outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ func (o *ChannelOutbound) Call(ctx context.Context, req *transport.Request) (*tr
}

err = getResponseError(headers)
deleteReservedHeaders(headers)
// no check: err will be returned as is

deleteReservedHeaders(headers, o.transport.reservedHeaderMetric.With(req.Caller, req.Service))

resp := &transport.Response{
Headers: headers,
Expand Down
44 changes: 24 additions & 20 deletions transport/tchannel/channel_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber/tchannel-go"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/pkg/lifecycle"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -82,13 +83,14 @@ func (options transportOptions) newChannelTransport() *ChannelTransport {
logger = zap.NewNop()
}
return &ChannelTransport{
once: lifecycle.NewOnce(),
ch: options.ch,
addr: options.addr,
tracer: options.tracer,
logger: logger.Named("tchannel"),
originalHeaders: options.originalHeaders,
newResponseWriter: newHandlerWriter,
once: lifecycle.NewOnce(),
ch: options.ch,
addr: options.addr,
tracer: options.tracer,
logger: logger.Named("tchannel"),
originalHeaders: options.originalHeaders,
newResponseWriter: newHandlerWriter,
reservedHeaderMetric: observability.NewReserveHeaderMetrics(options.meter, TransportName+"_channel"),
}
}

Expand All @@ -97,14 +99,15 @@ func (options transportOptions) newChannelTransport() *ChannelTransport {
// If you have a YARPC peer.Chooser, use the unqualified tchannel.Transport
// instead.
type ChannelTransport struct {
once *lifecycle.Once
ch Channel
addr string
tracer opentracing.Tracer
logger *zap.Logger
router transport.Router
originalHeaders bool
newResponseWriter func(inboundCallResponse, tchannel.Format, headerCase) responseWriter
once *lifecycle.Once
ch Channel
addr string
tracer opentracing.Tracer
logger *zap.Logger
router transport.Router
originalHeaders bool
newResponseWriter responseWriterConstructor
reservedHeaderMetric *observability.ReservedHeaderMetrics
}

// Channel returns the underlying TChannel "Channel" instance.
Expand Down Expand Up @@ -140,11 +143,12 @@ func (t *ChannelTransport) start() error {
sc := t.ch.GetSubChannel(s)
existing := sc.GetHandlers()
sc.SetHandler(handler{
existing: existing,
router: t.router,
tracer: t.tracer,
logger: t.logger,
newResponseWriter: t.newResponseWriter,
existing: existing,
router: t.router,
tracer: t.tracer,
logger: t.logger,
reservedHeaderMetrics: t.reservedHeaderMetric,
newResponseWriter: t.newResponseWriter,
})
}
}
Expand Down
5 changes: 4 additions & 1 deletion transport/tchannel/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/bufferpool"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/pkg/errors"
"go.uber.org/yarpc/yarpcerrors"
"go.uber.org/zap"
Expand Down Expand Up @@ -99,6 +100,7 @@ type handler struct {
tracer opentracing.Tracer
headerCase headerCase
logger *zap.Logger
reservedHeaderMetrics *observability.ReservedHeaderMetrics
newResponseWriter responseWriterConstructor
excludeServiceHeaderInResponse bool
}
Expand All @@ -109,7 +111,7 @@ func (h handler) Handle(ctx ncontext.Context, call *tchannel.InboundCall) {

func (h handler) handle(ctx context.Context, call inboundCall) {
// you MUST close the responseWriter no matter what unless you have a tchannel.SystemError
responseWriter := h.newResponseWriter(call.Response(), call.Format(), h.headerCase)
responseWriter := h.newResponseWriter(call.Response(), call.Format(), h.headerCase, h.reservedHeaderMetrics.With(call.CallerName(), call.ServiceName()))
defer responseWriter.ReleaseBuffer()

if !h.excludeServiceHeaderInResponse {
Expand Down Expand Up @@ -183,6 +185,7 @@ func (h handler) callHandler(ctx context.Context, call inboundCall, responseWrit
}

transportHeadersToRequest(treq, headers)
deleteReservedPrefixHeaders(headers, h.reservedHeaderMetrics.With(call.CallerName(), call.ServiceName()))
treq.Headers = headers

if tcall, ok := call.(tchannelCall); ok {
Expand Down
9 changes: 5 additions & 4 deletions transport/tchannel/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.uber.org/yarpc/api/transport/transporttest"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/internal/routertest"
"go.uber.org/yarpc/internal/testtime"
pkgerrors "go.uber.org/yarpc/pkg/errors"
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestResponseWriter(t *testing.T) {
resp := newResponseRecorder()
call.resp = resp

w := newHandlerWriter(call.Response(), call.Format(), tt.headerCase)
w := newHandlerWriter(call.Response(), call.Format(), tt.headerCase, observability.ReservedHeaderEdgeMetrics{})
tt.apply(w)
assert.NoError(t, w.Close())

Expand Down Expand Up @@ -623,7 +624,7 @@ func TestResponseWriterFailure(t *testing.T) {
resp := newResponseRecorder()
tt.setupResp(resp)

w := newHandlerWriter(resp, tchannel.Raw, canonicalizedHeaderCase)
w := newHandlerWriter(resp, tchannel.Raw, canonicalizedHeaderCase, observability.ReservedHeaderEdgeMetrics{})
_, err := w.Write([]byte("foo"))
assert.NoError(t, err)
_, err = w.Write([]byte("bar"))
Expand All @@ -638,7 +639,7 @@ func TestResponseWriterFailure(t *testing.T) {

func TestResponseWriterEmptyBodyHeaders(t *testing.T) {
res := newResponseRecorder()
w := newHandlerWriter(res, tchannel.Raw, canonicalizedHeaderCase)
w := newHandlerWriter(res, tchannel.Raw, canonicalizedHeaderCase, observability.ReservedHeaderEdgeMetrics{})

w.AddHeaders(transport.NewHeaders().With("foo", "bar"))
require.NoError(t, w.Close())
Expand Down Expand Up @@ -809,7 +810,7 @@ func TestRpcServiceHeader(t *testing.T) {
hw := &responseWriterImpl{}
h := handler{
headerCase: canonicalizedHeaderCase,
newResponseWriter: func(inboundCallResponse, tchannel.Format, headerCase) responseWriter {
newResponseWriter: func(inboundCallResponse, tchannel.Format, headerCase, observability.ReservedHeaderEdgeMetrics) responseWriter {
return hw
},
}
Expand Down
51 changes: 50 additions & 1 deletion transport/tchannel/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/uber/tchannel-go"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/observability"
"go.uber.org/yarpc/transport/tchannel/internal"
"go.uber.org/yarpc/yarpcerrors"
)
Expand Down Expand Up @@ -68,11 +69,23 @@ var _reservedHeaderKeys = map[string]struct{}{
CallerProcedureHeader: {},
}

var (
// enforceHeaderRules is a feature flag for a more strict error handling rules.
// See https://github.com/yarpc/yarpc-go/issues/2265 for more details.
enforceHeaderRules = false
)

// isReservedHeaderKey checks header name by exact match.
func isReservedHeaderKey(key string) bool {
_, ok := _reservedHeaderKeys[strings.ToLower(key)]
return ok
}

// isReservedHeaderPrefix checks header name by prefix match.
func isReservedHeaderPrefix(header string) bool {
return strings.HasPrefix(strings.ToLower(header), "rpc-") || strings.HasPrefix(strings.ToLower(header), "$rpc$-")
}

// readRequestHeaders reads headers and baggage from an incoming request.
func readRequestHeaders(
ctx context.Context,
Expand Down Expand Up @@ -236,10 +249,46 @@ func getHeaderMap(hs transport.Headers, headerCase headerCase) map[string]string
}
}

func deleteReservedHeaders(headers transport.Headers) {
func findReservedHeaderPrefix(headers map[string]string) (string, bool) {
for key := range headers {
if isReservedHeaderPrefix(key) {
return key, true
}
}
return "", false
}

func validateApplicationHeaders(headers map[string]string, edgeMetrics observability.ReservedHeaderEdgeMetrics) error {
key, found := findReservedHeaderPrefix(headers)
if !found {
return nil
}

edgeMetrics.IncError()

if enforceHeaderRules {
return yarpcerrors.InternalErrorf("header with rpc prefix is not allowed in request application headers (%s was passed)", key)
}
return nil
}

func deleteReservedHeaders(headers transport.Headers, edgeMetrics observability.ReservedHeaderEdgeMetrics) {
for headerKey := range _reservedHeaderKeys {
headers.Del(headerKey)
}

deleteReservedPrefixHeaders(headers, edgeMetrics)
}

func deleteReservedPrefixHeaders(headers transport.Headers, edgeMetrics observability.ReservedHeaderEdgeMetrics) {
for key := range headers.Items() {
if isReservedHeaderPrefix(key) {
edgeMetrics.IncStripped()
if enforceHeaderRules {
headers.Del(key)
}
}
}
}

// this check ensures that the service we're issuing a request to is the one
Expand Down
Loading

0 comments on commit 5f291fa

Please sign in to comment.