Skip to content

Commit

Permalink
chore: remove sync counter and take it up later
Browse files Browse the repository at this point in the history
  • Loading branch information
iamKunalGupta committed Jan 13, 2025
1 parent 0a752c7 commit d0f00d2
Showing 1 changed file with 0 additions and 83 deletions.
83 changes: 0 additions & 83 deletions flow/otel_metrics/observables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package otel_metrics
import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -101,78 +99,6 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl
return &Float64SyncGauge{syncGauge: syncGauge}, nil
}

func atomicAdd[V any](a *V, b V) error {
switch v := any(b).(type) {
case int64:
atomic.AddInt64(any(*a).(*int64), v)
default:
return fmt.Errorf("unsupported type %T for atomicAdd", v)
}
return nil
}

type SyncCounter[V comparable, O metric.Observable] struct {
observableCounter O
observations sync.Map
name string
}

func (a *SyncCounter[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error {
a.observations.Range(func(key, value interface{}) bool {
attrs := key.(attribute.Set)
val := value.(*ObservationMapValue[V])
observeFunc(val.Value, metric.WithAttributeSet(attrs))
// TODO what to do for counters??
// If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel
a.observations.CompareAndDelete(attrs, val)
return true
})
return nil
}

func (a *SyncCounter[V, O]) Add(input V, attrs attribute.Set) {
val := ObservationMapValue[V]{Value: input}

actual, loaded := a.observations.LoadOrStore(attrs, &val)
if loaded {
// If the value was already present, we need to add the new value to the existing value
existing := actual.(*ObservationMapValue[V])
if err := atomicAdd(&existing.Value, val.Value); err != nil {
slog.Error("Failed to add value to existing value", slog.Any("error", err))
}
}
}

type Int64SyncCounter struct {
embedded.Int64Counter
syncCounter *SyncCounter[int64, metric.Int64ObservableCounter]
}

func (i *Int64SyncCounter) Add(ctx context.Context, incr int64, options ...metric.AddOption) {
if i == nil {
return
}
c := metric.NewAddConfig(options)
i.syncCounter.Add(incr, c.Attributes())
}

func NewInt64SyncCounter(meter metric.Meter, counterName string, opts ...metric.Int64ObservableCounterOption) (*Int64SyncCounter, error) {
syncCounter := &SyncCounter[int64, metric.Int64ObservableCounter]{
name: counterName,
}
observableCounter, err := meter.Int64ObservableCounter(counterName,
append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error {
return syncCounter.Callback(ctx, func(value int64, options ...metric.ObserveOption) {
observer.Observe(value, options...)
})
}))...)
if err != nil {
return nil, fmt.Errorf("failed to create Int64SyncCounter: %w", err)
}
syncCounter.observableCounter = observableCounter
return &Int64SyncCounter{syncCounter: syncCounter}, nil
}

func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
gaugeConfig := metric.NewInt64GaugeConfig(opts...)
return NewInt64SyncGauge(meter, name,
Expand All @@ -188,12 +114,3 @@ func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOp
metric.WithUnit(gaugeConfig.Unit()),
)
}

// TODO this implementation is still pending
func Int64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) {
counterConfig := metric.NewInt64CounterConfig(opts...)
return NewInt64SyncCounter(meter, name,
metric.WithDescription(counterConfig.Description()),
metric.WithUnit(counterConfig.Unit()),
)
}

0 comments on commit d0f00d2

Please sign in to comment.