Skip to content

Commit

Permalink
[exporter/elasticsearch] Add metric histogram support
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jul 11, 2024
1 parent c5cc10c commit 9841f48
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 29 deletions.
3 changes: 1 addition & 2 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -60,7 +59,7 @@ func routeLogRecord(
// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
// This function may mutate record attributes.
func routeDataPoint(
dataPoint pmetric.NumberDataPoint,
dataPoint dataPoint,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
Expand Down
76 changes: 59 additions & 17 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,64 @@ func (e *elasticsearchExporter) pushMetricsData(
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)

// We only support Sum and Gauge metrics at the moment.
var dataPoints pmetric.NumberDataPointSlice
switch metric.Type() {
case pmetric.MetricTypeSum:
dataPoints = metric.Sum().DataPoints()
case pmetric.MetricTypeGauge:
dataPoints = metric.Gauge().DataPoints()
}

for l := 0; l < dataPoints.Len(); l++ {
dataPoint := dataPoints.At(l)
fIndex, err := e.getMetricDataPointIndex(resource, scope, dataPoint)
upsertDataPoint := func(dp dataPoint, dpValue pcommon.Value) error {
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp)
if err != nil {
errs = append(errs, err)
continue
return err
}
if _, ok := resourceDocs[fIndex]; !ok {
resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
}
if err := e.model.upsertMetricDataPoint(resourceDocs[fIndex], resource, scope, metric, dataPoint); err != nil {
errs = append(errs, err)

if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil {
return err
}
return nil
}

// TODO: support exponential histogram
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := numberToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
errs = append(errs, err)
continue
}
}
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := numberToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
errs = append(errs, err)
continue
}
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
val, err := histogramToValue(dp)
if err != nil {
errs = append(errs, err)
continue
}
if err := upsertDataPoint(dp, val); err != nil {
errs = append(errs, err)
continue
}
}
}
}
Expand Down Expand Up @@ -208,10 +245,15 @@ func (e *elasticsearchExporter) pushMetricsData(
return errors.Join(errs...)
}

// dataPoint is the subset of behavior common to all pdata metric data
// point types (e.g. pmetric.NumberDataPoint, pmetric.HistogramDataPoint):
// the timestamp and attributes used for index routing and for grouping
// data points into a single document.
type dataPoint interface {
	Timestamp() pcommon.Timestamp
	Attributes() pcommon.Map
}

func (e *elasticsearchExporter) getMetricDataPointIndex(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
dataPoint pmetric.NumberDataPoint,
dataPoint dataPoint,
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
Expand Down
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ func TestExporterMetrics(t *testing.T) {
},
)
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric")
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0)
mustSendMetrics(t, exporter, metrics)

rec.WaitItems(1)
Expand Down
66 changes: 59 additions & 7 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash"
"hash/fnv"
Expand Down Expand Up @@ -65,7 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{
// mappingModel is implemented by the supported mapping modes and converts
// pdata entities (logs, spans, metric data points) into Elasticsearch
// documents.
type mappingModel interface {
	encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
	encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
	// upsertMetricDataPointValue merges the given data point's value into
	// the document keyed by the data point's timestamp+attributes hash,
	// creating the document if it does not exist yet.
	upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error
	encodeDocument(objmodel.Document) ([]byte, error)
}

Expand Down Expand Up @@ -185,7 +186,7 @@ func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error)
return buf.Bytes(), nil
}

func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error {
func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error {
hash := metricHash(dp.Timestamp(), dp.Attributes())
var (
document objmodel.Document
Expand All @@ -197,15 +198,66 @@ func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Docume
document.AddAttributes("", dp.Attributes())
}

document.AddAttribute(metric.Name(), value)

documents[hash] = document
return nil
}

// histogramToValue converts an explicit-bounds histogram data point into a
// pcommon map value holding two parallel slices:
//
//	"counts": the count of each non-empty bucket
//	"values": a representative value for each counted bucket
//
// Interior buckets are represented by the midpoint of their two bounds. The
// unbounded first bucket (-inf, bounds[0]] is represented by bounds[0]/2 when
// bounds[0] > 0, otherwise by bounds[0] itself; the unbounded last bucket
// (bounds[n-1], +inf) is represented by bounds[n-1]. Empty buckets are
// omitted from both slices.
func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
	bucketCounts := dp.BucketCounts()
	explicitBounds := dp.ExplicitBounds()
	// A valid explicit-bounds histogram has exactly N+1 bucket counts for
	// N (>= 1) explicit bounds.
	if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 {
		return pcommon.Value{}, fmt.Errorf(
			"invalid histogram data point: %d bucket counts for %d explicit bounds",
			bucketCounts.Len(), explicitBounds.Len(),
		)
	}

	vm := pcommon.NewValueMap()
	m := vm.Map()
	counts := m.PutEmptySlice("counts")
	values := m.PutEmptySlice("values")

	values.EnsureCapacity(bucketCounts.Len())
	counts.EnsureCapacity(bucketCounts.Len())
	for i := 0; i < bucketCounts.Len(); i++ {
		count := bucketCounts.At(i)
		if count == 0 {
			// Skip empty buckets entirely to keep the document compact.
			continue
		}

		var value float64
		switch i {
		// (-infinity, explicit_bounds[i]]
		case 0:
			value = explicitBounds.At(i)
			if value > 0 {
				value /= 2
			}

		// (explicit_bounds[i], +infinity)
		case bucketCounts.Len() - 1:
			value = explicitBounds.At(i - 1)

		// [explicit_bounds[i-1], explicit_bounds[i])
		default:
			// Use the midpoint between the boundaries.
			value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0
		}

		counts.AppendEmpty().SetInt(int64(count))
		values.AppendEmpty().SetDouble(value)
	}

	return vm, nil
}

// numberToValue extracts the value of a number data point as a
// pcommon.Value, preserving its original type (double or int). An error is
// returned for empty (unset) data points so callers can skip them.
func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) {
	switch dp.ValueType() {
	case pmetric.NumberDataPointValueTypeDouble:
		return pcommon.NewValueDouble(dp.DoubleValue()), nil
	case pmetric.NumberDataPointValueTypeInt:
		return pcommon.NewValueInt(dp.IntValue()), nil
	}
	// NumberDataPointValueTypeEmpty (or any future type) is not encodable.
	return pcommon.Value{}, errors.New("invalid number data point")
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
Expand Down
7 changes: 5 additions & 2 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,14 @@ func TestEncodeMetric(t *testing.T) {

var docsBytes [][]byte
for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ {
err := model.upsertMetricDataPoint(docs,
val, err := numberToValue(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
require.NoError(t, err)
err = model.upsertMetricDataPointValue(docs,
metrics.ResourceMetrics().At(0).Resource(),
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(),
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0),
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i))
metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i),
val)
require.NoError(t, err)
}

Expand Down
4 changes: 3 additions & 1 deletion exporter/elasticsearchexporter/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ func newMetricsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[s
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()

fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), resMp)
fillResourceAttributeMap(resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes(), attrMp)
dp := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
dp.SetIntValue(0)
fillResourceAttributeMap(dp.Attributes(), attrMp)

return metrics
}
Expand Down

0 comments on commit 9841f48

Please sign in to comment.