Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Opentracing with OpenTelemetry on v1 #776

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions example/amqp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (

"github.com/google/uuid"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/RichardKnop/machinery/v1/tracing"

exampletasks "github.com/RichardKnop/machinery/example/tasks"
tracers "github.com/RichardKnop/machinery/example/tracers"
opentracing "github.com/opentracing/opentracing-go"
opentracing_log "github.com/opentracing/opentracing-go/log"
)

var (
Expand Down Expand Up @@ -266,12 +268,12 @@ func send() error {
* set a batch id as baggage so it can travel all the way into
* the worker functions.
*/
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(context.Background(), "send")
defer span.End()

batchID := uuid.New().String()
span.SetBaggageItem("batch.id", batchID)
span.LogFields(opentracing_log.String("batch.id", batchID))
baggage.ContextWithBaggage(ctx, "batch_id", batchID)
span.SetAttributes(attribute.String("batch_id", batchID))

log.INFO.Println("Starting batch:", batchID)
/*
Expand Down
14 changes: 8 additions & 6 deletions example/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (

"github.com/google/uuid"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/RichardKnop/machinery/v1/tracing"

exampletasks "github.com/RichardKnop/machinery/example/tasks"
tracers "github.com/RichardKnop/machinery/example/tracers"
opentracing "github.com/opentracing/opentracing-go"
opentracing_log "github.com/opentracing/opentracing-go/log"
)

var (
Expand Down Expand Up @@ -269,12 +271,12 @@ func send() error {
* set a batch id as baggage so it can travel all the way into
* the worker functions.
*/
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(context.Background(), "send")
defer span.End()

batchID := uuid.New().String()
span.SetBaggageItem("batch.id", batchID)
span.LogFields(opentracing_log.String("batch.id", batchID))
baggage.ContextWithBaggage(ctx, "batch_id", batchID)
span.SetAttributes(attribute.String("batch_id", batchID))

log.INFO.Println("Starting batch:", batchID)
/*
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,26 @@ require (
github.com/aws/aws-sdk-go v1.37.16
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/go-redis/redis/v8 v8.6.0
github.com/go-redis/redis/extra/redisotel/v8 v8.11.5
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redsync/redsync/v4 v4.0.4
github.com/golang/snappy v0.0.2 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.2.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/klauspost/compress v1.11.7 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/urfave/cli v1.22.5
github.com/xdg/stringprep v1.0.0 // indirect
go.mongodb.org/mongo-driver v1.4.6
go.opencensus.io v0.22.6 // indirect
go.opentelemetry.io/otel v1.8.0
go.opentelemetry.io/otel/trace v1.8.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/oauth2 v0.0.0-20210201163806-010130855d6c // indirect
gopkg.in/yaml.v2 v2.4.0
Expand Down
94 changes: 55 additions & 39 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions v1/backends/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/go-redis/redis/extra/redisotel/v8"
"github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v8"
Expand Down Expand Up @@ -59,6 +60,7 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend {

b.rclient = redis.NewUniversalClient(ropt)
}
b.rclient.AddHook(redisotel.NewTracingHook())
b.redsync = redsync.New(redsyncgoredis.NewPool(b.rclient))
return b
}
Expand Down
15 changes: 13 additions & 2 deletions v1/brokers/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -230,7 +231,7 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table(signature.Headers),
Headers: convertHeaders(signature.Headers),
ContentType: "application/json",
Body: msg,
Priority: signature.Priority,
Expand Down Expand Up @@ -398,7 +399,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error {
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table(signature.Headers),
Headers: convertHeaders(signature.Headers),
ContentType: "application/json",
Body: message,
DeliveryMode: amqp.Persistent,
Expand Down Expand Up @@ -490,3 +491,13 @@ func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {

return dumper.Signatures, nil
}

func convertHeaders(headers http.Header) amqp.Table {
table := make(amqp.Table, len(headers))

for k, v := range headers {
table[k] = v
}

return table
}
5 changes: 3 additions & 2 deletions v1/brokers/amqp/amqp_concurrence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package amqp

import (
"fmt"
"testing"
"time"

"github.com/RichardKnop/machinery/v1/brokers/iface"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/streadway/amqp"
"testing"
"time"
)

type doNothingProcessor struct{}
Expand Down
2 changes: 2 additions & 0 deletions v1/brokers/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/go-redis/redis/extra/redisotel/v8"
"github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"

Expand Down Expand Up @@ -62,6 +63,7 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker {
} else {
b.rclient = redis.NewUniversalClient(ropt)
}
b.rclient.AddHook(redisotel.NewTracingHook())

if cnf.Redis.DelayedTasksKey != "" {
b.redisDelayedTasksKey = cnf.Redis.DelayedTasksKey
Expand Down
28 changes: 15 additions & 13 deletions v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/google/uuid"
"github.com/robfig/cron/v3"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"github.com/RichardKnop/machinery/v1/backends/result"
"github.com/RichardKnop/machinery/v1/brokers/eager"
"github.com/RichardKnop/machinery/v1/config"
Expand All @@ -21,7 +24,6 @@ import (
backendsiface "github.com/RichardKnop/machinery/v1/backends/iface"
brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface"
lockiface "github.com/RichardKnop/machinery/v1/locks/iface"
opentracing "github.com/opentracing/opentracing-go"
)

// Server is the main Machinery object and stores all configuration
Expand Down Expand Up @@ -179,11 +181,11 @@ func (server *Server) GetRegisteredTask(name string) (interface{}, error) {

// SendTaskWithContext will inject the trace context in the signature headers before publishing it
func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendTask", tracing.ProducerOption(), tracing.MachineryTag)
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendTask", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag))
defer span.End()

// tag the span with some info about the signature
signature.Headers = tracing.HeadersWithSpan(signature.Headers, span)
signature.Headers = tracing.HeadersWithSpan(ctx, signature.Headers)

// Make sure result backend is defined
if server.backend == nil {
Expand Down Expand Up @@ -219,10 +221,10 @@ func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult,

// SendChainWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendChain", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChainTag)
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendChain", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowChainTag))
defer span.End()

tracing.AnnotateSpanWithChainInfo(span, chain)
tracing.AnnotateSpanWithChainInfo(ctx, span, chain)

return server.SendChain(chain)
}
Expand All @@ -239,10 +241,10 @@ func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, e

// SendGroupWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendGroup", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowGroupTag)
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendGroup", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowGroupTag))
defer span.End()

tracing.AnnotateSpanWithGroupInfo(span, group, sendConcurrency)
tracing.AnnotateSpanWithGroupInfo(ctx, span, group, sendConcurrency)

// Make sure result backend is defined
if server.backend == nil {
Expand Down Expand Up @@ -320,10 +322,10 @@ func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*res

// SendChordWithContext will inject the trace context in all the signature headers before publishing it
func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "SendChord", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChordTag)
defer span.Finish()
ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendChord", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowChordTag))
defer span.End()

tracing.AnnotateSpanWithChordInfo(span, chord, sendConcurrency)
tracing.AnnotateSpanWithChordInfo(ctx, span, chord, sendConcurrency)

_, err := server.SendGroupWithContext(ctx, chord.Group, sendConcurrency)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions v1/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
)
Expand Down Expand Up @@ -37,7 +37,7 @@ func TestRegisterTaskInRaceCondition(t *testing.T) {
t.Parallel()

server := getTestServer(t)
for i:=0; i<10; i++ {
for i := 0; i < 10; i++ {
go func() {
err := server.RegisterTask("test_task", func() error { return nil })
assert.NoError(t, err)
Expand Down
33 changes: 4 additions & 29 deletions v1/tasks/signature.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package tasks

import (
"fmt"
"github.com/RichardKnop/machinery/v1/utils"
"net/http"
"time"

"github.com/RichardKnop/machinery/v1/utils"

"github.com/google/uuid"
)

Expand All @@ -15,33 +17,6 @@ type Arg struct {
Value interface{} `bson:"value"`
}

// Headers represents the headers which should be used to direct the task
type Headers map[string]interface{}

// Set on Headers implements opentracing.TextMapWriter for trace propagation
func (h Headers) Set(key, val string) {
h[key] = val
}

// ForeachKey on Headers implements opentracing.TextMapReader for trace propagation.
// It is essentially the same as the opentracing.TextMapReader implementation except
// for the added casting from interface{} to string.
func (h Headers) ForeachKey(handler func(key, val string) error) error {
for k, v := range h {
// Skip any non string values
stringValue, ok := v.(string)
if !ok {
continue
}

if err := handler(k, stringValue); err != nil {
return err
}
}

return nil
}

// Signature represents a single task invocation
type Signature struct {
UUID string
Expand All @@ -51,7 +26,7 @@ type Signature struct {
GroupUUID string
GroupTaskCount int
Args []Arg
Headers Headers
Headers http.Header
Priority uint8
Immutable bool
RetryCount int
Expand Down
17 changes: 6 additions & 11 deletions v1/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"reflect"
"runtime/debug"

opentracing "github.com/opentracing/opentracing-go"
opentracing_ext "github.com/opentracing/opentracing-go/ext"
opentracing_log "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/trace"

"github.com/RichardKnop/machinery/v1/log"
)
Expand Down Expand Up @@ -101,8 +99,9 @@ func New(taskFunc interface{}, args []Arg) (*Task, error) {
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
if span := opentracing.SpanFromContext(t.Context); span != nil {
defer span.Finish()

if span := trace.SpanFromContext(t.Context); span != nil {
defer span.End()
}

defer func() {
Expand All @@ -118,12 +117,8 @@ func (t *Task) Call() (taskResults []*TaskResult, err error) {
}

// mark the span as failed and dump the error and stack trace to the span
if span := opentracing.SpanFromContext(t.Context); span != nil {
opentracing_ext.Error.Set(span, true)
span.LogFields(
opentracing_log.Error(err),
opentracing_log.Object("stack", string(debug.Stack())),
)
if span := trace.SpanFromContext(t.Context); span != nil {
span.RecordError(err, trace.WithStackTrace(true))
}

// Print stack trace
Expand Down
Loading