Skip to content

Commit

Permalink
[Filebeat] Add inputmon package to unify input metrics (elastic#33823)
Browse files Browse the repository at this point in the history
Add a helper package to ensure that input metrics are registered in a consistent manner.
  • Loading branch information
andrewkroh authored Nov 29, 2022
1 parent ad5cbba commit f74ba69
Show file tree
Hide file tree
Showing 17 changed files with 113 additions and 83 deletions.
56 changes: 56 additions & 0 deletions libbeat/monitoring/inputmon/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inputmon

import (
"strings"

"github.com/elastic/elastic-agent-libs/monitoring"
)

// NewInputRegistry returns a new monitoring.Registry for metrics related to
// an input instance. The returned registry will be initialized with a static
// string values for the input and id. When the input stops it should invoke
// the returned cancel function to unregister the metrics. For testing purposes
// an optional monitoring.Registry may be provided as an alternative to using
// the global 'dataset' monitoring namespace.
func NewInputRegistry(inputType, id string, optionalParent *monitoring.Registry) (reg *monitoring.Registry, cancel func()) {
// Use the default registry unless one was provided (this would be for testing).
rootRegistry := optionalParent
if rootRegistry == nil {
rootRegistry = globalRegistry()
}

// Sanitize dots from the id because they created nested objects within
// the monitoring registry, and we want a consistent flat level of nesting
key := sanitizeID(id)

reg = rootRegistry.NewRegistry(key)
monitoring.NewString(reg, "input").Set(inputType)
monitoring.NewString(reg, "id").Set(id)

return reg, func() { rootRegistry.Remove(key) }
}

func sanitizeID(id string) string {
return strings.ReplaceAll(id, ".", "_")
}

func globalRegistry() *monitoring.Registry {
return monitoring.GetNamespace("dataset").GetRegistry()
}
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type cloudwatchPoller struct {
Expand All @@ -38,7 +37,7 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
awsRegion string, apiSleep time.Duration,
numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", nil)
}

return &cloudwatchPoller{
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/elastic/beats/v7/libbeat/feature"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-concert/unison"
)

Expand Down Expand Up @@ -132,8 +131,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
}

