Skip to content

Commit 0e0533f

Browse files
committed
feat: start hiding beats monitoring behind otel abstraction
1 parent 03c128c commit 0e0533f

File tree

11 files changed

+312
-144
lines changed

11 files changed

+312
-144
lines changed

cmd/apm-server/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ func main() {
3030
return beater.NewRunner(beater.RunnerParams{
3131
Config: args.Config,
3232
Logger: args.Logger,
33+
34+
MeterProvider: args.MeterProvider,
35+
MetricsGatherer: args.MetricsGatherer,
3336
})
3437
},
3538
})

internal/beatcmd/beat.go

+202-8
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ import (
3333
"time"
3434

3535
"github.com/gofrs/uuid/v5"
36+
"go.elastic.co/apm/module/apmotel/v2"
37+
"go.opentelemetry.io/otel"
38+
"go.opentelemetry.io/otel/attribute"
39+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
40+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
3641
"go.uber.org/zap"
3742
"go.uber.org/zap/exp/zapslog"
3843
"golang.org/x/sync/errgroup"
@@ -76,6 +81,10 @@ type Beat struct {
7681

7782
rawConfig *config.C
7883
newRunner NewRunnerFunc
84+
85+
metricReader *sdkmetric.ManualReader
86+
meterProvider *sdkmetric.MeterProvider
87+
metricGatherer *apmotel.Gatherer
7988
}
8089

8190
// BeatParams holds parameters for NewBeat.
@@ -109,6 +118,18 @@ func NewBeat(args BeatParams) (*Beat, error) {
109118
beatName = hostname
110119
}
111120

121+
exporter, err := apmotel.NewGatherer()
122+
if err != nil {
123+
return nil, err
124+
}
125+
126+
metricReader := sdkmetric.NewManualReader()
127+
meterProvider := sdkmetric.NewMeterProvider(
128+
sdkmetric.WithReader(exporter),
129+
sdkmetric.WithReader(metricReader),
130+
)
131+
otel.SetMeterProvider(meterProvider)
132+
112133
eid := uuid.FromStringOrNil(metricreport.EphemeralID().String())
113134
b := &Beat{
114135
Beat: beat.Beat{
@@ -127,9 +148,12 @@ func NewBeat(args BeatParams) (*Beat, error) {
127148
BeatConfig: cfg.APMServer,
128149
Registry: reload.NewRegistry(),
129150
},
130-
Config: cfg,
131-
newRunner: args.NewRunner,
132-
rawConfig: rawConfig,
151+
Config: cfg,
152+
newRunner: args.NewRunner,
153+
rawConfig: rawConfig,
154+
metricReader: metricReader,
155+
meterProvider: meterProvider,
156+
metricGatherer: &exporter,
133157
}
134158

135159
if err := b.init(); err != nil {
@@ -374,7 +398,7 @@ func (b *Beat) Run(ctx context.Context) error {
374398
}
375399

376400
if b.Manager.Enabled() {
377-
reloader, err := NewReloader(b.Info, b.Registry, b.newRunner)
401+
reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricReader, b.metricGatherer)
378402
if err != nil {
379403
return err
380404
}
@@ -390,9 +414,12 @@ func (b *Beat) Run(ctx context.Context) error {
390414
return errors.New("no output defined, please define one under the output section")
391415
}
392416
runner, err := b.newRunner(RunnerParams{
393-
Config: b.rawConfig,
394-
Info: b.Info,
395-
Logger: logp.NewLogger(""),
417+
Config: b.rawConfig,
418+
Info: b.Info,
419+
Logger: logp.NewLogger(""),
420+
MeterProvider: b.meterProvider,
421+
MetricReader: b.metricReader,
422+
MetricsGatherer: b.metricGatherer,
396423
})
397424
if err != nil {
398425
return err
@@ -410,7 +437,12 @@ func (b *Beat) Run(ctx context.Context) error {
410437
// is then exposed through the HTTP monitoring endpoint (e.g. /info and /state)
411438
// and/or pushed to Elasticsearch through the x-pack monitoring feature.
412439
func (b *Beat) registerMetrics() {
413-
// info
440+
b.registerInfoMetrics()
441+
b.registerStateMetrics()
442+
b.registerStatsMetrics()
443+
}
444+
445+
func (b *Beat) registerInfoMetrics() {
414446
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
415447
monitoring.NewString(infoRegistry, "version").Set(b.Info.Version)
416448
monitoring.NewString(infoRegistry, "beat").Set(b.Info.Beat)
@@ -436,7 +468,9 @@ func (b *Beat) registerMetrics() {
436468
monitoring.NewString(infoRegistry, "gid").Set(u.Gid)
437469
}
438470
}()
471+
}
439472

473+
func (b *Beat) registerStateMetrics() {
440474
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
441475

442476
// state.service
@@ -457,6 +491,166 @@ func (b *Beat) registerMetrics() {
457491
monitoring.NewBool(managementRegistry, "enabled").Set(b.Manager.Enabled())
458492
}
459493

494+
func (b *Beat) registerStatsMetrics() {
495+
// TODO: we should ensure all metrics are produced in the expected JSON
496+
// hierarchy for _source compatibility.
497+
libbeatRegistry := monitoring.Default.GetRegistry("libbeat")
498+
monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) {
499+
var rm metricdata.ResourceMetrics
500+
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
501+
return
502+
}
503+
v.OnRegistryStart()
504+
defer v.OnRegistryFinished()
505+
monitoring.ReportString(v, "type", "elasticsearch")
506+
for _, sm := range rm.ScopeMetrics {
507+
switch {
508+
case sm.Scope.Name == "github.com/elastic/go-docappender":
509+
addDocappenderLibbeatOutputMetrics(context.Background(), v, sm)
510+
}
511+
}
512+
})
513+
monitoring.NewFunc(libbeatRegistry, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) {
514+
var rm metricdata.ResourceMetrics
515+
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
516+
return
517+
}
518+
v.OnRegistryStart()
519+
defer v.OnRegistryFinished()
520+
for _, sm := range rm.ScopeMetrics {
521+
switch {
522+
case sm.Scope.Name == "github.com/elastic/go-docappender":
523+
addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm)
524+
}
525+
}
526+
})
527+
monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) {
528+
var rm metricdata.ResourceMetrics
529+
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
530+
return
531+
}
532+
v.OnRegistryStart()
533+
defer v.OnRegistryFinished()
534+
for _, sm := range rm.ScopeMetrics {
535+
switch {
536+
case sm.Scope.Name == "github.com/elastic/go-docappender":
537+
addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm)
538+
}
539+
}
540+
})
541+
}
542+
543+
// getScalarInt64 returns a single-value, dimensionless
544+
// gauge or counter integer value, or (0, false) if the
545+
// data does not match these constraints.
546+
func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
547+
switch data := data.(type) {
548+
case metricdata.Sum[int64]:
549+
if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 {
550+
break
551+
}
552+
return data.DataPoints[0].Value, true
553+
case metricdata.Gauge[int64]:
554+
if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 {
555+
break
556+
}
557+
return data.DataPoints[0].Value, true
558+
}
559+
return 0, false
560+
}
561+
562+
func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) {
563+
for _, m := range sm.Metrics {
564+
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
565+
if value, ok := getScalarInt64(m.Data); ok {
566+
monitoring.ReportInt(v, suffix, value)
567+
}
568+
}
569+
}
570+
}
571+
572+
// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
573+
// with a mixture of libbeat-specific and apm-server specific metric names.
574+
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
575+
for _, m := range sm.Metrics {
576+
switch m.Name {
577+
case "elasticsearch.events.processed":
578+
var acked, toomany, failed int64
579+
data, _ := m.Data.(metricdata.Sum[int64])
580+
for _, dp := range data.DataPoints {
581+
status, ok := dp.Attributes.Value(attribute.Key("status"))
582+
if !ok {
583+
continue
584+
}
585+
switch status.AsString() {
586+
case "Success":
587+
acked++
588+
case "TooMany":
589+
toomany++
590+
fallthrough
591+
default:
592+
failed++
593+
}
594+
}
595+
monitoring.ReportInt(v, "events.acked", acked)
596+
monitoring.ReportInt(v, "events.failed", failed)
597+
monitoring.ReportInt(v, "events.toomany", toomany)
598+
case "elasticsearch.events.count":
599+
if value, ok := getScalarInt64(m.Data); ok {
600+
monitoring.ReportInt(v, "events.total", value)
601+
}
602+
case "elasticsearch.events.queued":
603+
if value, ok := getScalarInt64(m.Data); ok {
604+
monitoring.ReportInt(v, "events.active", value)
605+
}
606+
case "elasticsearch.flushed.bytes":
607+
if value, ok := getScalarInt64(m.Data); ok {
608+
monitoring.ReportInt(v, "write.bytes", value)
609+
}
610+
case "elasticsearch.bulk_requests.count":
611+
if value, ok := getScalarInt64(m.Data); ok {
612+
monitoring.ReportInt(v, "events.batches", value)
613+
}
614+
}
615+
}
616+
}
617+
618+
func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
619+
for _, m := range sm.Metrics {
620+
switch m.Name {
621+
case "elasticsearch.events.count":
622+
if value, ok := getScalarInt64(m.Data); ok {
623+
monitoring.ReportInt(v, "events.total", value)
624+
}
625+
}
626+
}
627+
}
628+
629+
// Add non-libbeat Elasticsearch output metrics under "output.elasticsearch".
630+
func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
631+
for _, m := range sm.Metrics {
632+
switch m.Name {
633+
case "elasticsearch.bulk_requests.count":
634+
if value, ok := getScalarInt64(m.Data); ok {
635+
monitoring.ReportInt(v, "bulk_requests.completed", value)
636+
}
637+
case "elasticsearch.bulk_requests.available":
638+
if value, ok := getScalarInt64(m.Data); ok {
639+
monitoring.ReportInt(v, "bulk_requests.available", value)
640+
}
641+
case "elasticsearch.indexers.created":
642+
if value, ok := getScalarInt64(m.Data); ok {
643+
monitoring.ReportInt(v, "indexers.created", value)
644+
}
645+
case "elasticsearch.indexers.destroyed":
646+
if value, ok := getScalarInt64(m.Data); ok {
647+
monitoring.ReportInt(v, "indexers.destroyed", value)
648+
}
649+
// TODO output.elasticsearch.indexers.active (created - destroyed?)
650+
}
651+
}
652+
}
653+
460654
// registerElasticsearchVerfication registers a global callback to make sure
461655
// the Elasticsearch instance we are connecting to has a valid license, and is
462656
// at least on the same version as APM Server.

