From 5ed71daf270b28f478982538e4136379879e4c02 Mon Sep 17 00:00:00 2001 From: Alexander Tumin Date: Sat, 27 Jun 2020 22:46:50 +0300 Subject: [PATCH] Zipkin-specific observer support closes #156 --- observer.go | 56 ++++++++++++++++++++ span.go | 131 ++++++++++++++++++++++++++++++++++++---------- tracer.go | 51 ++++++++++-------- tracer_options.go | 12 ++++- tracer_test.go | 82 ++++++++++++++++++++++++++--- 5 files changed, 271 insertions(+), 61 deletions(-) create mode 100644 observer.go diff --git a/observer.go b/observer.go new file mode 100644 index 0000000..cfde4e8 --- /dev/null +++ b/observer.go @@ -0,0 +1,56 @@ +package zipkintracer + +import ( + "time" + + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" +) + +// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect +// options used during zipkin.Span creation +type ZipkinStartSpanOptions struct { + // Parent span context reference, if any + Parent *model.SpanContext + + // Span's start time + StartTime time.Time + + // Kind clarifies context of timestamp, duration and remoteEndpoint in a span. + Kind model.Kind + + // Tags used during span creation + Tags map[string]string + + // RemoteEndpoint used during span creation + RemoteEndpoint *model.Endpoint +} + +// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans +type ZipkinObserver interface { + // OnStartSpan is called when new Span is created. Creates and returns span observer. + // If the observer is not interested in the given span, it must return nil. + OnStartSpan(sp zipkin.Span, operationName string, options *ZipkinStartSpanOptions) ZipkinSpanObserver +} + +// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about +// other Span events. +type ZipkinSpanObserver interface { + // Callback called from zipkin.Span.SetName() + OnSetName(operationName string) + + // Callback called from zipkin.Span.SetTag() + OnSetTag(key, value string) + + // Callback called from zipkin.Span.SetRemoteEndpoint() + OnSetRemoteEndpoint(remote *model.Endpoint) + + // Callback called from zipkin.Span.Annotate() + OnAnnotate(t time.Time, annotation string) + + // Callback called from zipkin.Span.Finish() + OnFinish() + + // Callback called from zipkin.Span.FinishedWithDuration() + OnFinishedWithDuration(dur time.Duration) +} diff --git a/span.go b/span.go index fe7915d..4dd6f1e 100644 --- a/span.go +++ b/span.go @@ -16,6 +16,7 @@ package zipkintracer import ( "fmt" + "net" "time" otobserver "github.com/opentracing-contrib/go-observer" @@ -25,16 +26,12 @@ import ( "github.com/openzipkin/zipkin-go" ) -// FinisherWithDuration allows to finish span with given duration -type FinisherWithDuration interface { - FinishedWithDuration(d time.Duration) -} - type spanImpl struct { - tracer *tracerImpl - zipkinSpan zipkin.Span - startTime time.Time - observer otobserver.SpanObserver + tracer *tracerImpl + zipkinSpan zipkin.Span + observer otobserver.SpanObserver + zipkinObserver ZipkinSpanObserver + options ZipkinStartSpanOptions } func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { @@ -42,32 +39,75 @@ func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { s.observer.OnSetOperationName(operationName) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetName(operationName) + } + s.zipkinSpan.SetName(operationName) return s } func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { + strValue := fmt.Sprint(value) + if s.observer != nil { s.observer.OnSetTag(key, value) } if key == string(ext.SamplingPriority) { + + return s + } + + if key == string(ext.SpanKind) { + + return s + } + + endpointChanged := false + + switch { + case key == string(ext.SamplingPriority): // there are no means for now to change the sampling decision // but when finishedSpanHandler is in place we could change this. return s + case key == string(ext.SpanKind): + // this tag is translated into kind which can + // only be set on span creation + return s + case key == string(ext.PeerService): + serviceName, _ := value.(string) + s.options.RemoteEndpoint.ServiceName = serviceName + endpointChanged = true + case key == string(ext.PeerHostIPv4): + ipv4, _ := value.(string) + s.options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4) + endpointChanged = true + case key == string(ext.PeerHostIPv6): + ipv6, _ := value.(string) + s.options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6) + endpointChanged = true + case key == string(ext.PeerPort): + port, _ := value.(uint16) + s.options.RemoteEndpoint.Port = port + endpointChanged = true } - if key == string(ext.SpanKind) || - key == string(ext.PeerService) || - key == string(ext.PeerHostIPv4) || - key == string(ext.PeerHostIPv6) || - key == string(ext.PeerPort) { - // this tags are translated into kind and remoteEndpoint which can - // only be set on span creation + if endpointChanged { + s.zipkinSpan.SetRemoteEndpoint(s.options.RemoteEndpoint) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetRemoteEndpoint(s.options.RemoteEndpoint) + } + return s } - s.zipkinSpan.Tag(key, fmt.Sprint(value)) + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetTag(key, strValue) + } + + s.zipkinSpan.Tag(key, strValue) return s } @@ -78,7 +118,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) { } for _, field := range fields { - s.zipkinSpan.Annotate(time.Now(), field.String()) + t := time.Now() + fieldValue := field.String() + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, fieldValue) + } + + s.zipkinSpan.Annotate(t, fieldValue) } } @@ -88,7 +135,13 @@ func (s *spanImpl) LogFields(fields ...log.Field) { func (s *spanImpl) logFields(t time.Time, fields ...log.Field) { for _, field := range fields { - s.zipkinSpan.Annotate(t, field.String()) + annotation := field.String() + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, annotation) + } + + s.zipkinSpan.Annotate(t, annotation) } } @@ -110,7 +163,13 @@ func (s *spanImpl) Log(ld opentracing.LogData) { ld.Timestamp = time.Now() } - s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload)) + annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation) + } + + s.zipkinSpan.Annotate(ld.Timestamp, annotation) } func (s *spanImpl) Finish() { @@ -118,28 +177,42 @@ func (s *spanImpl) Finish() { s.observer.OnFinish(opentracing.FinishOptions{}) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + s.zipkinSpan.Finish() } func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { - if s.observer != nil { - s.observer.OnFinish(opts) - } - for _, lr := range opts.LogRecords { s.logFields(lr.Timestamp, lr.Fields...) } if !opts.FinishTime.IsZero() { - f, ok := s.zipkinSpan.(FinisherWithDuration) - if !ok { - return + dur := opts.FinishTime.Sub(s.options.StartTime) + + if s.observer != nil { + s.observer.OnFinish(opts) + } + + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinishedWithDuration(dur) } - f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime)) + + s.zipkinSpan.FinishedWithDuration(dur) return } - s.Finish() + if s.observer != nil { + s.observer.OnFinish(opts) + } + + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + + s.zipkinSpan.Finish() } func (s *spanImpl) Tracer() opentracing.Tracer { diff --git a/tracer.go b/tracer.go index b48d92a..892d625 100644 --- a/tracer.go +++ b/tracer.go @@ -58,43 +58,47 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp zopts := make([]zipkin.SpanOption, 0) + sp := &spanImpl{ + tracer: t, + } + // Parent if len(startSpanOptions.References) > 0 { parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext) if ok { zopts = append(zopts, zipkin.Parent(model.SpanContext(parent))) + sp.options.Parent = (*model.SpanContext)(&parent) } } - startTime := time.Now() // Time + sp.options.StartTime = time.Now() if !startSpanOptions.StartTime.IsZero() { - zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime)) - startTime = startSpanOptions.StartTime + sp.options.StartTime = startSpanOptions.StartTime + zopts = append(zopts, zipkin.StartTime(sp.options.StartTime)) } - zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...) + zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &sp.options)...) - newSpan := t.zipkinTracer.StartSpan(operationName, zopts...) + sp.zipkinSpan = t.zipkinTracer.StartSpan(operationName, zopts...) - sp := &spanImpl{ - zipkinSpan: newSpan, - tracer: t, - startTime: startTime, - } if t.opts.observer != nil { observer, _ := t.opts.observer.OnStartSpan(sp, operationName, startSpanOptions) sp.observer = observer } + if t.opts.zipkinObserver != nil { + sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, &sp.options) + } + return sp } -func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { +func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption { zopts := make([]zipkin.SpanOption, 0) - tags := map[string]string{} - remoteEndpoint := &model.Endpoint{} + options.Tags = map[string]string{} + options.RemoteEndpoint = &model.Endpoint{} var kind string if val, ok := t[string(ext.SpanKind)]; ok { @@ -112,29 +116,30 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { mKind == model.Producer || mKind == model.Consumer { zopts = append(zopts, zipkin.Kind(mKind)) + options.Kind = mKind } else { - tags["span.kind"] = kind + options.Tags["span.kind"] = kind } } if val, ok := t[string(ext.PeerService)]; ok { serviceName, _ := val.(string) - remoteEndpoint.ServiceName = serviceName + options.RemoteEndpoint.ServiceName = serviceName } if val, ok := t[string(ext.PeerHostIPv4)]; ok { ipv4, _ := val.(string) - remoteEndpoint.IPv4 = net.ParseIP(ipv4) + options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4) } if val, ok := t[string(ext.PeerHostIPv6)]; ok { ipv6, _ := val.(string) - remoteEndpoint.IPv6 = net.ParseIP(ipv6) + options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6) } if val, ok := t[string(ext.PeerPort)]; ok { port, _ := val.(uint16) - remoteEndpoint.Port = port + options.RemoteEndpoint.Port = port } for key, val := range t { @@ -146,15 +151,15 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { continue } - tags[key] = fmt.Sprint(val) + options.Tags[key] = fmt.Sprint(val) } - if len(tags) > 0 { - zopts = append(zopts, zipkin.Tags(tags)) + if len(options.Tags) > 0 { + zopts = append(zopts, zipkin.Tags(options.Tags)) } - if !remoteEndpoint.Empty() { - zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint)) + if !options.RemoteEndpoint.Empty() { + zopts = append(zopts, zipkin.RemoteEndpoint(options.RemoteEndpoint)) } return zopts diff --git a/tracer_options.go b/tracer_options.go index 190a692..ae2e312 100644 --- a/tracer_options.go +++ b/tracer_options.go @@ -31,8 +31,9 @@ const ( // TracerOptions allows creating a customized Tracer. type TracerOptions struct { - observer otobserver.Observer - b3InjectOpt B3InjectOption + observer otobserver.Observer + b3InjectOpt B3InjectOption + zipkinObserver ZipkinObserver } // TracerOption allows for functional options. @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption { } } +// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver +func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption { + return func(opts *TracerOptions) { + opts.zipkinObserver = zipkinObserver + } +} + // WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption { return func(opts *TracerOptions) { diff --git a/tracer_test.go b/tracer_test.go index 0c991d0..d95d557 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -38,7 +38,9 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { {"span.kind": ext.SpanKindRPCServerEnum}, } for _, tags := range tagCases { - opts := parseTagsAsZipkinOptions(tags) + var zipkinStartSpanOptions ZipkinStartSpanOptions + + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -52,12 +54,34 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { if want, have := model.Server, spans[0].Kind; want != have { t.Errorf("unexpected kind value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %v", zipkinStartSpanOptions.Tags) + } + + if len(zipkinStartSpanOptions.Tags) != 0 { + t.Errorf("unexpected start options tags value, want empty map, have %v", zipkinStartSpanOptions.Tags) + } + + if zipkinStartSpanOptions.RemoteEndpoint == nil { + t.Errorf("unexpected start options remote endpoint value, want non-nil instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if !zipkinStartSpanOptions.RemoteEndpoint.Empty() { + t.Errorf("unexpected start options remote endpoint value, want empty instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if want, have := model.Server, zipkinStartSpanOptions.Kind; want != have { + t.Errorf("unexpected start options kind value, want %s, have %s", want, have) + } } } func TestOTKindTagIsCantBeParsed(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + tags := map[string]interface{}{"span.kind": "banana"} - opts := parseTagsAsZipkinOptions(tags) + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -75,13 +99,33 @@ func TestOTKindTagIsCantBeParsed(t *testing.T) { if want, have := "banana", spans[0].Tags["span.kind"]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %v", zipkinStartSpanOptions.Tags) + } + + if len(zipkinStartSpanOptions.Tags) == 0 { + t.Errorf("unexpected start options tags value, want non-empty map, have %v", zipkinStartSpanOptions.Tags) + } + + if want, have := "banana", zipkinStartSpanOptions.Tags["span.kind"]; want != have { + t.Errorf("unexpected start options tags[span.kind] value, want %s, have %s", want, have) + } } func TestOptionsFromOTTags(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + + const ( + sServiceA = "service_a" + sValue = "value" + sKey = "key" + ) + tags := map[string]interface{}{} - tags[string(ext.PeerService)] = "service_a" - tags["key"] = "value" - opts := parseTagsAsZipkinOptions(tags) + tags[string(ext.PeerService)] = sServiceA + tags[sKey] = sValue + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -92,11 +136,35 @@ func TestOptionsFromOTTags(t *testing.T) { t.Fatalf("unexpected number of spans, want %d, have %d", want, have) } - if want, have := "service_a", spans[0].RemoteEndpoint.ServiceName; want != have { + if want, have := sServiceA, spans[0].RemoteEndpoint.ServiceName; want != have { t.Errorf("unexpected remote service name, want %s, have %s", want, have) } - if want, have := "value", spans[0].Tags["key"]; want != have { + if want, have := sValue, spans[0].Tags[sKey]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %s", zipkinStartSpanOptions.Tags) + } + + if len(zipkinStartSpanOptions.Tags) == 0 { + t.Errorf("unexpected start options tags value, want non-empty map, have %s", zipkinStartSpanOptions.Tags) + } + + if want, have := sValue, zipkinStartSpanOptions.Tags[sKey]; want != have { + t.Errorf("unexpected start options tags[key] value, want %s, have %s", want, have) + } + + if zipkinStartSpanOptions.RemoteEndpoint == nil { + t.Errorf("unexpected start options remote endpoint value, want non-nil instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if zipkinStartSpanOptions.RemoteEndpoint.Empty() { + t.Errorf("unexpected start options remote endpoint value, want non-empty instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if want, have := sServiceA, zipkinStartSpanOptions.RemoteEndpoint.ServiceName; want != have { + t.Errorf("unexpected start options remote service name, want %s, have %s", want, have) + } }