From f74ba6910618d49a84f130f2b1bfb136ba647bfd Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 28 Nov 2022 22:18:00 -0500 Subject: [PATCH] [Filebeat] Add inputmon package to unify input metrics (#33823) Add a helper package to ensure that input metrics are registered in a consistent manner. --- libbeat/monitoring/inputmon/input.go | 56 +++++++++++++++++++ .../input/awscloudwatch/cloudwatch.go | 3 +- x-pack/filebeat/input/awscloudwatch/input.go | 4 +- .../filebeat/input/awscloudwatch/metrics.go | 16 +++--- .../filebeat/input/awscloudwatch/processor.go | 3 +- x-pack/filebeat/input/awss3/input.go | 9 +-- .../input/awss3/input_benchmark_test.go | 4 +- x-pack/filebeat/input/awss3/metrics.go | 29 ++++------ x-pack/filebeat/input/awss3/metrics_test.go | 2 +- x-pack/filebeat/input/awss3/s3.go | 2 +- x-pack/filebeat/input/awss3/s3_objects.go | 7 +-- x-pack/filebeat/input/awss3/sqs.go | 2 +- x-pack/filebeat/input/awss3/sqs_s3_event.go | 8 +-- x-pack/filebeat/input/cel/input.go | 17 +++--- x-pack/filebeat/input/lumberjack/input.go | 4 +- x-pack/filebeat/input/lumberjack/metrics.go | 28 +++++----- x-pack/filebeat/input/lumberjack/server.go | 2 +- 17 files changed, 113 insertions(+), 83 deletions(-) create mode 100644 libbeat/monitoring/inputmon/input.go diff --git a/libbeat/monitoring/inputmon/input.go b/libbeat/monitoring/inputmon/input.go new file mode 100644 index 000000000000..905e4e9168d5 --- /dev/null +++ b/libbeat/monitoring/inputmon/input.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inputmon + +import ( + "strings" + + "github.com/elastic/elastic-agent-libs/monitoring" +) + +// NewInputRegistry returns a new monitoring.Registry for metrics related to +// an input instance. The returned registry will be initialized with a static +// string values for the input and id. When the input stops it should invoke +// the returned cancel function to unregister the metrics. For testing purposes +// an optional monitoring.Registry may be provided as an alternative to using +// the global 'dataset' monitoring namespace. +func NewInputRegistry(inputType, id string, optionalParent *monitoring.Registry) (reg *monitoring.Registry, cancel func()) { + // Use the default registry unless one was provided (this would be for testing). + rootRegistry := optionalParent + if rootRegistry == nil { + rootRegistry = globalRegistry() + } + + // Sanitize dots from the id because they created nested objects within + // the monitoring registry, and we want a consistent flat level of nesting + key := sanitizeID(id) + + reg = rootRegistry.NewRegistry(key) + monitoring.NewString(reg, "input").Set(inputType) + monitoring.NewString(reg, "id").Set(id) + + return reg, func() { rootRegistry.Remove(key) } +} + +func sanitizeID(id string) string { + return strings.ReplaceAll(id, ".", "_") +} + +func globalRegistry() *monitoring.Registry { + return monitoring.GetNamespace("dataset").GetRegistry() +} diff --git a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go index b2399e071ac1..ca54721bd279 100644 --- a/x-pack/filebeat/input/awscloudwatch/cloudwatch.go +++ b/x-pack/filebeat/input/awscloudwatch/cloudwatch.go @@ -16,7 +16,6 @@ import ( awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/monitoring" ) type cloudwatchPoller struct { @@ -38,7 +37,7 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics, awsRegion string, apiSleep time.Duration, numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", nil) } return &cloudwatchPoller{ diff --git a/x-pack/filebeat/input/awscloudwatch/input.go b/x-pack/filebeat/input/awscloudwatch/input.go index 02e8d936ee50..03527856599d 100644 --- a/x-pack/filebeat/input/awscloudwatch/input.go +++ b/x-pack/filebeat/input/awscloudwatch/input.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/v7/libbeat/feature" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/go-concert/unison" ) @@ -132,8 +131,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline) } log := inputContext.Logger - metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() - metrics := newInputMetrics(metricRegistry, inputContext.ID) + metrics := newInputMetrics(inputContext.ID, nil) cwPoller := newCloudwatchPoller( log.Named("cloudwatch_poller"), metrics, diff --git a/x-pack/filebeat/input/awscloudwatch/metrics.go b/x-pack/filebeat/input/awscloudwatch/metrics.go index c6f1b24493d8..0c7bcd7c8879 100644 --- a/x-pack/filebeat/input/awscloudwatch/metrics.go +++ b/x-pack/filebeat/input/awscloudwatch/metrics.go @@ -5,12 +5,12 @@ package awscloudwatch import ( + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/monitoring" ) type inputMetrics struct { - id string // Input ID. - parent *monitoring.Registry // Parent registry holding this input's ID as a key. + unregister func() logEventsReceivedTotal *monitoring.Uint // Number of CloudWatch log events received. logGroupsTotal *monitoring.Uint // Logs collected from number of CloudWatch log groups. @@ -20,16 +20,14 @@ type inputMetrics struct { // Close removes the metrics from the registry. func (m *inputMetrics) Close() { - m.parent.Remove(m.id) + m.unregister() } -func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { - reg := parent.NewRegistry(id) - monitoring.NewString(reg, "input").Set(inputName) - monitoring.NewString(reg, "id").Set(id) +func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) + out := &inputMetrics{ - id: id, - parent: parent, + unregister: unreg, logEventsReceivedTotal: monitoring.NewUint(reg, "log_events_received_total"), logGroupsTotal: monitoring.NewUint(reg, "log_groups_total"), cloudwatchEventsCreatedTotal: monitoring.NewUint(reg, "cloudwatch_events_created_total"), diff --git a/x-pack/filebeat/input/awscloudwatch/processor.go b/x-pack/filebeat/input/awscloudwatch/processor.go index 549b4a0f739c..999cad4d7f0f 100644 --- a/x-pack/filebeat/input/awscloudwatch/processor.go +++ b/x-pack/filebeat/input/awscloudwatch/processor.go @@ -14,7 +14,6 @@ import ( awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" ) type logProcessor struct { @@ -26,7 +25,7 @@ type logProcessor struct { func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", nil) } return &logProcessor{ log: log, diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index d54604602b1e..e9fe38731be6 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/statestore" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/go-concert/unison" ) @@ -186,9 +185,6 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout) log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages) - metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() - metrics := newInputMetrics(metricRegistry, ctx.ID) - fileSelectors := in.config.FileSelectors if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} @@ -197,6 +193,7 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if err != nil { return nil, err } + metrics := newInputMetrics(ctx.ID, nil) s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors) sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) @@ -265,13 +262,11 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix) log.Infof("AWS region is set to %v.", in.awsConfig.Region) - metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() - metrics := newInputMetrics(metricRegistry, ctx.ID) - fileSelectors := in.config.FileSelectors if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} } + metrics := newInputMetrics(ctx.ID, nil) s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors) s3Poller := newS3Poller(log.Named("s3_poller"), metrics, diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index b8d9f29ce362..0746b5e4a97a 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -199,7 +199,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR return testing.Benchmark(func(b *testing.B) { log := logp.NewLogger(inputName) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics(metricRegistry, "test_id") + metrics := newInputMetrics("test_id", metricRegistry) sqsAPI := newConstantSQS() s3API := newConstantS3(t) pipeline := &fakePipeline{} @@ -290,7 +290,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult log.Infof("benchmark with %d number of workers", numberOfWorkers) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics(metricRegistry, "test_id") + metrics := newInputMetrics("test_id", metricRegistry) client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { event.Private.(*awscommon.EventACKTracker).ACK() diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 59c9f4c6fe2d..2e56a55847f2 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -9,13 +9,13 @@ import ( "github.com/rcrowley/go-metrics" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) type inputMetrics struct { - id string // Input ID. - parent *monitoring.Registry // Parent registry holding this input's ID as a key. + unregister func() sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. @@ -25,12 +25,9 @@ type inputMetrics struct { sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded. - // s3ObjectsAckedTotal is the number of S3 objects processed that were fully ACKed. - s3ObjectsAckedTotal *monitoring.Uint - // s3ObjectsListedTotal is the number of S3 objects returned by list operations. - s3ObjectsListedTotal *monitoring.Uint - // s3ObjectsProcessedTotal is the number of S3 objects that matched file_selectors rules. - s3ObjectsProcessedTotal *monitoring.Uint + s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed. + s3ObjectsListedTotal *monitoring.Uint // Number of S3 objects returned by list operations. + s3ObjectsProcessedTotal *monitoring.Uint // Number of S3 objects that matched file_selectors rules. s3BytesProcessedTotal *monitoring.Uint // Number of S3 bytes processed. s3EventsCreatedTotal *monitoring.Uint // Number of events created from processing S3 data. s3ObjectsInflight *monitoring.Uint // Number of S3 objects inflight (gauge). @@ -39,16 +36,14 @@ type inputMetrics struct { // Close removes the metrics from the registry. func (m *inputMetrics) Close() { - m.parent.Remove(m.id) + m.unregister() } -func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { - reg := parent.NewRegistry(id) - monitoring.NewString(reg, "input").Set(inputName) - monitoring.NewString(reg, "id").Set(id) +func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) + out := &inputMetrics{ - id: id, - parent: parent, + unregister: unreg, sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), @@ -65,9 +60,9 @@ func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { s3ObjectProcessingTime: metrics.NewUniformSample(1024), } adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) + Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) + Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. return out } diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index c15dfd1a193c..c40a6dd4a9e0 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -17,7 +17,7 @@ import ( func TestInputMetricsClose(t *testing.T) { reg := monitoring.NewRegistry() - metrics := newInputMetrics(reg, "aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01") + metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg) metrics.Close() reg.Do(monitoring.Full, func(s string, _ interface{}) { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 5b1187e43174..54b52475ed9d 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -77,7 +77,7 @@ func newS3Poller(log *logp.Logger, bucketPollInterval time.Duration, ) *s3Poller { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", monitoring.NewRegistry()) } return &s3Poller{ numberOfWorkers: numberOfWorkers, diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 0f273828dc40..0893a664e76a 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -20,16 +20,15 @@ import ( "strings" "time" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -47,7 +46,7 @@ type s3ObjectProcessorFactory struct { func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, sel []fileSelectorConfig) *s3ObjectProcessorFactory { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", monitoring.NewRegistry()) } if len(sel) == 0 { sel = []fileSelectorConfig{ diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 55c1d742815a..f5f175d4f6d7 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -33,7 +33,7 @@ type sqsReader struct { func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", monitoring.NewRegistry()) } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index e94750c21d6b..3574ea7e0382 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -15,14 +15,12 @@ import ( "sync" "time" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go" + "go.uber.org/multierr" "github.com/elastic/beats/v7/libbeat/beat" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "go.uber.org/multierr" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -108,7 +106,7 @@ func newSQSS3EventProcessor( s3 s3ObjectHandlerFactory, ) *sqsS3EventProcessor { if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", monitoring.NewRegistry()) } return &sqsS3EventProcessor{ s3ObjectHandler: s3, diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 9833ff742835..f958b593b2ef 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -41,6 +41,7 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" "github.com/elastic/elastic-agent-libs/logp" @@ -104,7 +105,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub cfg := src.cfg log := env.Logger.With("input_url", cfg.Resource.URL) - metrics := newInputMetrics(monitoring.GetNamespace("dataset").GetRegistry(), env.ID) + metrics := newInputMetrics(env.ID) defer metrics.Close() ctx := ctxtool.FromCanceller(env.Cancelation) @@ -908,8 +909,7 @@ func test(url *url.URL) error { // inputMetrics handles the input's metric reporting. type inputMetrics struct { - id string - parent *monitoring.Registry + unregister func() resource *monitoring.String // URL-ish of input resource executions *monitoring.Uint // times the CEL program has been executed @@ -921,13 +921,10 @@ type inputMetrics struct { batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). } -func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { - reg := parent.NewRegistry(id) - monitoring.NewString(reg, "input").Set(inputName) - monitoring.NewString(reg, "id").Set(id) +func newInputMetrics(id string) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) out := &inputMetrics{ - id: id, - parent: reg, + unregister: unreg, resource: monitoring.NewString(reg, "resource"), executions: monitoring.NewUint(reg, "cel_executions"), batchesReceived: monitoring.NewUint(reg, "batches_received_total"), @@ -946,5 +943,5 @@ func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { } func (m *inputMetrics) Close() { - m.parent.Remove(m.id) + m.unregister() } diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go index 9471bb35e92b..61016fff67c1 100644 --- a/x-pack/filebeat/input/lumberjack/input.go +++ b/x-pack/filebeat/input/lumberjack/input.go @@ -11,7 +11,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/monitoring" ) const ( @@ -73,8 +72,7 @@ func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) setGoLumberLogger(inputCtx.Logger.Named("go-lumber")) - metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() - metrics := newInputMetrics(metricRegistry, inputCtx.ID) + metrics := newInputMetrics(inputCtx.ID, nil) defer metrics.Close() s, err := newServer(i.config, inputCtx.Logger, client.Publish, metrics) diff --git a/x-pack/filebeat/input/lumberjack/metrics.go b/x-pack/filebeat/input/lumberjack/metrics.go index 640b506c171b..f25a38c33324 100644 --- a/x-pack/filebeat/input/lumberjack/metrics.go +++ b/x-pack/filebeat/input/lumberjack/metrics.go @@ -7,33 +7,31 @@ package lumberjack import ( "github.com/rcrowley/go-metrics" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) type inputMetrics struct { - id string // Input ID. - parent *monitoring.Registry // Parent registry holding this input's ID as a key. - bindAddress *monitoring.String // Bind address of input. - - batchesReceivedTotal *monitoring.Uint // Number of Lumberjack batches received (not necessarily processed fully). - batchesACKedTotal *monitoring.Uint // Number of Lumberjack batches ACKed. - messagesReceivedTotal *monitoring.Uint // Number of Lumberjack messages received (not necessarily processed fully). - batchProcessingTime metrics.Sample // Histogram of the elapsed batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). + unregister func() + + bindAddress *monitoring.String // Bind address of input. + batchesReceivedTotal *monitoring.Uint // Number of Lumberjack batches received (not necessarily processed fully). + batchesACKedTotal *monitoring.Uint // Number of Lumberjack batches ACKed. + messagesReceivedTotal *monitoring.Uint // Number of Lumberjack messages received (not necessarily processed fully). + batchProcessingTime metrics.Sample // Histogram of the elapsed batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). } // Close removes the metrics from the registry. func (m *inputMetrics) Close() { - m.parent.Remove(m.id) + m.unregister() } -func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { - reg := parent.NewRegistry(id) - monitoring.NewString(reg, "input").Set(inputName) - monitoring.NewString(reg, "id").Set(id) +func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) + out := &inputMetrics{ - id: id, - parent: parent, + unregister: unreg, bindAddress: monitoring.NewString(reg, "bind_address"), batchesReceivedTotal: monitoring.NewUint(reg, "batches_received_total"), batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"), diff --git a/x-pack/filebeat/input/lumberjack/server.go b/x-pack/filebeat/input/lumberjack/server.go index f12d988cb5b1..07cea8c33279 100644 --- a/x-pack/filebeat/input/lumberjack/server.go +++ b/x-pack/filebeat/input/lumberjack/server.go @@ -38,7 +38,7 @@ func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputM } if metrics == nil { - metrics = newInputMetrics(monitoring.NewRegistry(), "") + metrics = newInputMetrics("", monitoring.NewRegistry()) } bindURI := "tcp://" + bindAddress