Skip to content

Commit

Permalink
metrics: Add MeterProvider, 2 new flush metrics (#46)
Browse files Browse the repository at this point in the history
Adds two new fields to `Config`: `MeterProvider` and `MetricAttributes`.
The `MeterProvider` is used to create and record two new metrics,
`elasticsearch.buffer.latency` and `elasticsearch.flushed.latency`. The
former measures how long a document is stored in the Bulk Indexer's
buffer for, while the latter records the Elasticsearch _bulk request
latency.

`elasticsearch.buffer.latency` does not take into account the time a
document spent in the buffered channel. Doing so is quite costly and
reduces the performance more than expected, so it was abandoned.

`elasticsearch.flushed.latency` not only measures the strict request
round trip, but also the response parsing and atomic metric operations.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Jul 18, 2023
1 parent 96531c3 commit c5b5ffa
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 6 deletions.
59 changes: 58 additions & 1 deletion appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"go.elastic.co/apm/module/apmzap/v2"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -78,6 +81,9 @@ type Appender struct {
errgroup errgroup.Group
errgroupContext context.Context
cancelErrgroupContext context.CancelFunc
bufferDuration metric.Float64Histogram
flushDuration metric.Float64Histogram
telemetryAttrs attribute.Set

mu sync.Mutex
closed chan struct{}
Expand Down Expand Up @@ -120,6 +126,32 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
cfg.Scaling.IdleInterval = 30 * time.Second
}
}
if cfg.MeterProvider == nil {
cfg.MeterProvider = otel.GetMeterProvider()
}
meter := cfg.MeterProvider.Meter("github.com/elastic/go-docappender")
bufDuration, err := meter.Float64Histogram("elasticsearch.buffer.latency",
metric.WithUnit("s"),
metric.WithDescription(
"The amount of time a document was buffered for, in seconds.",
),
)
if err != nil {
return nil, fmt.Errorf(
"failed creating elasticsearch.buffer.latency metric: %w", err,
)
}
flushDuration, err := meter.Float64Histogram("elasticsearch.flushed.latency",
metric.WithUnit("s"),
metric.WithDescription(
"The amount of time a _bulk request took, in seconds.",
),
)
if err != nil {
return nil, fmt.Errorf(
"failed creating elasticsearch.flushed.latency metric: %w", err,
)
}
available := make(chan *bulkIndexer, cfg.MaxRequests)
for i := 0; i < cfg.MaxRequests; i++ {
available <- newBulkIndexer(client, cfg.CompressionLevel)
Expand All @@ -133,6 +165,9 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) {
available: available,
closed: make(chan struct{}),
bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize),
bufferDuration: bufDuration,
flushDuration: flushDuration,
telemetryAttrs: attribute.NewSet(cfg.MetricAttributes...),
}

