Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc/mpsc processor #34

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type OpenTelemetry struct {
TLS TLS `json:"tls"`
// Defines the configurations to use in the sampler.
Sampling Sampling `json:"sampling"`

// BatchSpanProcessor configuration
BatchSize int `json:"batch_size"`
BatchTimeout int `json:"batch_timeout"`
BatchQueueSize int `json:"batch_queue_size"`
BatchExportTimeout int `json:"batch_export_timeout"`
}

type TLS struct {
Expand Down Expand Up @@ -128,4 +134,20 @@ func (c *OpenTelemetry) SetDefaults() {
if c.Sampling.Type == TRACEIDRATIOBASED && c.Sampling.Rate == 0 {
c.Sampling.Rate = 0.5
}

if c.BatchSize == 0 {
c.BatchSize = 512
}

if c.BatchTimeout == 0 {
c.BatchTimeout = 5000
}

if c.BatchQueueSize == 0 {
c.BatchQueueSize = 2048
}

if c.BatchExportTimeout == 0 {
c.BatchExportTimeout = 30000
}
}
1 change: 1 addition & 0 deletions e2e/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func main() {
TLS: config.TLS{
Enable: false,
},
SpanProcessorType: "mpsc",
}

log.Println("Initializing OpenTelemetry at e2e-basic:", cfg.Endpoint)
Expand Down
160 changes: 160 additions & 0 deletions trace/mpsc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package trace

import (
"context"
"sync"
"sync/atomic"
"time"
"unsafe"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// spanQueueNode represents a node in the queue.
type spanQueueNode struct {
span sdktrace.ReadOnlySpan
next *spanQueueNode
}

// spanQueue is a basic lock-free queue for spans.
type spanQueue struct {
head atomic.Pointer[spanQueueNode]
tail atomic.Pointer[spanQueueNode]
padding [128]byte // Padding to avoid false sharing between head and tail

Check failure on line 23 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

field `padding` is unused (unused)
}

// newSpanQueue creates a new spanQueue.
func newSpanQueue() *spanQueue {
q := &spanQueue{}
node := &spanQueueNode{} // Dummy node
q.head.Store(node)
q.tail.Store(node)
return q

Check failure on line 32 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

return statements should not be cuddled if block has more than two lines (wsl)
}

// enqueue adds a span to the queue.
func (q *spanQueue) enqueue(span sdktrace.ReadOnlySpan) {
node := &spanQueueNode{span: span}
for {

Check failure on line 38 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

for statement without condition should never be cuddled (wsl)
tail := q.tail.Load()
tailNext := (*unsafe.Pointer)(unsafe.Pointer(tail.next))
if atomic.CompareAndSwapPointer(tailNext, nil, unsafe.Pointer(node)) {

Check failure on line 41 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

only one cuddle assignment allowed before if statement (wsl)
q.tail.CompareAndSwap(tail, node)
return
}
q.tail.CompareAndSwap(tail, tail.next)

Check failure on line 45 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

expressions should not be cuddled with blocks (wsl)
}
}

// dequeue removes and returns the next span from the queue.
func (q *spanQueue) dequeue() (sdktrace.ReadOnlySpan, bool) {
for {
head := q.head.Load()
next := head.next
if next == nil {

Check failure on line 54 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

only one cuddle assignment allowed before if statement (wsl)
return nil, false // Queue is empty
}
if q.head.CompareAndSwap(head, next) {

Check failure on line 57 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

if statements should only be cuddled with assignments (wsl)
return next.span, true
}
}
}

// BatchSpanProcessor is an implementation of the SpanProcessor that batches spans for async processing.
type BatchSpanProcessor struct {
queue *spanQueue
maxBatch int
exporter sdktrace.SpanExporter
shutdownCh chan struct{}
wg sync.WaitGroup
}

// NewBatchSpanProcessor creates a new BatchSpanProcessor.
func NewMPSCSpanProcessor(exporter sdktrace.SpanExporter, maxBatchSize int) *BatchSpanProcessor {
bsp := &BatchSpanProcessor{
queue: newSpanQueue(),
maxBatch: maxBatchSize,
exporter: exporter,
shutdownCh: make(chan struct{}),
}

bsp.wg.Add(1)
go bsp.processQueue()

return bsp
}

// OnStart is called when a span is started.
func (bsp *BatchSpanProcessor) OnStart(_ context.Context, _ sdktrace.ReadWriteSpan) {
// Do nothing on start.
}

// OnEnd is called when a span is finished.
func (bsp *BatchSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
bsp.queue.enqueue(s)
}

// Shutdown is called when the SDK shuts down.
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
close(bsp.shutdownCh)
bsp.wg.Wait()

return bsp.exporter.Shutdown(ctx)
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *BatchSpanProcessor) ForceFlush(ctx context.Context) error {
batch := bsp.collectBatch()
if len(batch) > 0 {
return bsp.exporter.ExportSpans(ctx, batch)
}
return nil

Check failure on line 111 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

return statements should not be cuddled if block has more than two lines (wsl)
}

// processQueue processes the span queue in batches.
func (bsp *BatchSpanProcessor) processQueue() {
defer bsp.wg.Done()

ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()

batch := make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch)

for {
select {
case <-bsp.shutdownCh:
return
case <-ticker.C:
if len(batch) > 0 {
bsp.exportBatch(batch)
batch = make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch)
}
default:
if span, ok := bsp.queue.dequeue(); ok {
batch = append(batch, span)
if len(batch) >= bsp.maxBatch {
bsp.exportBatch(batch)
batch = make([]sdktrace.ReadOnlySpan, 0, bsp.maxBatch)
}
}
}
}
}

