Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement OpenTelemetry tracing #1741

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/instrumentation/tracing"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_guages"
Expand Down Expand Up @@ -55,6 +56,8 @@ func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.SetupInput,
) (*CheckConnectionResult, error) {
ctx, span := tracing.Tracer().Start(ctx, "CheckConnection")
defer span.End()
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
Expand All @@ -71,6 +74,8 @@ func (a *FlowableActivity) CheckConnection(
}

func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error {
ctx, span := tracing.Tracer().Start(ctx, "SetupMetadataTables")
defer span.End()
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
Expand All @@ -90,6 +95,8 @@ func (a *FlowableActivity) EnsurePullability(
ctx context.Context,
config *protos.EnsurePullabilityBatchInput,
) (*protos.EnsurePullabilityBatchOutput, error) {
ctx, span := tracing.Tracer().Start(ctx, "EnsurePullability")
defer span.End()
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
Expand Down Expand Up @@ -274,6 +281,8 @@ func (a *FlowableActivity) SyncRecords(
options *protos.SyncFlowOptions,
sessionID string,
) (*model.SyncResponse, error) {
ctx, span := tracing.Tracer().Start(ctx, "SyncRecords")
defer span.End()
return syncCore(ctx, a, config, options, sessionID,
connectors.CDCPullConnector.PullRecords,
connectors.CDCSyncConnector.SyncRecords)
Expand Down
7 changes: 4 additions & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ func killExistingScheduleFlows(

func APIMain(ctx context.Context, args *APIServerParams) error {
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
Interceptors: GetTemporalClientInterceptors(),
}
if args.TemporalCert != "" && args.TemporalKey != "" {
slog.Info("Using temporal certificate/key for authentication")
Expand Down
7 changes: 4 additions & 3 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ type SnapshotWorkerOptions struct {

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Worker, error) {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
Interceptors: GetTemporalClientInterceptors(),
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
Expand Down
8 changes: 4 additions & 4 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
}

clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
Logger: slog.New(logger.NewHandler(slog.NewJSONHandler(os.Stdout, nil))),
Interceptors: GetTemporalClientInterceptors(),
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
Expand Down Expand Up @@ -117,7 +118,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
return nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")

taskQueue := peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
slog.Info(
fmt.Sprintf("Creating temporal worker for queue %v: %v workflow workers %v activity workers",
Expand Down
18 changes: 18 additions & 0 deletions flow/cmd/worker_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cmd

import (
"log"

"go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/interceptor"
)

func GetTemporalClientInterceptors() []interceptor.ClientInterceptor {
// Maybe do this only when otel is actually enabled?
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{})
if err != nil {
log.Printf("failed to create tracing interceptor: %v\n", err)
iamKunalGupta marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
return []interceptor.ClientInterceptor{tracingInterceptor}
}
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"
"go.opencensus.io/trace"

"github.com/PeerDB-io/peer-flow/alerting"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
Expand Down Expand Up @@ -267,6 +268,8 @@ func GetAs[T Connector](ctx context.Context, config *protos.Peer) (T, error) {
}

func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConnector, error) {
ctx, span := trace.StartSpan(ctx, "connectors.GetCDCPullConnector")
defer span.End()
Comment on lines +271 to +272
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can move this into GetAs, using reflect.TypeFor[T] to put T in span label

return GetAs[CDCPullConnector](ctx, config)
}

Expand Down
10 changes: 7 additions & 3 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ require (
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
go.temporal.io/api v1.33.0
go.temporal.io/sdk v1.26.1
go.temporal.io/sdk/contrib/opentelemetry v0.5.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.23.0
golang.org/x/mod v0.17.0
Expand Down Expand Up @@ -112,12 +117,11 @@ require (
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect; indirectianian
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this tool generated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird

github.com/sirupsen/logrus v1.9.3 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/term v0.20.0 // indirect
)
Expand Down Expand Up @@ -173,7 +177,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opencensus.io v0.24.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 h1:+hm
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0/go.mod h1:NjC8142mLvvNT6biDpaMjyz78kyEHIwAJlSX0N9P5KI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 h1:HGZWGmCVRCVyAs2GQaiHQPbDHo+ObFWeUEOd+zDnp64=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0/go.mod h1:SaH+v38LSCHddyk7RGlU9uZyQoRrKao6IBnJw6Kbn+c=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 h1:1u/AyyOqAWzy+SkPxDpahCNZParHV8Vid1RnI2clyDE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 h1:Waw9Wfpo/IXzOI8bCB7DIk+0JZcqqsyn1JFnAc+iam8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0/go.mod h1:wnJIG4fOqyynOnnQF/eQb4/16VlX2EJAHhHgqIqWfAo=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 h1:1wp/gyxsuYtuE/JFxsQRtcCDtMrO2qMvlfXALU5wkzI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0/go.mod h1:gbTHmghkGgqxMomVQQMur1Nba4M0MQ8AYThXDUjsJ38=
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30=
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8=
Expand All @@ -443,10 +449,14 @@ go.temporal.io/api v1.33.0 h1:UdvRZAwXcR8WMY6nmQ+kDqKMmYPSisIVkoAPX1r+wkM=
go.temporal.io/api v1.33.0/go.mod h1:fgj/okGTIJVbheu3wSJjfEsSOcH/LzihaETvogVDh/c=
go.temporal.io/sdk v1.26.1 h1:ggmFBythnuuW3yQRp0VzOTrmbOf+Ddbe00TZl+CQ+6U=
go.temporal.io/sdk v1.26.1/go.mod h1:ph3K/74cry+JuSV9nJH+Q+Zeir2ddzoX2LjWL/e5yCo=
go.temporal.io/sdk/contrib/opentelemetry v0.5.0 h1:SOcS5VD7lWU+zwtY9PITn5nXLlSywgVzl5A7kWwQ6kI=
go.temporal.io/sdk/contrib/opentelemetry v0.5.0/go.mod h1:zJF/95YTBlTnsnMHLKiZzMFN76LnuTTGC7juBS7NeBY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
17 changes: 17 additions & 0 deletions flow/instrumentation/common/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid creating many small packages, it doesn't really matter until intersubpackage dependencies get involved. go's naming also makes having common names like util, utils, common, etc a pain

Already had to resolve util/utils split

Copy link
Member Author

@iamKunalGupta iamKunalGupta May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the call graph is like setup -> tracing/metrics/logging -> common, so had to create a separate one this time.
I'll also change it to otel_common for better visibility


import (
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.24.0"
)

// NewOtelResource builds the OpenTelemetry resource for this application by
// merging the SDK's default resource with a resource carrying the given
// service name. Any error reported by the merge is returned to the caller.
func NewOtelResource(otelServiceName string) (*resource.Resource, error) {
	base := resource.Default()
	service := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(otelServiceName),
	)
	return resource.Merge(base, service)
}
40 changes: 40 additions & 0 deletions flow/instrumentation/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package instrumentation

import (
"context"
"errors"

"github.com/PeerDB-io/peer-flow/instrumentation/tracing"
)

// Config selects which instrumentation signals SetupInstrumentation enables.
type Config struct {
	// EnableTracing turns on setup of the OpenTelemetry trace provider/exporter.
	EnableTracing bool
}

func SetupInstrumentation(ctx context.Context, serviceName string, config Config) (func(ctx context.Context) error, error) {
var shutdownFuncs []func(context.Context) error
shutdown := func(ctx context.Context) error {
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}

var err error
handleErr := func(inErr error) {
iamKunalGupta marked this conversation as resolved.
Show resolved Hide resolved
err = errors.Join(inErr, shutdown(ctx))
}

if config.EnableTracing {
traceShutdown, err := tracing.SetupOtelTraceProviderExporter(serviceName)
if err != nil {
handleErr(err)
iamKunalGupta marked this conversation as resolved.
Show resolved Hide resolved
return shutdown, err
}
shutdownFuncs = append(shutdownFuncs, traceShutdown)
}
// Setup other stuff here in the future like metrics, logs etc
return shutdown, err
}
76 changes: 76 additions & 0 deletions flow/instrumentation/tracing/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package tracing

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"github.com/PeerDB-io/peer-flow/instrumentation/common"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// PeerDBSpanProcessor is a span processor that stamps PeerDB-specific
// attributes (currently the deployment UID) onto every span when it starts.
type PeerDBSpanProcessor struct{}

// NewPeerDBSpanProcessor returns a PeerDBSpanProcessor as an sdktrace.SpanProcessor.
func NewPeerDBSpanProcessor() sdktrace.SpanProcessor {
	return new(PeerDBSpanProcessor)
}

// OnStart tags the starting span with the "deploymentUID" attribute read from
// peerdbenv.PeerDBDeploymentUID.
func (p *PeerDBSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {
	deploymentUID := attribute.String("deploymentUID", peerdbenv.PeerDBDeploymentUID())
	s.SetAttributes(deploymentUID)
}

// OnEnd does nothing; this processor only acts when a span starts.
func (p *PeerDBSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) {}

// Shutdown does nothing and always returns nil.
func (p *PeerDBSpanProcessor) Shutdown(ctx context.Context) error { return nil }

// ForceFlush does nothing and always returns nil.
func (p *PeerDBSpanProcessor) ForceFlush(ctx context.Context) error { return nil }

// setupHttpOtelTraceExporter creates an OTLP trace exporter that uses the HTTP
// transport, with no explicit options and a background context.
func setupHttpOtelTraceExporter() (*otlptrace.Exporter, error) {
	ctx := context.Background()
	return otlptracehttp.New(ctx)
}

// setupGrpcOtelTraceExporter creates an OTLP trace exporter that uses the gRPC
// transport, with no explicit options and a background context.
func setupGrpcOtelTraceExporter() (*otlptrace.Exporter, error) {
	ctx := context.Background()
	return otlptracegrpc.New(ctx)
}

// SetupOtelTraceProviderExporter creates an OTLP trace exporter and a tracer
// provider for otelServiceName, and installs the provider as the global
// OpenTelemetry tracer provider.
//
// The transport is taken from OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, falling back
// to OTEL_EXPORTER_OTLP_PROTOCOL and then to "http/protobuf". Only
// "http/protobuf" and "grpc" are supported; any other value is an error.
//
// The returned function shuts down the tracer provider, which flushes spans
// still buffered by the batch processor and then shuts down the exporter.
// Callers should invoke it before the process exits.
func SetupOtelTraceProviderExporter(otelServiceName string) (func(ctx context.Context) error, error) {
	// Signal-specific configuration overrides the generic setting, per the
	// OpenTelemetry OTLP exporter specification.
	// NOTE(review): assumes GetEnvString returns its second argument when the
	// variable is unset — confirm against peerdbenv.
	otlpTraceProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL",
		peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf"))

	// Build the resource before the exporter so that a resource failure cannot
	// leave an already-created exporter behind without being shut down.
	otelResource, err := common.NewOtelResource(otelServiceName)
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
	}

	var traceExporter *otlptrace.Exporter
	switch otlpTraceProtocol {
	case "http/protobuf":
		traceExporter, err = setupHttpOtelTraceExporter()
	case "grpc":
		traceExporter, err = setupGrpcOtelTraceExporter()
	default:
		return nil, fmt.Errorf("unsupported otel trace protocol: %s", otlpTraceProtocol)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry trace exporter: %w", err)
	}

	// Register every span processor at construction time, so no span can be
	// started on the global provider before the PeerDB processor is attached.
	tracerProvider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(traceExporter),
		sdktrace.WithSpanProcessor(NewPeerDBSpanProcessor()),
		sdktrace.WithResource(otelResource),
	)
	// This sets up the trace provider globally, now a tracer can be retrieved via tracing.Tracer()
	otel.SetTracerProvider(tracerProvider)

	// Shut down through the provider rather than the exporter: the provider
	// drains the batch span processor first, so pending spans are exported.
	return tracerProvider.Shutdown, nil
}
26 changes: 26 additions & 0 deletions flow/instrumentation/tracing/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tracing

import (
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var (
	// tracerName is the name used to obtain the tracer; SetupTracer
	// overwrites it on its first call.
	tracerName = "peerdb"

	// globalTracerOnce guards the one-time initialization of tracer.
	globalTracerOnce sync.Once

	// tracer is the package-wide tracer, populated by SetupTracer.
	tracer trace.Tracer
)

// SetupTracer initializes the package-wide tracer with the given name on its
// first invocation and returns it. Subsequent calls ignore name and return the
// tracer created by the first call.
func SetupTracer(name string) trace.Tracer {
	initTracer := func() {
		tracerName = name
		tracer = otel.GetTracerProvider().Tracer(name)
	}
	globalTracerOnce.Do(initTracer)
	return tracer
}

// Tracer returns the package-wide tracer, initializing it under the name
// "peerdb" if SetupTracer has not been called yet.
func Tracer() trace.Tracer {
	const defaultTracerName = "peerdb"
	return SetupTracer(defaultTracerName)
}
Loading
Loading