Skip to content

Commit

Permalink
Zipkin-specific observer support
Browse files Browse the repository at this point in the history
closes #156
  • Loading branch information
iamtakingiteasy committed Jun 27, 2020
1 parent 223664c commit 6cb8f48
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 41 deletions.
53 changes: 53 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package zipkintracer

import (
"time"

"github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
)

// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect
// options used during zipkin.Span creation
type ZipkinStartSpanOptions struct {
// Parent span context reference, if any
Parent *model.SpanContext

// Span's start time
StartTime time.Time

// Kind clarifies context of timestamp, duration and remoteEndpoint in a span.
Kind model.Kind

// Tags used during span creation
Tags map[string]string

// RemoteEndpoint used during span creation
RemoteEndpoint *model.Endpoint
}

// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans
type ZipkinObserver interface {
// OnStartSpan is called when new Span is created. Creates and returns span observer.
// If the observer is not interested in the given span, it must return nil.
OnStartSpan(sp zipkin.Span, operationName string, options *ZipkinStartSpanOptions) ZipkinSpanObserver
}

// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about
// other Span events.
type ZipkinSpanObserver interface {
// Callback called from zipkin.Span.SetName()
OnSetName(operationName string)

// Callback called from zipkin.Span.SetTag()
OnSetTag(key, value string)

// Callback called from zipkin.Span.Annotate()
OnAnnotate(t time.Time, annotation string)

// Callback called from zipkin.Span.Finish()
OnFinish()

// Callback called from zipkin.Span.FinishedWithDuration()
OnFinishedWithDuration(dur time.Duration)
}
77 changes: 63 additions & 14 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,29 @@ type FinisherWithDuration interface {
}

type spanImpl struct {
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
zipkinObserver ZipkinSpanObserver
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetName(operationName)
}

s.zipkinSpan.SetName(operationName)
return s
}

func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
strValue := fmt.Sprint(value)

if s.observer != nil {
s.observer.OnSetTag(key, value)
}
Expand All @@ -67,7 +74,11 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
return s
}

s.zipkinSpan.Tag(key, fmt.Sprint(value))
if s.zipkinObserver != nil {
s.zipkinObserver.OnSetTag(key, strValue)
}

s.zipkinSpan.Tag(key, strValue)
return s
}

Expand All @@ -78,7 +89,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
}

for _, field := range fields {
s.zipkinSpan.Annotate(time.Now(), field.String())
t := time.Now()
fieldValue := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, fieldValue)
}

s.zipkinSpan.Annotate(t, fieldValue)
}
}

Expand All @@ -88,7 +106,13 @@ func (s *spanImpl) LogFields(fields ...log.Field) {

func (s *spanImpl) logFields(t time.Time, fields ...log.Field) {
for _, field := range fields {
s.zipkinSpan.Annotate(t, field.String())
annotation := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, annotation)
}

s.zipkinSpan.Annotate(t, annotation)
}
}

Expand All @@ -110,22 +134,28 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload))
annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload)

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation)
}

s.zipkinSpan.Annotate(ld.Timestamp, annotation)
}

func (s *spanImpl) Finish() {
if s.observer != nil {
s.observer.OnFinish(opentracing.FinishOptions{})
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
if s.observer != nil {
s.observer.OnFinish(opts)
}

for _, lr := range opts.LogRecords {
s.logFields(lr.Timestamp, lr.Fields...)
}
Expand All @@ -135,11 +165,30 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
if !ok {
return
}
f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime))

dur := opts.FinishTime.Sub(s.startTime)

if s.observer != nil {
s.observer.OnFinish(opts)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinishedWithDuration(dur)
}

f.FinishedWithDuration(dur)
return
}