// collectBatch collects a batch of spans from the queue.
func (bsp *BatchSpanProcessor) collectBatch() []sdktrace.ReadOnlySpan {
var batch []sdktrace.ReadOnlySpan
for {
if span, ok := bsp.queue.dequeue(); ok {
batch = append(batch, span)
} else {
break
}
}
return batch
}

// exportBatch exports a batch of spans.
func (bsp *BatchSpanProcessor) exportBatch(batch []sdktrace.ReadOnlySpan) {
_ = bsp.exporter.ExportSpans(context.Background(), batch)

Check failure on line 159 in trace/mpsc.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `bsp.exporter.ExportSpans` is not checked (errcheck)
}
2 changes: 1 addition & 1 deletion trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewProvider(opts ...Option) (Provider, error) {
}

// create the span processor - this is what will send the spans to the exporter.
spanProcesor := spanProcessorFactory(provider.cfg.SpanProcessorType, exporter)
spanProcesor := spanProcessorFactory(provider.cfg.SpanProcessorType, exporter, provider.cfg)

// create the sampler based on the configs
samplerType := provider.cfg.Sampling.Type
Expand Down
29 changes: 25 additions & 4 deletions trace/span_processor.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
package trace

import (
"fmt"
"time"

"github.com/TykTechnologies/opentelemetry/config"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func spanProcessorFactory(spanProcessorType string, exporter sdktrace.SpanExporter) sdktrace.SpanProcessor {
func spanProcessorFactory(spanProcessorType string, exporter sdktrace.SpanExporter, cfg *config.OpenTelemetry) sdktrace.SpanProcessor {

Check failure on line 11 in trace/span_processor.go

View workflow job for this annotation

GitHub Actions / lint

line is 135 characters (lll)
switch spanProcessorType {
case "simple":
return newSimpleSpanProcessor(exporter)
case "mpsc":
fmt.Println("Using MPSC Span Processor")
return NewMPSCSpanProcessor(exporter, cfg.BatchSize)
default:
// Default to BatchSpanProcessor
return newBatchSpanProcessor(exporter)
return newBatchSpanProcessor(exporter, cfg)
}
}

func newSimpleSpanProcessor(exporter sdktrace.SpanExporter) sdktrace.SpanProcessor {
return sdktrace.NewSimpleSpanProcessor(exporter)
}

func newBatchSpanProcessor(exporter sdktrace.SpanExporter) sdktrace.SpanProcessor {
return sdktrace.NewBatchSpanProcessor(exporter)
func newBatchSpanProcessor(exporter sdktrace.SpanExporter, cfg *config.OpenTelemetry) sdktrace.SpanProcessor {
opts := []sdktrace.BatchSpanProcessorOption{}
if cfg.BatchSize > 0 {
opts = append(opts, sdktrace.WithMaxExportBatchSize(cfg.BatchSize))
}
if cfg.BatchQueueSize > 0 {
opts = append(opts, sdktrace.WithMaxQueueSize(cfg.BatchQueueSize))
}
if cfg.BatchTimeout > 0 {
opts = append(opts, sdktrace.WithBatchTimeout(time.Duration(cfg.BatchTimeout)*time.Millisecond))
}
if cfg.BatchExportTimeout > 0 {
opts = append(opts, sdktrace.WithExportTimeout(time.Duration(cfg.BatchExportTimeout)*time.Millisecond))
}

return sdktrace.NewBatchSpanProcessor(exporter, opts...)
}
7 changes: 4 additions & 3 deletions trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"
"testing"

"github.com/TykTechnologies/opentelemetry/config"
"github.com/stretchr/testify/assert"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -76,7 +77,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) {
te := testExporter{}

// Create a new span processor
processor := newBatchSpanProcessor(&te)
processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{})
assert.NotNil(t, processor)
// Create a new tracer provider
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
Expand Down Expand Up @@ -104,7 +105,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) {
te := testExporter{}

// Create a new span processor
processor := newBatchSpanProcessor(&te)
processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{})
assert.NotNil(t, processor)
// Create a new tracer provider
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
Expand Down Expand Up @@ -142,7 +143,7 @@ func Test_NewBatchSpanProcessor(t *testing.T) {
te := testExporter{}

// Create a new span processor
processor := newBatchSpanProcessor(&te)
processor := newBatchSpanProcessor(&te, &config.OpenTelemetry{})
assert.NotNil(t, processor)
// Create a new tracer provider
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
Expand Down
Loading