// We create a cancellable context for the errgroup.Group for unblocking
Expand Down Expand Up @@ -336,8 +371,12 @@ func (a *Appender) runActiveIndexer() {
if !flushTimer.Stop() {
<-flushTimer.C
}
var firstDocTS time.Time
handleBulkItem := func(item bulkIndexerItem) {
if active == nil {
// NOTE(marclop) Record the TS when the first document is cached.
// It doesn't account for the time spent in the buffered channel.
firstDocTS = time.Now()
active = <-a.available
atomic.AddInt64(&a.availableBulkRequests, -1)
flushTimer.Reset(a.config.FlushInterval)
Expand Down Expand Up @@ -394,13 +433,23 @@ func (a *Appender) runActiveIndexer() {
if active != nil {
indexer := active
active = nil
attrs := metric.WithAttributeSet(a.telemetryAttrs)
a.errgroup.Go(func() error {
err := a.flush(a.errgroupContext, indexer)
var err error
took := timeFunc(func() {
err = a.flush(a.errgroupContext, indexer)
})
indexer.Reset()
a.available <- indexer
atomic.AddInt64(&a.availableBulkRequests, 1)
a.flushDuration.Record(context.Background(), took.Seconds(),
attrs,
)
return err
})
a.bufferDuration.Record(context.Background(),
time.Since(firstDocTS).Seconds(), attrs,
)
}
if a.config.Scaling.Disabled {
continue
Expand Down Expand Up @@ -628,3 +677,11 @@ type Stats struct {
// IndexersDestroyed represents the number of times an active indexer was destroyed.
IndexersDestroyed int64
}

// timeFunc reports the wall-clock time spent running f. A nil f is not
// invoked, in which case only the overhead of the two clock reads is measured.
func timeFunc(f func()) time.Duration {
	start := time.Now()
	if f == nil {
		return time.Since(start)
	}
	f()
	return time.Since(start)
}
84 changes: 84 additions & 0 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/stretchr/testify/require"
"go.elastic.co/apm/v2/apmtest"
"go.elastic.co/apm/v2/model"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -260,6 +263,87 @@ func TestAppenderFlushInterval(t *testing.T) {
}
}

// TestAppenderFlushMetric verifies that the appender records the
// "elasticsearch.buffer.latency" and "elasticsearch.flushed.latency"
// histograms through the configured MeterProvider, using seconds as the unit
// and carrying the configured MetricAttributes on every data point.
func TestAppenderFlushMetric(t *testing.T) {
	// Every decoded bulk request is handed to the test over this unbuffered
	// channel; the handler gives up if the request context is cancelled.
	requests := make(chan esutil.BulkIndexerResponse)
	client := docappendertest.NewMockElasticsearchClient(t, func(_ http.ResponseWriter, r *http.Request) {
		_, items := docappendertest.DecodeBulkRequest(r)
		select {
		case <-r.Context().Done():
		case requests <- items:
		}
	})

	// Use delta temporality for every instrument kind.
	rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
		func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
			return metricdata.DeltaTemporality
		},
	))

	indexerAttrs := []attribute.KeyValue{
		attribute.String("a", "b"), attribute.String("c", "d"),
	}
	// FlushBytes is set to 1; the assertions below expect every document to
	// be sent in its own bulk request.
	indexer, err := docappender.New(client, docappender.Config{
		FlushBytes:       1,
		MeterProvider:    sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
		MetricAttributes: indexerAttrs,
	})
	require.NoError(t, err)
	defer indexer.Close(context.Background())

	// Nothing has been added yet, so no bulk request must be observed.
	select {
	case <-requests:
		t.Fatal("unexpected request, no documents buffered")
	case <-time.After(50 * time.Millisecond):
	}

	docs := 10
	for i := 0; i < docs; i++ {
		addMinimalDoc(t, indexer, fmt.Sprintf("logs-foo-testing-%d", i))
	}

	// Expect one single-item bulk request per document within one second.
	timeout := time.After(time.Second)
	for i := 0; i < docs; i++ {
		select {
		case res := <-requests:
			assert.Len(t, res.Items, 1)
		case <-timeout:
			t.Fatal("timed out waiting for request, flush interval elapsed")
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	var rm metricdata.ResourceMetrics
	// A failed or empty collection must stop the test here rather than panic
	// on the ScopeMetrics[0] access below.
	require.NoError(t, rdr.Collect(ctx, &rm))
	require.NotEmpty(t, rm.ScopeMetrics, "no scope metrics were collected")

	// asserted counts how many of the expected metrics were actually checked.
	var asserted int
	// assertHistogram checks the unit, attributes, sum and count of every
	// data point of a float64 histogram. With ignoreCount set, the data point
	// count only has to be greater than count instead of equal to it.
	assertHistogram := func(latencyMetric metricdata.Metrics, count int, ignoreCount bool, attrs attribute.Set) {
		asserted++
		assert.Equal(t, "s", latencyMetric.Unit)
		histo, ok := latencyMetric.Data.(metricdata.Histogram[float64])
		require.Truef(t, ok, "metric %s: unexpected data type %T",
			latencyMetric.Name, latencyMetric.Data,
		)
		// Without data points the loop below would assert nothing and the
		// metric would be counted as verified.
		require.NotEmptyf(t, histo.DataPoints, "metric %s: no data points",
			latencyMetric.Name,
		)
		for _, dp := range histo.DataPoints {
			if ignoreCount {
				assert.Greater(t, int(dp.Count), count)
			} else {
				assert.Equal(t, count, int(dp.Count))
			}
			assert.Positive(t, dp.Sum)
			assert.Equal(t, attrs, dp.Attributes)
		}
	}
	wantAttrs := attribute.NewSet(indexerAttrs...)
	for _, m := range rm.ScopeMetrics[0].Metrics {
		switch m.Name {
		case "elasticsearch.buffer.latency":
			assertHistogram(m, docs, false, wantAttrs)
		case "elasticsearch.flushed.latency":
			assertHistogram(m, 2, true, wantAttrs)
		}
	}
	// Both latency metrics must have been present in the collected data.
	assert.Equal(t, 2, asserted)
}

func TestAppenderFlushBytes(t *testing.T) {
requests := make(chan struct{}, 1)
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
Expand Down
15 changes: 13 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"time"

"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -70,12 +72,21 @@ type Config struct {
// If DocumentBufferSize is zero, the default 1024 will be used.
DocumentBufferSize int

// Tracer holds an optional apm.Tracer to use for tracing bulk requests
// to Elasticsearch. Each bulk request is traced as a transaction.
// Scaling configuration for the docappender.
//
// If unspecified, scaling is enabled by default.
Scaling ScalingConfig

// MeterProvider holds the OTel MeterProvider to be used to create and
// record appender metrics.
//
// If unset, the global OTel MeterProvider will be used; if that is also
// unset, no metrics will be recorded.
MeterProvider metric.MeterProvider

// MetricAttributes holds any extra attributes to set in the recorded
// metrics.
MetricAttributes []attribute.KeyValue
}

// ScalingConfig holds the docappender autoscaling configuration.
Expand Down
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ require (
go.elastic.co/apm/module/apmzap/v2 v2.4.3
go.elastic.co/apm/v2 v2.4.3
go.elastic.co/fastjson v1.3.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.uber.org/zap v1.24.0
golang.org/x/sync v0.3.0
)
Expand All @@ -20,14 +23,18 @@ require (
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/elastic/go-sysinfo v1.7.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/google/go-cmp v0.5.4 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
go.elastic.co/apm/module/apmhttp/v2 v2.4.3 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.8.0 // indirect
Expand Down
19 changes: 17 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
Expand Down Expand Up @@ -51,6 +57,16 @@ go.elastic.co/apm/v2 v2.4.3 h1:k6mj63O7IIyqqn3S52C2vBXvaSK9M5FHp0aZHpPH/as=
go.elastic.co/apm/v2 v2.4.3/go.mod h1:+CiBUdrrAGnGCL9TNx7tQz3BrfYV23L8Ljvotoc87so=
go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs=
go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4=
go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI=
go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
Expand All @@ -68,7 +84,6 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down

0 comments on commit c5b5ffa

Please sign in to comment.