internal/beatcmd/beat_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func TestRunManager_Reloader(t *testing.T) {
220220
stopCount.Add(1)
221221
return nil
222222
}), nil
223-
})
223+
}, nil, nil, nil)
224224
require.NoError(t, err)
225225

226226
agentInfo := &proto.AgentInfo{
@@ -346,7 +346,7 @@ func TestRunManager_Reloader_newRunnerError(t *testing.T) {
346346

347347
_, err := NewReloader(beat.Info{}, registry, func(_ RunnerParams) (Runner, error) {
348348
return nil, errors.New("newRunner error")
349-
})
349+
}, nil, nil, nil)
350350
require.NoError(t, err)
351351

352352
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {

internal/beatcmd/reloader.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"sync"
2525

2626
"github.com/joeshaw/multierror"
27+
"go.elastic.co/apm/module/apmotel/v2"
28+
"go.opentelemetry.io/otel/metric"
29+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
2730
"golang.org/x/sync/errgroup"
2831

2932
"github.com/elastic/beats/v7/libbeat/beat"
@@ -50,6 +53,21 @@ type RunnerParams struct {
5053

5154
// Logger holds a logger to use for logging throughout the APM Server.
5255
Logger *logp.Logger
56+
57+
// MeterProvider holds a metric.MeterProvider that can be used for
58+
// creating metrics. The same MeterProvider is expected to be used
59+
// for each instance of the Runner, to ensure counter metrics are
60+
// not reset.
61+
//
62+
// NOTE(axw) metrics registered through this provider are used for
63+
// feeding into both Elastic APM (if enabled) and the libbeat
64+
// monitoring framework. For the latter, only gauge and counter
65+
// metrics are supported, and attributes (dimensions) are ignored.
66+
MeterProvider metric.MeterProvider
67+
68+
MetricsGatherer *apmotel.Gatherer
69+
70+
MetricReader *sdkmetric.ManualReader
5371
}
5472

5573
// Runner is an interface returned by NewRunnerFunc.
@@ -60,12 +78,16 @@ type Runner interface {
6078

6179
// NewReloader returns a new Reloader which creates Runners using the provided
6280
// beat.Info and NewRunnerFunc.
63-
func NewReloader(info beat.Info, registry *reload.Registry, newRunner NewRunnerFunc) (*Reloader, error) {
81+
func NewReloader(info beat.Info, registry *reload.Registry, newRunner NewRunnerFunc, meterProvider metric.MeterProvider, metricReader *sdkmetric.ManualReader, metricGatherer *apmotel.Gatherer) (*Reloader, error) {
6482
r := &Reloader{
6583
info: info,
6684
logger: logp.NewLogger(""),
6785
newRunner: newRunner,
6886
stopped: make(chan struct{}),
87+
88+
meterProvider: meterProvider,
89+
metricReader: metricReader,
90+
metricGatherer: metricGatherer,
6991
}
7092
if err := registry.RegisterList(reload.InputRegName, reloadableListFunc(r.reloadInputs)); err != nil {
7193
return nil, fmt.Errorf("failed to register inputs reloader: %w", err)
@@ -86,6 +108,10 @@ type Reloader struct {
86108
logger *logp.Logger
87109
newRunner NewRunnerFunc
88110

111+
meterProvider metric.MeterProvider
112+
metricReader *sdkmetric.ManualReader
113+
metricGatherer *apmotel.Gatherer
114+
89115
runner Runner
90116
stopRunner func() error
91117

internal/beatcmd/reloader_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestReloader(t *testing.T) {
7272
<-ctx.Done()
7373
return nil
7474
}), nil
75-
})
75+
}, nil, nil, nil)
7676
require.NoError(t, err)
7777

7878
ctx, cancel := context.WithCancel(context.Background())
@@ -156,7 +156,7 @@ func TestReloaderNewRunnerParams(t *testing.T) {
156156
<-ctx.Done()
157157
return nil
158158
}), nil
159-
})
159+
}, nil, nil, nil)
160160
require.NoError(t, err)
161161

162162
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)