s.Finish()
if s.observer != nil {
s.observer.OnFinish(opts)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

func (s *spanImpl) Tracer() opentracing.Tracer {
Expand Down
45 changes: 27 additions & 18 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,43 +58,51 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp

zopts := make([]zipkin.SpanOption, 0)

var zipkinStartSpanOptions ZipkinStartSpanOptions

// Parent
if len(startSpanOptions.References) > 0 {
parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext)
if ok {
zopts = append(zopts, zipkin.Parent(model.SpanContext(parent)))
zipkinStartSpanOptions.Parent = (*model.SpanContext)(&parent)
}
}

startTime := time.Now()
// Time
zipkinStartSpanOptions.StartTime = time.Now()
if !startSpanOptions.StartTime.IsZero() {
zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime))
startTime = startSpanOptions.StartTime
zipkinStartSpanOptions.StartTime = startSpanOptions.StartTime
zopts = append(zopts, zipkin.StartTime(zipkinStartSpanOptions.StartTime))
}

zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...)
zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &zipkinStartSpanOptions)...)

newSpan := t.zipkinTracer.StartSpan(operationName, zopts...)

sp := &spanImpl{
zipkinSpan: newSpan,
tracer: t,
startTime: startTime,
startTime: zipkinStartSpanOptions.StartTime,
}

if t.opts.observer != nil {
observer, _ := t.opts.observer.OnStartSpan(sp, operationName, startSpanOptions)
sp.observer = observer
}

if t.opts.zipkinObserver != nil {
sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, &zipkinStartSpanOptions)
}

return sp
}

func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption {
zopts := make([]zipkin.SpanOption, 0)

tags := map[string]string{}
remoteEndpoint := &model.Endpoint{}
options.Tags = map[string]string{}
options.RemoteEndpoint = &model.Endpoint{}

var kind string
if val, ok := t[string(ext.SpanKind)]; ok {
Expand All @@ -112,29 +120,30 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
mKind == model.Producer ||
mKind == model.Consumer {
zopts = append(zopts, zipkin.Kind(mKind))
options.Kind = mKind
} else {
tags["span.kind"] = kind
options.Tags["span.kind"] = kind
}
}

if val, ok := t[string(ext.PeerService)]; ok {
serviceName, _ := val.(string)
remoteEndpoint.ServiceName = serviceName
options.RemoteEndpoint.ServiceName = serviceName
}

if val, ok := t[string(ext.PeerHostIPv4)]; ok {
ipv4, _ := val.(string)
remoteEndpoint.IPv4 = net.ParseIP(ipv4)
options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4)
}

if val, ok := t[string(ext.PeerHostIPv6)]; ok {
ipv6, _ := val.(string)
remoteEndpoint.IPv6 = net.ParseIP(ipv6)
options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6)
}

if val, ok := t[string(ext.PeerPort)]; ok {
port, _ := val.(uint16)
remoteEndpoint.Port = port
options.RemoteEndpoint.Port = port
}

for key, val := range t {
Expand All @@ -146,15 +155,15 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
continue
}

tags[key] = fmt.Sprint(val)
options.Tags[key] = fmt.Sprint(val)
}

if len(tags) > 0 {
zopts = append(zopts, zipkin.Tags(tags))
if len(options.Tags) > 0 {
zopts = append(zopts, zipkin.Tags(options.Tags))
}

if !remoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint))
if !options.RemoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(options.RemoteEndpoint))
}

return zopts
Expand Down
12 changes: 10 additions & 2 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (

// TracerOptions allows creating a customized Tracer.
type TracerOptions struct {
observer otobserver.Observer
b3InjectOpt B3InjectOption
observer otobserver.Observer
b3InjectOpt B3InjectOption
zipkinObserver ZipkinObserver
}

// TracerOption allows for functional options.
Expand All @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption {
}
}

// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver
func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption {
return func(opts *TracerOptions) {
opts.zipkinObserver = zipkinObserver
}
}

// WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier
func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption {
return func(opts *TracerOptions) {
Expand Down
Loading

0 comments on commit 6cb8f48

Please sign in to comment.