log := inputContext.Logger
metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, inputContext.ID)
metrics := newInputMetrics(inputContext.ID, nil)
cwPoller := newCloudwatchPoller(
log.Named("cloudwatch_poller"),
metrics,
Expand Down
16 changes: 7 additions & 9 deletions x-pack/filebeat/input/awscloudwatch/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package awscloudwatch

import (
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type inputMetrics struct {
id string // Input ID.
parent *monitoring.Registry // Parent registry holding this input's ID as a key.
unregister func()

logEventsReceivedTotal *monitoring.Uint // Number of CloudWatch log events received.
logGroupsTotal *monitoring.Uint // Logs collected from number of CloudWatch log groups.
Expand All @@ -20,16 +20,14 @@ type inputMetrics struct {

// Close removes the metrics from the registry.
func (m *inputMetrics) Close() {
m.parent.Remove(m.id)
m.unregister()
}

func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics {
reg := parent.NewRegistry(id)
monitoring.NewString(reg, "input").Set(inputName)
monitoring.NewString(reg, "id").Set(id)
func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)

out := &inputMetrics{
id: id,
parent: parent,
unregister: unreg,
logEventsReceivedTotal: monitoring.NewUint(reg, "log_events_received_total"),
logGroupsTotal: monitoring.NewUint(reg, "log_groups_total"),
cloudwatchEventsCreatedTotal: monitoring.NewUint(reg, "cloudwatch_events_created_total"),
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/awscloudwatch/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type logProcessor struct {
Expand All @@ -26,7 +25,7 @@ type logProcessor struct {

func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", nil)
}
return &logProcessor{
log: log,
Expand Down
9 changes: 2 additions & 7 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-concert/unison"
)

Expand Down Expand Up @@ -186,9 +185,6 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout)
log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)

fileSelectors := in.config.FileSelectors
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
Expand All @@ -197,6 +193,7 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if err != nil {
return nil, err
}
metrics := newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)
Expand Down Expand Up @@ -265,13 +262,11 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)

fileSelectors := in.config.FileSelectors
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
metrics := newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3Poller := newS3Poller(log.Named("s3_poller"),
metrics,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
return testing.Benchmark(func(b *testing.B) {
log := logp.NewLogger(inputName)
metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics(metricRegistry, "test_id")
metrics := newInputMetrics("test_id", metricRegistry)
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
pipeline := &fakePipeline{}
Expand Down Expand Up @@ -290,7 +290,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
log.Infof("benchmark with %d number of workers", numberOfWorkers)

metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics(metricRegistry, "test_id")
metrics := newInputMetrics("test_id", metricRegistry)

client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
Expand Down
29 changes: 12 additions & 17 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

type inputMetrics struct {
id string // Input ID.
parent *monitoring.Registry // Parent registry holding this input's ID as a key.
unregister func()

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
Expand All @@ -25,12 +25,9 @@ type inputMetrics struct {
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
// s3ObjectsAckedTotal is the number of S3 objects processed that were fully ACKed.
s3ObjectsAckedTotal *monitoring.Uint
// s3ObjectsListedTotal is the number of S3 objects returned by list operations.
s3ObjectsListedTotal *monitoring.Uint
// s3ObjectsProcessedTotal is the number of S3 objects that matched file_selectors rules.
s3ObjectsProcessedTotal *monitoring.Uint
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
s3ObjectsListedTotal *monitoring.Uint // Number of S3 objects returned by list operations.
s3ObjectsProcessedTotal *monitoring.Uint // Number of S3 objects that matched file_selectors rules.
s3BytesProcessedTotal *monitoring.Uint // Number of S3 bytes processed.
s3EventsCreatedTotal *monitoring.Uint // Number of events created from processing S3 data.
s3ObjectsInflight *monitoring.Uint // Number of S3 objects inflight (gauge).
Expand All @@ -39,16 +36,14 @@ type inputMetrics struct {

// Close removes the metrics from the registry.
func (m *inputMetrics) Close() {
m.parent.Remove(m.id)
m.unregister()
}

func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics {
reg := parent.NewRegistry(id)
monitoring.NewString(reg, "input").Set(inputName)
monitoring.NewString(reg, "id").Set(id)
func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)

out := &inputMetrics{
id: id,
parent: parent,
unregister: unreg,
sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
Expand All @@ -65,9 +60,9 @@ func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics {
s3ObjectProcessingTime: metrics.NewUniformSample(1024),
}
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime))
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime))
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
return out
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func TestInputMetricsClose(t *testing.T) {
reg := monitoring.NewRegistry()

metrics := newInputMetrics(reg, "aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01")
metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg)
metrics.Close()

reg.Do(monitoring.Full, func(s string, _ interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newS3Poller(log *logp.Logger,
bucketPollInterval time.Duration,
) *s3Poller {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", monitoring.NewRegistry())
}
return &s3Poller{
numberOfWorkers: numberOfWorkers,
Expand Down
7 changes: 3 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ import (
"strings"
"time"

awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/aws/aws-sdk-go-v2/service/s3"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand All @@ -47,7 +46,7 @@ type s3ObjectProcessorFactory struct {

func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, sel []fileSelectorConfig) *s3ObjectProcessorFactory {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", monitoring.NewRegistry())
}
if len(sel) == 0 {
sel = []fileSelectorConfig{
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type sqsReader struct {

func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", monitoring.NewRegistry())
}
return &sqsReader{
maxMessagesInflight: maxMessagesInflight,
Expand Down
8 changes: 3 additions & 5 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"go.uber.org/multierr"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"go.uber.org/multierr"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)
Expand Down Expand Up @@ -108,7 +106,7 @@ func newSQSS3EventProcessor(
s3 s3ObjectHandlerFactory,
) *sqsS3EventProcessor {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
metrics = newInputMetrics("", monitoring.NewRegistry())
}
return &sqsS3EventProcessor{
s3ObjectHandler: s3,
Expand Down
Loading

0 comments on commit f74ba69

Please sign in to comment.