Skip to content

Commit

Permalink
Bytes based batching for metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Israel Blancas <[email protected]>
  • Loading branch information
iblancasa committed Mar 4, 2025
1 parent a8b2be1 commit fbb1ef4
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 95 deletions.
126 changes: 126 additions & 0 deletions exporter/exporterhelper/internal/sizer/metrics_sizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"

import (
math_bits "math/bits"

"go.opentelemetry.io/collector/pdata/pmetric"
) // MetricsCountSizer returns the nunmber of metrics entries.

type MetricsSizer interface {
MetricsSize(md pmetric.Metrics) (count int)
ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int)
ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int)
MetricSize(m pmetric.Metric) int
DeltaSize(newItemSize int) int
NumberDataPointSize(ndp pmetric.NumberDataPoint) int
HistogramDataPointSize(hdp pmetric.HistogramDataPoint) int
ExponentialHistogramDataPointSize(ehdp pmetric.ExponentialHistogramDataPoint) int
SummaryDataPointSize(sdps pmetric.SummaryDataPoint) int
}

type MetricsBytesSizer struct {
pmetric.ProtoMarshaler
}

var _ MetricsSizer = &MetricsBytesSizer{}

// DeltaSize() returns the delta size of a proto slice when a new item is added.
// Example:
//
// prevSize := proto1.Size()
// proto1.RepeatedField().AppendEmpty() = proto2
//
// Then currSize of proto1 can be calculated as
//
// currSize := (prevSize + sizer.DeltaSize(proto2.Size()))
//
// This is derived from opentelemetry-collector/pdata/internal/data/protogen/metrics/v1/metrics.pb.go
// which is generated with gogo/protobuf.
func (s *MetricsBytesSizer) DeltaSize(newItemSize int) int {
return 1 + newItemSize + math_bits.Len64(uint64(newItemSize|1)+6)/7 //nolint:gosec // disable G115
}

type MetricsCountSizer struct{}

var _ MetricsSizer = &MetricsCountSizer{}

func (s *MetricsCountSizer) MetricsSize(md pmetric.Metrics) int {
return md.DataPointCount()
}

func (s *MetricsCountSizer) ResourceMetricsSize(rm pmetric.ResourceMetrics) (count int) {
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
count += s.ScopeMetricsSize(rm.ScopeMetrics().At(i))
}
return count
}

func (s *MetricsCountSizer) ScopeMetricsSize(sm pmetric.ScopeMetrics) (count int) {
for i := 0; i < sm.Metrics().Len(); i++ {
count += s.MetricSize(sm.Metrics().At(i))
}
return count
}

func (s *MetricsCountSizer) MetricSize(m pmetric.Metric) int {
switch m.Type() {
case pmetric.MetricTypeGauge:
return m.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
return m.Sum().DataPoints().Len()
case pmetric.MetricTypeHistogram:
return m.Histogram().DataPoints().Len()
case pmetric.MetricTypeExponentialHistogram:
return m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
return m.Summary().DataPoints().Len()
}
return 0
}

func (s *MetricsCountSizer) DeltaSize(newItemSize int) int {
return newItemSize
}

func (s *MetricsCountSizer) NumberDataPointSize(_ pmetric.NumberDataPoint) int {
return 1
}

func (s *MetricsCountSizer) HistogramDataPointSize(_ pmetric.HistogramDataPoint) int {
return 1
}

func (s *MetricsCountSizer) ExponentialHistogramDataPointSize(_ pmetric.ExponentialHistogramDataPoint) int {
return 1
}

func (s *MetricsCountSizer) SummaryDataPointSize(_ pmetric.SummaryDataPoint) int {
return 1
}

// func (s *LogsCountSizer) LogsSize(ld plog.Logs) int {
// return ld.LogRecordCount()
// }

// func (s *LogsCountSizer) ResourceLogsSize(rl plog.ResourceLogs) int {
// count := 0
// for k := 0; k < rl.ScopeLogs().Len(); k++ {
// count += rl.ScopeLogs().At(k).LogRecords().Len()
// }
// return count
// }

// func (s *LogsCountSizer) ScopeLogsSize(sl plog.ScopeLogs) int {
// return sl.LogRecords().Len()
// }

// func (s *LogsCountSizer) LogRecordSize(_ plog.LogRecord) int {
// return 1
// }

// func (s *LogsCountSizer) DeltaSize(newItemSize int) int {
// return newItemSize
// }
26 changes: 17 additions & 9 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
)
Expand All @@ -24,16 +25,16 @@ var (
)

type metricsRequest struct {
md pmetric.Metrics
pusher consumer.ConsumeMetricsFunc
cachedItemsCount int
md pmetric.Metrics
pusher consumer.ConsumeMetricsFunc
cachedSize int
}

func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
return &metricsRequest{
md: md,
pusher: pusher,
cachedItemsCount: md.DataPointCount(),
md: md,
pusher: pusher,
cachedSize: -1,
}
}

Expand Down Expand Up @@ -66,11 +67,18 @@ func (req *metricsRequest) Export(ctx context.Context) error {
}

func (req *metricsRequest) ItemsCount() int {
return req.cachedItemsCount
return req.md.DataPointCount()
}

func (req *metricsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
func (req *metricsRequest) Size(sizer sizer.MetricsSizer) int {
if req.cachedSize == -1 {
req.cachedSize = sizer.MetricsSize(req.md)
}
return req.cachedSize
}

func (req *metricsRequest) setCachedSize(count int) {
req.cachedSize = count
}

type metricsExporter struct {
Expand Down
Loading

0 comments on commit fbb1ef4

Please sign in to comment.