From 972ea8d62ae413f8ebfce3ffd1535e032b97f4fc Mon Sep 17 00:00:00 2001 From: tbuchaillot Date: Wed, 3 Jan 2024 10:52:49 +0100 Subject: [PATCH 1/3] Configurable SpanProcessor batch opts --- config/config.go | 22 ++++++++++++++++++++++ trace/provider.go | 2 +- trace/span_processor.go | 18 ++++++++++++++---- trace/span_processor_test.go | 7 ++++--- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index c6cbc80..cea93f4 100644 --- a/config/config.go +++ b/config/config.go @@ -32,6 +32,12 @@ type OpenTelemetry struct { TLS TLS `json:"tls"` // Defines the configurations to use in the sampler. Sampling Sampling `json:"sampling"` + + // BatchSpanProcessor configuration + BatchSize int `json:"batch_size"` + BatchTimeout int `json:"batch_timeout"` + BatchQueueSize int `json:"batch_queue_size"` + BatchExportTimeout int `json:"batch_export_timeout"` } type TLS struct { @@ -128,4 +134,20 @@ func (c *OpenTelemetry) SetDefaults() { if c.Sampling.Type == TRACEIDRATIOBASED && c.Sampling.Rate == 0 { c.Sampling.Rate = 0.5 } + + if c.BatchSize == 0 { + c.BatchSize = 512 + } + + if c.BatchTimeout == 0 { + c.BatchTimeout = 5000 + } + + if c.BatchQueueSize == 0 { + c.BatchQueueSize = 2048 + } + + if c.BatchExportTimeout == 0 { + c.BatchExportTimeout = 30000 + } } diff --git a/trace/provider.go b/trace/provider.go index 2f2dd8b..a785503 100644 --- a/trace/provider.go +++ b/trace/provider.go @@ -99,7 +99,7 @@ func NewProvider(opts ...Option) (Provider, error) { } // create the span processor - this is what will send the spans to the exporter. - spanProcesor := spanProcessorFactory(provider.cfg.SpanProcessorType, exporter) + spanProcesor := spanProcessorFactory(provider.cfg.SpanProcessorType, exporter, provider.cfg) // create the sampler based on the configs samplerType := provider.cfg.Sampling.Type diff --git a/trace/span_processor.go b/trace/span_processor.go index 2f74488..610b344 100644 --- a/trace/span_processor.go +++ b/trace/span_processor.go @@ -1,16 +1,19 @@ package trace import ( + "time" + + "github.com/TykTechnologies/opentelemetry/config" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -func spanProcessorFactory(spanProcessorType string, exporter sdktrace.SpanExporter) sdktrace.SpanProcessor { +func spanProcessorFactory(spanProcessorType string, exporter sdktrace.SpanExporter, cfg *config.OpenTelemetry) sdktrace.SpanProcessor { switch spanProcessorType { case "simple": return newSimpleSpanProcessor(exporter) default: // Default to BatchSpanProcessor - return newBatchSpanProcessor(exporter) + return newBatchSpanProcessor(exporter, cfg) } } @@ -18,6 +21,13 @@ func newSimpleSpanProcessor(exporter sdktrace.SpanExporter) sdktrace.SpanProcess return sdktrace.NewSimpleSpanProcessor(exporter) } -func newBatchSpanProcessor(exporter sdktrace.SpanExporter) sdktrace.SpanProcessor { - return sdktrace.NewBatchSpanProcessor(exporter) +func newBatchSpanProcessor(exporter sdktrace.SpanExporter, cfg *config.OpenTelemetry) sdktrace.SpanProcessor { + opts := []sdktrace.BatchSpanProcessorOption{ + sdktrace.WithMaxExportBatchSize(cfg.BatchSize), + sdktrace.WithMaxQueueSize(cfg.BatchQueueSize), + sdktrace.WithBatchTimeout(time.Duration(cfg.BatchTimeout) * time.Millisecond), + sdktrace.WithExportTimeout(time.Duration(cfg.BatchExportTimeout) * time.Millisecond), + } + + return sdktrace.NewBatchSpanProcessor(exporter, opts...) } diff --git a/trace/span_processor_test.go b/trace/span_processor_test.go index 2a57e9c..e416881 100644 --- a/trace/span_processor_test.go +++ b/trace/span_processor_test.go @@ -5,6 +5,7 @@ import ( "strconv" "testing" + "github.com/TykTechnologies/opentelemetry/config" "github.com/stretchr/testify/assert" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -76,7 +77,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) { te := testExporter{} // Create a new span processor - processor := newBatchSpanProcessor(&te) + processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{}) assert.NotNil(t, processor) // Create a new tracer provider tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) @@ -104,7 +105,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) { te := testExporter{} // Create a new span processor - processor := newBatchSpanProcessor(&te) + processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{}) assert.NotNil(t, processor) // Create a new tracer provider tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) @@ -142,7 +143,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) { te := testExporter{} // Create a new span processor - processor := newBatchSpanProcessor(&te) + processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{}) assert.NotNil(t, processor) // Create a new tracer provider tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) From ce8fd0b9b71f00c5ace5259903836c98613bd4aa Mon Sep 17 00:00:00 2001 From: tbuchaillot Date: Wed, 3 Jan 2024 17:44:13 +0100 Subject: [PATCH 2/3] mpsc processor --- e2e/basic/basic.go | 1 + trace/mpsc.go | 160 ++++++++++++++++++++++++++++++++++++++++ trace/span_processor.go | 4 + 3 files changed, 165 insertions(+) create mode 100644 trace/mpsc.go diff --git a/e2e/basic/basic.go b/e2e/basic/basic.go index 78f4dfe..a8713c8 100644 --- a/e2e/basic/basic.go +++ b/e2e/basic/basic.go @@ -29,6 +29,7 @@ func main() { TLS: config.TLS{ Enable: false, }, + SpanProcessorType: "mpsc", } log.Println("Initializing OpenTelemetry at e2e-basic:", cfg.Endpoint) diff --git a/trace/mpsc.go b/trace/mpsc.go new file mode 100644 index 0000000..1db8992 --- /dev/null +++ b/trace/mpsc.go @@ -0,0 +1,160 @@ +package trace + +import ( + "context" + "sync" + "sync/atomic" + "time" + "unsafe" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// spanQueueNode represents a node in the queue. +type spanQueueNode struct { + span sdktrace.ReadOnlySpan + next *spanQueueNode +} + +// spanQueue is a basic lock-free queue for spans. +type spanQueue struct { + head atomic.Pointer[spanQueueNode] + tail atomic.Pointer[spanQueueNode] + padding [128]byte // Padding to avoid false sharing between head and tail +} + +// newSpanQueue creates a new spanQueue. +func newSpanQueue() *spanQueue { + q := &spanQueue{} + node := &spanQueueNode{} // Dummy node + q.head.Store(node) + q.tail.Store(node) + return q +} + +// enqueue adds a span to the queue. +func (q *spanQueue) enqueue(span sdktrace.ReadOnlySpan) { + node := &spanQueueNode{span: span} + for { + tail := q.tail.Load() + tailNext := (*unsafe.Pointer)(unsafe.Pointer(tail.next)) + if atomic.CompareAndSwapPointer(tailNext, nil, unsafe.Pointer(node)) { + q.tail.CompareAndSwap(tail, node) + return + } + q.tail.CompareAndSwap(tail, tail.next) + } +} + +// dequeue removes and returns the next span from the queue. +func (q *spanQueue) dequeue() (sdktrace.ReadOnlySpan, bool) { + for { + head := q.head.Load() + next := head.next + if next == nil { + return nil, false // Queue is empty + } + if q.head.CompareAndSwap(head, next) { + return next.span, true + } + } +} + +// BatchSpanProcessor is an implementation of the SpanProcessor that batches spans for async processing. +type BatchSpanProcessor struct { + queue *spanQueue + maxBatch int + exporter sdktrace.SpanExporter + shutdownCh chan struct{} + wg sync.WaitGroup +} + +// NewBatchSpanProcessor creates a new BatchSpanProcessor. +func NewMPSCSpanProcessor(exporter sdktrace.SpanExporter, maxBatchSize int) *BatchSpanProcessor { + bsp := &BatchSpanProcessor{ + queue: newSpanQueue(), + maxBatch: maxBatchSize, + exporter: exporter, + shutdownCh: make(chan struct{}), + } + + bsp.wg.Add(1) + go bsp.processQueue() + + return bsp +} + +// OnStart is called when a span is started. +func (bsp *BatchSpanProcessor) OnStart(_ context.Context, _ sdktrace.ReadWriteSpan) { + // Do nothing on start. +} + +// OnEnd is called when a span is finished. +func (bsp *BatchSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) { + bsp.queue.enqueue(s) +} + +// Shutdown is called when the SDK shuts down. +func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error { + close(bsp.shutdownCh) + bsp.wg.Wait() + + return bsp.exporter.Shutdown(ctx) +} + +// ForceFlush exports all ended spans that have not yet been exported. +func (bsp *BatchSpanProcessor) ForceFlush(ctx context.Context) error { + batch := bsp.collectBatch() + if len(batch) > 0 { + return bsp.exporter.ExportSpans(ctx, batch) + } + return nil +} + +// processQueue processes the span queue in batches. +func (bsp *BatchSpanProcessor) processQueue() { + defer bsp.wg.Done() + + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + + batch := make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch) + + for { + select { + case <-bsp.shutdownCh: + return + case <-ticker.C: + if len(batch) > 0 { + bsp.exportBatch(batch) + batch = make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch) + } + default: + if span, ok := bsp.queue.dequeue(); ok { + batch = append(batch, span) + if len(batch) >= bsp.maxBatch { + bsp.exportBatch(batch) + batch = make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch) + } + } + } + } +} + +// collectBatch collects a batch of spans from the queue. +func (bsp *BatchSpanProcessor) collectBatch() []sdktrace.ReadOnlySpan { + var batch []sdktrace.ReadOnlySpan + for { + if span, ok := bsp.queue.dequeue(); ok { + batch = append(batch, span) + } else { + break + } + } + return batch +} + +// exportBatch exports a batch of spans. +func (bsp *BatchSpanProcessor) exportBatch(batch []sdktrace.ReadOnlySpan) { + _ = bsp.exporter.ExportSpans(context.Background(), batch) +} diff --git a/trace/span_processor.go b/trace/span_processor.go index 610b344..340ec8d 100644 --- a/trace/span_processor.go +++ b/trace/span_processor.go @@ -1,6 +1,7 @@ package trace import ( + "fmt" "time" "github.com/TykTechnologies/opentelemetry/config" @@ -11,6 +12,9 @@ func spanProcessorFactory(spanProcessorType string, exporter sdktrace.SpanExport switch spanProcessorType { case "simple": return newSimpleSpanProcessor(exporter) + case "mpsc": + fmt.Println("Using MPSC Span Processor") + return NewMPSCSpanProcessor(exporter, cfg.BatchSize) default: // Default to BatchSpanProcessor return newBatchSpanProcessor(exporter, cfg) From ea883aade4e35012a06a0d3c4d8b41f509096632 Mon Sep 17 00:00:00 2001 From: tbuchaillot Date: Thu, 4 Jan 2024 11:59:09 +0100 Subject: [PATCH 3/3] fixing tests --- trace/span_processor.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/trace/span_processor.go b/trace/span_processor.go index 340ec8d..f48a331 100644 --- a/trace/span_processor.go +++ b/trace/span_processor.go @@ -26,11 +26,18 @@ func newSimpleSpanProcessor(exporter sdktrace.SpanExporter) sdktrace.SpanProcess } func newBatchSpanProcessor(exporter sdktrace.SpanExporter, cfg *config.OpenTelemetry) sdktrace.SpanProcessor { - opts := []sdktrace.BatchSpanProcessorOption{ - sdktrace.WithMaxExportBatchSize(cfg.BatchSize), - sdktrace.WithMaxQueueSize(cfg.BatchQueueSize), - sdktrace.WithBatchTimeout(time.Duration(cfg.BatchTimeout) * time.Millisecond), - sdktrace.WithExportTimeout(time.Duration(cfg.BatchExportTimeout) * time.Millisecond), + opts := []sdktrace.BatchSpanProcessorOption{} + if cfg.BatchSize > 0 { + opts = append(opts, sdktrace.WithMaxExportBatchSize(cfg.BatchSize)) + } + if cfg.BatchQueueSize > 0 { + opts = append(opts, sdktrace.WithMaxQueueSize(cfg.BatchQueueSize)) + } + if cfg.BatchTimeout > 0 { + opts = append(opts, sdktrace.WithBatchTimeout(time.Duration(cfg.BatchTimeout)*time.Millisecond)) + } + if cfg.BatchExportTimeout > 0 { + opts = append(opts, sdktrace.WithExportTimeout(time.Duration(cfg.BatchExportTimeout)*time.Millisecond)) } return sdktrace.NewBatchSpanProcessor(exporter, opts...)