Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optional otel tracing #44

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
log_level = "INFO"
# maximum time allowed for a job to run once it has started execution
default_job_ttl = "60s"
enable_tracing = false
trace_file_path = "./out.traces"

# The broker that manages job queuing.
# Currently, only "redis" is supported.
Expand Down
2 changes: 2 additions & 0 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ func initCore(ko *koanf.Koanf) (*core.Core, error) {
DefaultQueue: ko.MustString("queue"),
DefaultGroupConcurrency: ko.MustInt("worker-concurrency"),
DefaultJobTTL: ko.MustDuration("app.default_job_ttl"),
EnableTracing: ko.Bool("app.enable_tracing"),
TraceFilePath: ko.String("app.trace_file_path"),
Results: rResult,
Broker: rBroker,
}, srcPool, backends, lo)
Expand Down
2 changes: 2 additions & 0 deletions config.test_mysql.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[app]
log_level = "DEBUG"
default_job_ttl = "60s"
enable_tracing = false
trace_file_path = "./out.traces"

[job_queue.broker]
type = "redis"
Expand Down
2 changes: 2 additions & 0 deletions config.test_pg.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[app]
log_level = "DEBUG"
default_job_ttl = "60s"
enable_tracing = true
trace_file_path = "./out.traces"

[job_queue.broker]
type = "redis"
Expand Down
2 changes: 2 additions & 0 deletions config.toml.sample
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[app]
log_level = "DEBUG"
default_job_ttl = "60s"
enable_tracing = false
trace_file_path = "./out.traces"

[job_queue.broker]
type = "redis"
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/vmihailenco/msgpack v4.0.4+incompatible
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0
go.opentelemetry.io/otel/sdk v1.24.0
)

require (
Expand All @@ -41,9 +44,7 @@ require (
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 h1:0uV0qzHk48i1SF8qRI8odMYiwPOLh9gBhiJFpj8H6JY=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0/go.mod h1:Fl1iS5ZhWgXXXTdJMuBSVsS5nkL5XluHbg97kjOuYU4=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
Expand Down
87 changes: 82 additions & 5 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"sync"
"time"
Expand All @@ -15,14 +16,38 @@ import (
"github.com/vmihailenco/msgpack"
"github.com/zerodha/dungbeetle/v2/internal/dbpool"
"github.com/zerodha/dungbeetle/v2/models"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)

// traces is a thin wrapper around otel trace-provider which
// is optionally passed to the distributed queue
type traces struct {
f *os.File
tp *trace.TracerProvider
}

func (t *traces) Close() error {
if err := t.tp.Shutdown(context.Background()); err != nil {
return err
}

return t.f.Close()
}

// Opt represents core options.
type Opt struct {
DefaultQueue string
DefaultGroupConcurrency int
DefaultJobTTL time.Duration

// Optional tracing parameters
EnableTracing bool
TraceFilePath string

// DSNs for connecting to the broker backend and the broker state backend.
Broker tasqueue.Broker
Results tasqueue.Results
Expand All @@ -41,7 +66,8 @@ type Core struct {
resultBackends ResultBackends

// Distributed queue system.
q *tasqueue.Server
tracer *traces
q *tasqueue.Server

// Job states for cancellation.
jobCtx map[string]context.CancelFunc
Expand Down Expand Up @@ -74,6 +100,11 @@ func (co *Core) Start(ctx context.Context, workerName string, concurrency int) e
co.q = qs
qs.Start(ctx)

// Close the tracer once the job processor ends
if co.opt.EnableTracing {
co.tracer.Close()
}

return nil
}

Expand Down Expand Up @@ -400,17 +431,63 @@ type taskMeta struct {
TTL int `json:"ttl"`
}

func initTracer(fpath string) (*traces, error) {
// Write telemetry data to a file.
f, err := os.Create(fpath)
if err != nil {
return nil, err
}

exp, err := stdouttrace.New(
stdouttrace.WithWriter(f),
// Use human-readable output.
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}

r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("dungbeetle"),
),
)
if err != nil {
return nil, err
}

tp := trace.NewTracerProvider(trace.WithBatcher(exp), trace.WithResource(r))
otel.SetTracerProvider(tp)

return &traces{
f: f,
tp: tp,
}, nil
}

// initQueue creates and returns a distributed queue system (Tasqueue) and registers
// Tasks (SQL queries) to be executed. The queue system uses a broker (eg: Kafka) and stores
// job states in a state store (eg: Redis)
func (co *Core) initQueue() (*tasqueue.Server, error) {
var err error
// TODO: set log level
qs, err := tasqueue.NewServer(tasqueue.ServerOpts{
opts := tasqueue.ServerOpts{
Broker: co.opt.Broker,
Results: co.opt.Results,
Logger: co.lo.Handler(),
})
}

var err error
if co.opt.EnableTracing {
co.tracer, err = initTracer(co.opt.TraceFilePath)
if err != nil {
return nil, err
}
opts.TraceProvider = co.tracer.tp
}

// TODO: set log level
qs, err := tasqueue.NewServer(opts)
if err != nil {
return nil, err
}
Expand Down
Loading