diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 0368f6a1b958..b95a66a913a2 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -8,7 +8,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -60,7 +59,7 @@ func routeLogRecord( // routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes. // This function may mutate record attributes. func routeDataPoint( - dataPoint pmetric.NumberDataPoint, + dataPoint dataPoint, scope pcommon.InstrumentationScope, resource pcommon.Resource, fIndex string, diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 6cb64da0983d..9bf5deaa8873 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -157,27 +157,64 @@ func (e *elasticsearchExporter) pushMetricsData( for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) - // We only support Sum and Gauge metrics at the moment. 
- var dataPoints pmetric.NumberDataPointSlice - switch metric.Type() { - case pmetric.MetricTypeSum: - dataPoints = metric.Sum().DataPoints() - case pmetric.MetricTypeGauge: - dataPoints = metric.Gauge().DataPoints() - } - - for l := 0; l < dataPoints.Len(); l++ { - dataPoint := dataPoints.At(l) - fIndex, err := e.getMetricDataPointIndex(resource, scope, dataPoint) + upsertDataPoint := func(dp dataPoint, dpValue pcommon.Value) error { + fIndex, err := e.getMetricDataPointIndex(resource, scope, dp) if err != nil { - errs = append(errs, err) - continue + return err } if _, ok := resourceDocs[fIndex]; !ok { resourceDocs[fIndex] = make(map[uint32]objmodel.Document) } - if err := e.model.upsertMetricDataPoint(resourceDocs[fIndex], resource, scope, metric, dataPoint); err != nil { - errs = append(errs, err) + + if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil { + return err + } + return nil + } + + // TODO: support exponential histogram + switch metric.Type() { + case pmetric.MetricTypeSum: + dps := metric.Sum().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + val, err := numberToValue(dp) + if err != nil { + errs = append(errs, err) + continue + } + if err := upsertDataPoint(dp, val); err != nil { + errs = append(errs, err) + continue + } + } + case pmetric.MetricTypeGauge: + dps := metric.Gauge().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + val, err := numberToValue(dp) + if err != nil { + errs = append(errs, err) + continue + } + if err := upsertDataPoint(dp, val); err != nil { + errs = append(errs, err) + continue + } + } + case pmetric.MetricTypeHistogram: + dps := metric.Histogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + val, err := histogramToValue(dp) + if err != nil { + errs = append(errs, err) + continue + } + if err := upsertDataPoint(dp, val); err != nil { + errs = append(errs, err) + continue + } } } } @@ -208,10 +245,15 
@@ func (e *elasticsearchExporter) pushMetricsData( return errors.Join(errs...) } +type dataPoint interface { + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map +} + func (e *elasticsearchExporter) getMetricDataPointIndex( resource pcommon.Resource, scope pcommon.InstrumentationScope, - dataPoint pmetric.NumberDataPoint, + dataPoint dataPoint, ) (string, error) { fIndex := e.index if e.dynamicIndex { diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 754cfaa4675f..92217b8c9015 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -495,6 +495,7 @@ func TestExporterMetrics(t *testing.T) { }, ) metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("my.metric") + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetEmptySum().DataPoints().AppendEmpty().SetIntValue(0) mustSendMetrics(t, exporter, metrics) rec.WaitItems(1) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index ccf76b5afdf6..36961ae18a0d 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/binary" "encoding/json" + "errors" "fmt" "hash" "hash/fnv" @@ -65,7 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) - upsertMetricDataPoint(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, pmetric.NumberDataPoint) error + upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, pcommon.InstrumentationScope, pmetric.Metric, dataPoint, pcommon.Value) error encodeDocument(objmodel.Document) ([]byte, error) } @@ -185,7 +186,7 @@ func (m 
*encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) return buf.Bytes(), nil } -func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp pmetric.NumberDataPoint) error { +func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ pcommon.InstrumentationScope, metric pmetric.Metric, dp dataPoint, value pcommon.Value) error { hash := metricHash(dp.Timestamp(), dp.Attributes()) var ( document objmodel.Document @@ -197,15 +198,66 @@ func (m *encodeModel) upsertMetricDataPoint(documents map[uint32]objmodel.Docume document.AddAttributes("", dp.Attributes()) } + document.AddAttribute(metric.Name(), value) + + documents[hash] = document + return nil +} + +func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) { + bucketCounts := dp.BucketCounts() + explicitBounds := dp.ExplicitBounds() + if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 { + return pcommon.Value{}, errors.New("error in histogram") + } + + vm := pcommon.NewValueMap() + m := vm.Map() + counts := m.PutEmptySlice("counts") + values := m.PutEmptySlice("values") + + values.EnsureCapacity(bucketCounts.Len()) + counts.EnsureCapacity(bucketCounts.Len()) + for i := 0; i < bucketCounts.Len(); i++ { + count := bucketCounts.At(i) + if count == 0 { + continue + } + + var value float64 + switch i { + // (-infinity, explicit_bounds[i]] + case 0: + value = explicitBounds.At(i) + if value > 0 { + value /= 2 + } + + // (explicit_bounds[i-1], +infinity) + case bucketCounts.Len() - 1: + value = explicitBounds.At(i - 1) + + // (explicit_bounds[i-1], explicit_bounds[i]] + default: + // Use the midpoint between the boundaries. 
+ value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0 + } + + counts.AppendEmpty().SetInt(int64(count)) + values.AppendEmpty().SetDouble(value) + } + + return vm, nil +} + +func numberToValue(dp pmetric.NumberDataPoint) (pcommon.Value, error) { switch dp.ValueType() { case pmetric.NumberDataPointValueTypeDouble: - document.AddAttribute(metric.Name(), pcommon.NewValueDouble(dp.DoubleValue())) + return pcommon.NewValueDouble(dp.DoubleValue()), nil case pmetric.NumberDataPointValueTypeInt: - document.AddAttribute(metric.Name(), pcommon.NewValueInt(dp.IntValue())) + return pcommon.NewValueInt(dp.IntValue()), nil } - - documents[hash] = document - return nil + return pcommon.Value{}, errors.New("invalid number data point") } func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index d3784e5081f1..c0be9a9c8fad 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -97,11 +97,14 @@ func TestEncodeMetric(t *testing.T) { var docsBytes [][]byte for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { - err := model.upsertMetricDataPoint(docs, + val, err := numberToValue(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) + require.NoError(t, err) + err = model.upsertMetricDataPointValue(docs, metrics.ResourceMetrics().At(0).Resource(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)) + metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i), + val) require.NoError(t, err) } diff --git 
a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index f57f16272c24..e2c55a47609a 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -266,7 +266,9 @@ func newMetricsWithAttributeAndResourceMap(attrMp map[string]string, resMp map[s resourceMetrics := metrics.ResourceMetrics().AppendEmpty() fillResourceAttributeMap(resourceMetrics.Resource().Attributes(), resMp) - fillResourceAttributeMap(resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty().Attributes(), attrMp) + dp := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + dp.SetIntValue(0) + fillResourceAttributeMap(dp.Attributes(), attrMp) return metrics }