feat: support otel metrics for existing metrics #51

Merged
merged 11 commits on Aug 3, 2023
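This PR replaces the appender's inline OTel instrument setup with a dedicated metrics type that mirrors every legacy Stats() counter as an OpenTelemetry instrument, recorded against a configurable attribute.Set. As a rough sketch of how a consumer might wire this up after the change (the attribute values and provider wiring here are illustrative assumptions, not part of this PR):

package main

import (
	"github.com/elastic/go-docappender"
	"github.com/elastic/go-elasticsearch/v8"
	"go.opentelemetry.io/otel/attribute"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func newAppender() (*docappender.Appender, error) {
	client, err := elasticsearch.NewDefaultClient()
	if err != nil {
		return nil, err
	}
	// A real deployment would attach an exporting reader to the
	// provider; a bare provider keeps this sketch self-contained.
	mp := sdkmetric.NewMeterProvider()
	return docappender.New(client, docappender.Config{
		MeterProvider: mp,
		// MetricAttributes is now an attribute.Set rather than a
		// []attribute.KeyValue slice (see the config.go change below).
		MetricAttributes: attribute.NewSet(
			attribute.String("a", "b"), // placeholder attributes, as in the tests
		),
	})
}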
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
- run: go mod tidy -v
- uses: dominikh/[email protected]
with:
version: "2022.1.1"
version: "2023.1.3"
install-go: false
- name: Verify repo is up-to-date
run: |
96 changes: 40 additions & 56 deletions appender.go
@@ -32,7 +32,6 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"go.elastic.co/apm/module/apmzap/v2"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
@@ -60,6 +59,7 @@ var (
// Up to `config.MaxRequests` bulk requests may be flushing/active concurrently, to allow the
// server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests.
type Appender struct {
// legacy metrics for Stats()
bulkRequests int64
docsAdded int64
docsActive int64
@@ -81,12 +81,10 @@ type Appender struct {
errgroup errgroup.Group
errgroupContext context.Context
cancelErrgroupContext context.CancelFunc
bufferDuration metric.Float64Histogram
flushDuration metric.Float64Histogram
telemetryAttrs attribute.Set

mu sync.Mutex
closed chan struct{}
metrics metrics
mu sync.Mutex
closed chan struct{}
}

// New returns a new Appender that indexes documents into Elasticsearch.
@@ -126,31 +124,10 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
cfg.Scaling.IdleInterval = 30 * time.Second
}
}
if cfg.MeterProvider == nil {
cfg.MeterProvider = otel.GetMeterProvider()
}
meter := cfg.MeterProvider.Meter("github.com/elastic/go-docappender")
bufDuration, err := meter.Float64Histogram("elasticsearch.buffer.latency",
metric.WithUnit("s"),
metric.WithDescription(
"The amount of time a document was buffered for, in seconds.",
),
)
if err != nil {
return nil, fmt.Errorf(
"failed creating elasticsearch.buffer.latency metric: %w", err,
)
}
flushDuration, err := meter.Float64Histogram("elasticsearch.flushed.latency",
metric.WithUnit("s"),
metric.WithDescription(
"The amount of time a _bulk request took, in seconds.",
),
)

ms, err := newMetrics(cfg)
if err != nil {
return nil, fmt.Errorf(
"failed creating elasticsearch.flushed.latency metric: %w", err,
)
return nil, err
}
available := make(chan *bulkIndexer, cfg.MaxRequests)
for i := 0; i < cfg.MaxRequests; i++ {
@@ -160,15 +137,14 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
cfg.Logger = zap.NewNop()
}
indexer := &Appender{
availableBulkRequests: int64(len(available)),
config: cfg,
available: available,
closed: make(chan struct{}),
bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize),
bufferDuration: bufDuration,
flushDuration: flushDuration,
telemetryAttrs: attribute.NewSet(cfg.MetricAttributes...),
config: cfg,
available: available,
closed: make(chan struct{}),
bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize),
metrics: ms,
telemetryAttrs: cfg.MetricAttributes,
}
indexer.addCount(int64(len(available)), &indexer.availableBulkRequests, ms.availableBulkRequests)

// We create a cancellable context for the errgroup.Group for unblocking
// flushes when Close returns. We intentionally do not use errgroup.WithContext,
@@ -259,18 +235,26 @@ func (a *Appender) Add(ctx context.Context, index string, document io.Reader) error {
return ErrClosed
case a.bulkItems <- item:
}
atomic.AddInt64(&a.docsAdded, 1)
atomic.AddInt64(&a.docsActive, 1)
a.addCount(1, &a.docsAdded, a.metrics.docsAdded)
a.addCount(1, &a.docsActive, a.metrics.docsActive)
return nil
}

func (a *Appender) addCount(delta int64, lm *int64, m metric.Int64Counter) {
// legacy metric
atomic.AddInt64(lm, delta)

attrs := metric.WithAttributeSet(a.config.MetricAttributes)
m.Add(context.Background(), delta, attrs)
}

func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
n := bulkIndexer.Items()
if n == 0 {
return nil
}
defer atomic.AddInt64(&a.docsActive, -int64(n))
defer atomic.AddInt64(&a.bulkRequests, 1)
defer a.addCount(-int64(n), &a.docsActive, a.metrics.docsActive)
defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

logger := a.config.Logger
if a.tracingEnabled() {
@@ -288,10 +272,10 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
// Record the bulkIndexer buffer's length as the bytesTotal metric after
// the request has been flushed.
if flushed := bulkIndexer.BytesFlushed(); flushed > 0 {
atomic.AddInt64(&a.bytesTotal, int64(flushed))
a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal)
}
if err != nil {
atomic.AddInt64(&a.docsFailed, int64(n))
a.addCount(int64(n), &a.docsFailed, a.metrics.docsFailed)
logger.Error("bulk indexing request failed", zap.Error(err))
if a.tracingEnabled() {
apm.CaptureError(ctx, err).Send()
@@ -300,7 +284,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
var errTooMany errorTooManyRequests
// 429 may be returned as errors from the bulk indexer.
if errors.As(err, &errTooMany) {
atomic.AddInt64(&a.tooManyRequests, int64(n))
a.addCount(int64(n), &a.tooManyRequests, a.metrics.tooManyRequests)
}
return err
}
@@ -335,19 +319,19 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
}
}
if docsFailed > 0 {
atomic.AddInt64(&a.docsFailed, docsFailed)
a.addCount(docsFailed, &a.docsFailed, a.metrics.docsFailed)
}
if docsIndexed > 0 {
atomic.AddInt64(&a.docsIndexed, docsIndexed)
a.addCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed)
}
if tooManyRequests > 0 {
atomic.AddInt64(&a.tooManyRequests, tooManyRequests)
a.addCount(tooManyRequests, &a.tooManyRequests, a.metrics.tooManyRequests)
}
if clientFailed > 0 {
atomic.AddInt64(&a.docsFailedClient, clientFailed)
a.addCount(clientFailed, &a.docsFailedClient, a.metrics.docsFailedClient)
}
if serverFailed > 0 {
atomic.AddInt64(&a.docsFailedServer, serverFailed)
a.addCount(serverFailed, &a.docsFailedServer, a.metrics.docsFailedServer)
}
logger.Debug(
"bulk request completed",
@@ -379,7 +363,7 @@ func (a *Appender) runActiveIndexer() {
// It doesn't account for the time spent in the buffered channel.
firstDocTS = time.Now()
active = <-a.available
atomic.AddInt64(&a.availableBulkRequests, -1)
a.addCount(-1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
flushTimer.Reset(a.config.FlushInterval)
}
if err := active.add(item); err != nil {
@@ -442,13 +426,13 @@ func (a *Appender) runActiveIndexer() {
})
indexer.Reset()
a.available <- indexer
atomic.AddInt64(&a.availableBulkRequests, 1)
a.flushDuration.Record(context.Background(), took.Seconds(),
a.addCount(1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
a.metrics.flushDuration.Record(context.Background(), took.Seconds(),
attrs,
)
return err
})
a.bufferDuration.Record(context.Background(),
a.metrics.bufferDuration.Record(context.Background(),
time.Since(firstDocTS).Seconds(), attrs,
)
}
@@ -458,11 +442,11 @@ func (a *Appender) runActiveIndexer() {
now := time.Now()
info := a.scalingInformation()
if a.maybeScaleDown(now, info, &timedFlush) {
atomic.AddInt64(&a.activeDestroyed, 1)
a.addCount(1, &a.activeDestroyed, a.metrics.activeDestroyed)
return
}
if a.maybeScaleUp(now, info, &fullFlush) {
atomic.AddInt64(&a.activeCreated, 1)
a.addCount(1, &a.activeCreated, a.metrics.activeCreated)
a.errgroup.Go(func() error {
a.runActiveIndexer()
return nil
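The addCount helper above is the heart of the change: it fans each delta out to both the legacy atomic counter behind Stats() and the matching OTel Int64Counter. A standalone sketch of that dual-recording idiom (the type and field names here are illustrative, not the package's API):

package main

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// dualCounter mirrors Appender.addCount: one delta updates both the
// in-process int64 served by the legacy Stats API and the OTel counter.
type dualCounter struct {
	legacy int64
	otel   metric.Int64Counter
	attrs  attribute.Set
}

func (c *dualCounter) add(delta int64) {
	atomic.AddInt64(&c.legacy, delta) // legacy metric
	c.otel.Add(context.Background(), delta, metric.WithAttributeSet(c.attrs))
}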
75 changes: 69 additions & 6 deletions appender_test.go
@@ -72,7 +72,23 @@ func TestAppender(t *testing.T) {
}
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})

rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))

indexerAttrs := attribute.NewSet(
attribute.String("a", "b"), attribute.String("c", "d"),
)

indexer, err := docappender.New(client, docappender.Config{
FlushInterval: time.Minute,
MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
MetricAttributes: indexerAttrs,
})

require.NoError(t, err)
defer indexer.Close(context.Background())

@@ -117,6 +133,54 @@ loop:
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
}, stats)

var rm metricdata.ResourceMetrics
assert.NoError(t, rdr.Collect(context.Background(), &rm))
var asserted int
assertCounter := func(metric metricdata.Metrics, count int64, attrs attribute.Set) {
asserted++
counter := metric.Data.(metricdata.Sum[int64])
for _, dp := range counter.DataPoints {
assert.Equal(t, count, dp.Value)
assert.Equal(t, attrs, dp.Attributes)
}
}
// check the set of names and then check the counter or histogram
unexpectedMetrics := []string{}
for _, metric := range rm.ScopeMetrics[0].Metrics {
switch metric.Name {
case "elasticsearch.events.count":
assertCounter(metric, stats.Added, indexerAttrs)
case "elasticsearch.events.queued":
assertCounter(metric, stats.Active, indexerAttrs)
case "elasticsearch.bulk_requests.count":
assertCounter(metric, stats.BulkRequests, indexerAttrs)
case "elasticsearch.failed.count":
assertCounter(metric, stats.Failed, indexerAttrs)
case "elasticsearch.failed.client.count":
assertCounter(metric, stats.FailedClient, indexerAttrs)
case "elasticsearch.failed.server.count":
assertCounter(metric, stats.FailedServer, indexerAttrs)
case "elasticsearch.events.processed":
assertCounter(metric, stats.Indexed, indexerAttrs)
case "elasticsearch.failed.too_many_reqs":
assertCounter(metric, stats.TooManyRequests, indexerAttrs)
case "elasticsearch.bulk_requests.available":
assertCounter(metric, stats.AvailableBulkRequests, indexerAttrs)
case "elasticsearch.flushed.bytes":
assertCounter(metric, stats.BytesTotal, indexerAttrs)
case "elasticsearch.buffer.latency":
// expect this metric name, but no assertions are made here
// since it's a histogram and is checked elsewhere
case "elasticsearch.flushed.latency":
// expect this metric name, but no assertions are made here
// since it's a histogram and is checked elsewhere
default:
unexpectedMetrics = append(unexpectedMetrics, metric.Name)
}
}
assert.Empty(t, unexpectedMetrics)
assert.Equal(t, 10, asserted)
}

func TestAppenderAvailableAppenders(t *testing.T) {
@@ -279,9 +343,9 @@ func TestAppenderFlushMetric(t *testing.T) {
},
))

indexerAttrs := []attribute.KeyValue{
indexerAttrs := attribute.NewSet(
attribute.String("a", "b"), attribute.String("c", "d"),
}
)
indexer, err := docappender.New(client, docappender.Config{
FlushBytes: 1,
MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
@@ -332,13 +396,12 @@ func TestAppenderFlushMetric(t *testing.T) {
assert.Equal(t, attrs, dp.Attributes)
}
}
wantAttrs := attribute.NewSet(indexerAttrs...)
for _, metric := range rm.ScopeMetrics[0].Metrics {
switch metric.Name {
case "elasticsearch.buffer.latency":
assertHistogram(metric, docs, false, wantAttrs)
assertHistogram(metric, docs, false, indexerAttrs)
case "elasticsearch.flushed.latency":
assertHistogram(metric, 2, true, wantAttrs)
assertHistogram(metric, 2, true, indexerAttrs)
}
}
assert.Equal(t, 2, asserted)
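Both tests pull the recorded metrics through an in-memory reader rather than a real exporter. A minimal, self-contained version of that collection loop (the meter and instrument names here are placeholders):

package main

import (
	"context"
	"fmt"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// A ManualReader lets a test pull metrics on demand; the delta
	// temporality selector matches what the tests above configure.
	rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
		func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
			return metricdata.DeltaTemporality
		},
	))
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))

	// Record against a placeholder instrument.
	meter := mp.Meter("example")
	counter, _ := meter.Int64Counter("example.count")
	counter.Add(context.Background(), 3)

	var rm metricdata.ResourceMetrics
	if err := rdr.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}
	for _, m := range rm.ScopeMetrics[0].Metrics {
		sum := m.Data.(metricdata.Sum[int64])
		fmt.Println(m.Name, sum.DataPoints[0].Value) // example.count 3
	}
}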
2 changes: 1 addition & 1 deletion config.go
@@ -86,7 +86,7 @@ type Config struct {

// MetricAttributes holds any extra attributes to set in the recorded
// metrics.
MetricAttributes []attribute.KeyValue
MetricAttributes attribute.Set
}

// ScalingConfig holds the docappender autoscaling configuration.
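Note the breaking change here: MetricAttributes is now an attribute.Set, so callers that previously passed a []attribute.KeyValue slice must wrap it in attribute.NewSet. A short migration sketch (the attribute keys are placeholders):

package main

import (
	"go.opentelemetry.io/otel/attribute"

	"github.com/elastic/go-docappender"
)

func main() {
	// Before: MetricAttributes: []attribute.KeyValue{attribute.String("a", "b")}
	// After: construct an immutable Set up front.
	cfg := docappender.Config{
		MetricAttributes: attribute.NewSet(attribute.String("a", "b")),
	}
	_ = cfg
}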
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/elastic/go-docappender

go 1.19
go 1.20

require (
github.com/elastic/go-elasticsearch/v8 v8.9.0