Skip to content

Commit

Permalink
contrib/google.golang.org/grpc: add WithIgnoredMethods (#577)
Browse files Browse the repository at this point in the history
Add an option which allows ignoring some server methods when tracing.

Fixes #576
  • Loading branch information
mathetake authored Jan 29, 2020
1 parent e235f11 commit 7704362
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
74 changes: 73 additions & 1 deletion contrib/google.golang.org/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,6 @@ type rig struct {
func (r *rig) Close() {
r.server.Stop()
r.conn.Close()
r.listener.Close()
}

func newRig(traceClient bool, interceptorOpts ...Option) (*rig, error) {
Expand Down Expand Up @@ -591,3 +590,76 @@ func TestAnalyticsSettings(t *testing.T) {
assertRate(t, mt, 0.23, WithAnalyticsRate(0.23))
})
}

func TestIgnoredMethods(t *testing.T) {
t.Run("unary", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
for _, c := range []struct {
ignore []string
exp int
}{
{ignore: []string{}, exp: 2},
{ignore: []string{"/some/endpoint"}, exp: 2},
{ignore: []string{"/grpc.Fixture/Ping"}, exp: 1},
{ignore: []string{"/grpc.Fixture/Ping", "/additional/endpoint"}, exp: 1},
} {
rig, err := newRig(true, WithIgnoredMethods(c.ignore...))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}
client := rig.client
resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"})
assert.Nil(t, err)
assert.Equal(t, resp.Message, "passed")

spans := mt.FinishedSpans()
assert.Len(t, spans, c.exp)
rig.Close()
mt.Reset()
}
})

t.Run("stream", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
for _, c := range []struct {
ignore []string
exp int
}{
// client span: 1 send + 1 recv(OK) + 1 stream finish (OK)
// server span: 1 send + 2 recv(OK + EOF) + 1 stream finish(EOF)
{ignore: []string{}, exp: 7},
{ignore: []string{"/some/endpoint"}, exp: 7},
{ignore: []string{"/grpc.Fixture/StreamPing"}, exp: 3},
{ignore: []string{"/grpc.Fixture/StreamPing", "/additional/endpoint"}, exp: 3},
} {
rig, err := newRig(true, WithIgnoredMethods(c.ignore...))
if err != nil {
t.Fatalf("error setting up rig: %s", err)
}

ctx, done := context.WithCancel(context.Background())
client := rig.client
stream, err := client.StreamPing(ctx)
assert.NoError(t, err)

err = stream.Send(&FixtureRequest{Name: "pass"})
assert.NoError(t, err)

resp, err := stream.Recv()
assert.NoError(t, err)
assert.Equal(t, resp.Message, "passed")

assert.NoError(t, stream.CloseSend())
done() // close stream from client side
rig.Close()

waitForSpans(mt, c.exp, 5*time.Second)

spans := mt.FinishedSpans()
assert.Len(t, spans, c.exp)
mt.Reset()
}
})
}
13 changes: 13 additions & 0 deletions contrib/google.golang.org/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type config struct {
traceStreamCalls bool
traceStreamMessages bool
noDebugStack bool
ignoredMethods map[string]struct{}
}

func (cfg *config) serverServiceName() string {
Expand Down Expand Up @@ -116,3 +117,15 @@ func WithAnalyticsRate(rate float64) Option {
}
}
}

// WithIgnoredMethods specifies full methods to be ignored by the server side interceptor.
// When an incoming request's full method is in ms, no spans will be created.
func WithIgnoredMethods(ms ...string) Option {
ims := make(map[string]struct{}, len(ms))
for _, e := range ms {
ims[e] = struct{}{}
}
return func(cfg *config) {
cfg.ignoredMethods = ims
}
}
10 changes: 6 additions & 4 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (ss *serverStream) Context() context.Context {
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
if ss.cfg.traceStreamMessages {
if _, ok := ss.cfg.ignoredMethods[ss.method]; ss.cfg.traceStreamMessages && !ok {
span, _ := startSpanFromContext(
ss.ctx,
ss.method,
Expand All @@ -47,7 +47,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
if ss.cfg.traceStreamMessages {
if _, ok := ss.cfg.ignoredMethods[ss.method]; ss.cfg.traceStreamMessages && !ok {
span, _ := startSpanFromContext(
ss.ctx,
ss.method,
Expand All @@ -73,9 +73,8 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
}
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
ctx := ss.Context()

// if we've enabled call tracing, create a span
if cfg.traceStreamCalls {
if _, ok := cfg.ignoredMethods[info.FullMethod]; cfg.traceStreamCalls && !ok {
var span ddtrace.Span
span, ctx = startSpanFromContext(
ctx,
Expand Down Expand Up @@ -116,6 +115,9 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
fn(cfg)
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if _, ok := cfg.ignoredMethods[info.FullMethod]; ok {
return handler(ctx, req)
}
span, ctx := startSpanFromContext(
ctx,
info.FullMethod,
Expand Down

0 comments on commit 7704362

Please sign in to comment.