From d0f00d28b62e777fc13a08271e162df7e0541612 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Tue, 14 Jan 2025 01:08:54 +0530 Subject: [PATCH] chore: remove sync counter and take it up later --- flow/otel_metrics/observables.go | 83 -------------------------------- 1 file changed, 83 deletions(-) diff --git a/flow/otel_metrics/observables.go b/flow/otel_metrics/observables.go index 36fc3c55f..6d9f97017 100644 --- a/flow/otel_metrics/observables.go +++ b/flow/otel_metrics/observables.go @@ -3,9 +3,7 @@ package otel_metrics import ( "context" "fmt" - "log/slog" "sync" - "sync/atomic" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -101,78 +99,6 @@ func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Fl return &Float64SyncGauge{syncGauge: syncGauge}, nil } -func atomicAdd[V any](a *V, b V) error { - switch v := any(b).(type) { - case int64: - atomic.AddInt64(any(*a).(*int64), v) - default: - return fmt.Errorf("unsupported type %T for atomicAdd", v) - } - return nil -} - -type SyncCounter[V comparable, O metric.Observable] struct { - observableCounter O - observations sync.Map - name string -} - -func (a *SyncCounter[V, O]) Callback(ctx context.Context, observeFunc func(value V, options ...metric.ObserveOption)) error { - a.observations.Range(func(key, value interface{}) bool { - attrs := key.(attribute.Set) - val := value.(*ObservationMapValue[V]) - observeFunc(val.Value, metric.WithAttributeSet(attrs)) - // TODO what to do for counters?? - // If the pointer is still same we can safely delete, else it means that the value was overwritten in parallel - a.observations.CompareAndDelete(attrs, val) - return true - }) - return nil -} - -func (a *SyncCounter[V, O]) Add(input V, attrs attribute.Set) { - val := ObservationMapValue[V]{Value: input} - - actual, loaded := a.observations.LoadOrStore(attrs, &val) - if loaded { - // If the value was already present, we need to add the new value to the existing value - existing := actual.(*ObservationMapValue[V]) - if err := atomicAdd(&existing.Value, val.Value); err != nil { - slog.Error("Failed to add value to existing value", slog.Any("error", err)) - } - } -} - -type Int64SyncCounter struct { - embedded.Int64Counter - syncCounter *SyncCounter[int64, metric.Int64ObservableCounter] -} - -func (i *Int64SyncCounter) Add(ctx context.Context, incr int64, options ...metric.AddOption) { - if i == nil { - return - } - c := metric.NewAddConfig(options) - i.syncCounter.Add(incr, c.Attributes()) -} - -func NewInt64SyncCounter(meter metric.Meter, counterName string, opts ...metric.Int64ObservableCounterOption) (*Int64SyncCounter, error) { - syncCounter := &SyncCounter[int64, metric.Int64ObservableCounter]{ - name: counterName, - } - observableCounter, err := meter.Int64ObservableCounter(counterName, - append(opts, metric.WithInt64Callback(func(ctx context.Context, observer metric.Int64Observer) error { - return syncCounter.Callback(ctx, func(value int64, options ...metric.ObserveOption) { - observer.Observe(value, options...) - }) - }))...) - if err != nil { - return nil, fmt.Errorf("failed to create Int64SyncCounter: %w", err) - } - syncCounter.observableCounter = observableCounter - return &Int64SyncCounter{syncCounter: syncCounter}, nil -} - func Int64Gauge(meter metric.Meter, name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { gaugeConfig := metric.NewInt64GaugeConfig(opts...) return NewInt64SyncGauge(meter, name, @@ -188,12 +114,3 @@ func Float64Gauge(meter metric.Meter, name string, opts ...metric.Float64GaugeOp metric.WithUnit(gaugeConfig.Unit()), ) } - -// TODO this implementation is still pending -func Int64Counter(meter metric.Meter, name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { - counterConfig := metric.NewInt64CounterConfig(opts...) - return NewInt64SyncCounter(meter, name, - metric.WithDescription(counterConfig.Description()), - metric.WithUnit(counterConfig.Unit()), - ) -}