diff --git a/go.mod b/go.mod index 7427b7a..7e0f41c 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,9 @@ require ( github.com/smartystreets/assertions v1.13.0 github.com/smartystreets/goconvey v1.7.2 github.com/stretchr/testify v1.8.0 + go.opentelemetry.io/proto/otlp v0.19.0 google.golang.org/grpc v1.49.0 + google.golang.org/protobuf v1.28.1 ) require ( @@ -30,6 +32,7 @@ require ( github.com/go-stack/stack v1.8.1 // indirect github.com/gogo/googleapis v1.4.1 // indirect github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -43,6 +46,5 @@ require ( golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a681508..9e41b9e 100644 --- a/go.sum +++ b/go.sum @@ -432,6 +432,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache 
v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -566,6 +568,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BMKEyvcZ5+IM0AwDrnlkEc= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 h1:/sDbPb60SusIXjiJGYLUoS/rAQurQmvGWmwn2bBPM9c= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1/go.mod h1:G+WkljZi4mflcqVxYSgvt8MNctRQHjEH8ubKtt1Ka3w= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= @@ -1228,6 +1233,8 @@ go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48 go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git 
a/protocol/otlp/decoder.go b/protocol/otlp/decoder.go new file mode 100644 index 0000000..4ca173e --- /dev/null +++ b/protocol/otlp/decoder.go @@ -0,0 +1,53 @@ +package otlp + +import ( + "bytes" + "context" + "net/http" + "sync" + + "github.com/signalfx/golib/v3/datapoint/dpsink" + "github.com/signalfx/golib/v3/log" + "github.com/signalfx/ingest-protocols/logkey" + "github.com/signalfx/ingest-protocols/protocol" + "github.com/signalfx/ingest-protocols/protocol/signalfx" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/protobuf/proto" +) + +type httpMetricDecoder struct { + sink dpsink.Sink + logger log.Logger + buffs sync.Pool +} + +// NewHTTPMetricDecoder decodes OTLP metrics and puts them onto the provided sink. +func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader { + return &httpMetricDecoder{ + sink: sink, + logger: log.NewContext(logger).With(logkey.Protocol, "otlp"), + buffs: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + } +} + +func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) { + jeff := d.buffs.Get().(*bytes.Buffer) + defer d.buffs.Put(jeff) + jeff.Reset() + if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil { + return err + } + var msg metricsservicev1.ExportMetricsServiceRequest + if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil { + return err + } + dps := FromOTLPMetricRequest(&msg) + if len(dps) > 0 { + err = d.sink.AddDatapoints(ctx, dps) + } + return err +} diff --git a/protocol/otlp/decoder_test.go b/protocol/otlp/decoder_test.go new file mode 100644 index 0000000..7ed16c1 --- /dev/null +++ b/protocol/otlp/decoder_test.go @@ -0,0 +1,99 @@ +package otlp + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "sync" + "testing" + + "github.com/signalfx/golib/v3/datapoint/dptest" + "github.com/signalfx/golib/v3/log" + . 
"github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + "google.golang.org/protobuf/proto" +) + +var errRead = errors.New("could not read") + +type errorReader struct{} + +func (errorReader *errorReader) Read([]byte) (int, error) { + return 0, errRead +} + +func TestDecoder(t *testing.T) { + Convey("httpMetricDecoder", t, func() { + sendTo := dptest.NewBasicSink() + decoder := NewHTTPMetricDecoder(sendTo, log.Discard) + + Convey("Bad request reading", func() { + req := &http.Request{ + Body: io.NopCloser(&errorReader{}), + } + req.ContentLength = 1 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldEqual, errRead) + }) + + Convey("Bad request content", func() { + req := &http.Request{ + Body: io.NopCloser(bytes.NewBufferString("asdf")), + } + req.ContentLength = 4 + ctx := context.Background() + So(decoder.Read(ctx, req), ShouldNotBeNil) + }) + + Convey("Good request", func(c C) { + var msg metricsservicev1.ExportMetricsServiceRequest + msg.ResourceMetrics = []*metricsv1.ResourceMetrics{ + { + ScopeMetrics: []*metricsv1.ScopeMetrics{ + { + Metrics: []*metricsv1.Metric{ + { + Name: "test", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + { + Attributes: []*commonv1.KeyValue{}, + StartTimeUnixNano: 1000, + TimeUnixNano: 1000, + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + b, _ := proto.Marshal(&msg) + req := &http.Request{ + Body: io.NopCloser(bytes.NewBuffer(b)), + } + req.ContentLength = int64(len(b)) + ctx := context.Background() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + dp := <-sendTo.PointsChan + c.So(dp, ShouldNotBeNil) + wg.Done() + }() + + So(decoder.Read(ctx, req), ShouldBeNil) + + wg.Wait() + }) + }) +} diff --git a/protocol/otlp/metrics.go 
b/protocol/otlp/metrics.go new file mode 100644 index 0000000..5d1f2b2 --- /dev/null +++ b/protocol/otlp/metrics.go @@ -0,0 +1,443 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This code is copied and modified directly from the OTEL Collector: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/1d6309bb62264cc7e2dda076ed95385b1ddef28a/pkg/translator/signalfx/from_metrics.go + +package otlp + +import ( + "encoding/base64" + "encoding/json" + "math" + "strconv" + "time" + + "github.com/signalfx/golib/v3/datapoint" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" +) + +var ( + // Some standard dimension keys. + // upper bound dimension key for histogram buckets. + upperBoundDimensionKey = "le" + + // infinity bound dimension value is used on all histograms. + infinityBoundSFxDimValue = float64ToDimValue(math.Inf(1)) +) + +// SignalFxMetric is a single NumberDataPoint paired with a metric name such that it contains all of +// the information needed to convert it to a SignalFx datapoint. It serves as an intermediate +// object between an OTLP DataPoint and the SignalFx datapoint.Datapoint type. Attribute values +// must be made into strings and attributes from the resource should be added to the attributes of +// the DP. 
+type SignalFxMetric struct { + Name string + Type datapoint.MetricType + DP metricsv1.NumberDataPoint +} + +// ToDatapoint converts the SignalFxMetric to a datapoint.Datapoint instance +func (s *SignalFxMetric) ToDatapoint() *datapoint.Datapoint { + return &datapoint.Datapoint{ + Metric: s.Name, + MetricType: s.Type, + Timestamp: time.Unix(0, int64(s.DP.GetTimeUnixNano())), + Dimensions: StringAttributesToDimensions(s.DP.GetAttributes()), + Value: numberToSignalFxValue(&s.DP), + } +} + +// StringAttributesToDimensions converts a list of string KVs into a map. +func StringAttributesToDimensions(attributes []*commonv1.KeyValue) map[string]string { + dimensions := make(map[string]string, len(attributes)) + if len(attributes) == 0 { + return dimensions + } + for _, kv := range attributes { + if kv.GetValue().GetValue() == nil { + continue + } + if v, ok := kv.GetValue().GetValue().(*commonv1.AnyValue_StringValue); ok { + if v.StringValue == "" { + // Don't bother setting things that serialize to nothing + continue + } + + dimensions[kv.Key] = v.StringValue + } else { + panic("attributes must be converted to string before using in SignalFxMetric") + } + } + return dimensions +} + +// FromOTLPMetricRequest converts the ResourceMetrics in an incoming request to SignalFx datapoints +func FromOTLPMetricRequest(md *metricsservicev1.ExportMetricsServiceRequest) []*datapoint.Datapoint { + return FromOTLPResourceMetrics(md.GetResourceMetrics()) +} + +// FromOTLPResourceMetrics converts OTLP ResourceMetrics to SignalFx datapoints. +func FromOTLPResourceMetrics(rms []*metricsv1.ResourceMetrics) []*datapoint.Datapoint { + return datapointsFromMetrics(SignalFxMetricsFromOTLPResourceMetrics(rms)) +} + +// FromMetric converts an OTLP Metric to SignalFx datapoint(s). 
+func FromMetric(m *metricsv1.Metric) []*datapoint.Datapoint { + return datapointsFromMetrics(SignalFxMetricsFromOTLPMetric(m)) +} + +func datapointsFromMetrics(sfxMetrics []SignalFxMetric) []*datapoint.Datapoint { + sfxDps := make([]*datapoint.Datapoint, len(sfxMetrics)) + for i := range sfxMetrics { + sfxDps[i] = sfxMetrics[i].ToDatapoint() + } + return sfxDps +} + +// SignalFxMetricsFromOTLPResourceMetrics creates the intermediate SignalFxMetric from OTLP metrics +// instead of going all the way to datapoint.Datapoint. +func SignalFxMetricsFromOTLPResourceMetrics(rms []*metricsv1.ResourceMetrics) []SignalFxMetric { + var sfxDps []SignalFxMetric + + for _, rm := range rms { + for _, ilm := range rm.GetScopeMetrics() { + for _, m := range ilm.GetMetrics() { + sfxDps = append(sfxDps, SignalFxMetricsFromOTLPMetric(m)...) + } + } + + resourceAttrs := stringifyAttributes(rm.GetResource().GetAttributes()) + for i := range sfxDps { + sfxDps[i].DP.Attributes = append(sfxDps[i].DP.Attributes, resourceAttrs...) 
+ } + } + + return sfxDps +} + +// SignalFxMetricsFromOTLPMetric converts an OTLP Metric to a SignalFxMetric +func SignalFxMetricsFromOTLPMetric(m *metricsv1.Metric) []SignalFxMetric { + var sfxMetrics []SignalFxMetric + + data := m.GetData() + switch data.(type) { + case *metricsv1.Metric_Gauge: + sfxMetrics = convertNumberDataPoints(m.GetGauge().GetDataPoints(), deriveSignalFxMetricType(m), m.GetName()) + case *metricsv1.Metric_Sum: + sfxMetrics = convertNumberDataPoints(m.GetSum().GetDataPoints(), deriveSignalFxMetricType(m), m.GetName()) + case *metricsv1.Metric_Histogram: + sfxMetrics = convertHistogram(m.GetHistogram().GetDataPoints(), deriveSignalFxMetricType(m), m.GetName()) + case *metricsv1.Metric_ExponentialHistogram: + // TODO: Add support for these + case *metricsv1.Metric_Summary: + sfxMetrics = convertSummaryDataPoints(m.GetSummary().GetDataPoints(), m.GetName()) + } + + return sfxMetrics +} + +func deriveSignalFxMetricType(m *metricsv1.Metric) datapoint.MetricType { + switch m.GetData().(type) { + case *metricsv1.Metric_Gauge: + return datapoint.Gauge + + case *metricsv1.Metric_Sum: + if !m.GetSum().GetIsMonotonic() { + return datapoint.Gauge + } + if m.GetSum().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + + case *metricsv1.Metric_Histogram: + if m.GetHistogram().GetAggregationTemporality() == metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA { + return datapoint.Count + } + return datapoint.Counter + } + panic("invalid metric type") +} + +func convertNumberDataPoints(dps []*metricsv1.NumberDataPoint, typ datapoint.MetricType, name string) []SignalFxMetric { + out := make([]SignalFxMetric, len(dps)) + for i, dp := range dps { + out[i].Name = name + out[i].Type = typ + out[i].DP = metricsv1.NumberDataPoint{ + Attributes: stringifyAttributes(dp.Attributes), + StartTimeUnixNano: dp.StartTimeUnixNano, + TimeUnixNano: dp.TimeUnixNano, 
+ Value: dp.Value, + } + } + return out +} + +func convertHistogram(histDPs []*metricsv1.HistogramDataPoint, typ datapoint.MetricType, name string) []SignalFxMetric { + biggestCount := 0 + for _, histDP := range histDPs { + c := len(histDP.GetBucketCounts()) + if c > biggestCount { + biggestCount = c + } + } + + // ensure we are big enough to fit everything + out := make([]SignalFxMetric, len(histDPs)*(2+biggestCount)) + + i := 0 + for _, histDP := range histDPs { + stringAttrs := stringifyAttributes(histDP.GetAttributes()) + + countPt := &out[i] + countPt.Name = name + "_count" + countPt.Type = typ + c := int64(histDP.GetCount()) + countPt.DP.Attributes = stringAttrs + countPt.DP.TimeUnixNano = histDP.GetTimeUnixNano() + countPt.DP.Value = &metricsv1.NumberDataPoint_AsInt{AsInt: c} + i++ + + sumPt := &out[i] + sumPt.Name = name + "_sum" + sumPt.Type = typ + sum := histDP.GetSum() + sumPt.DP.Attributes = stringAttrs + sumPt.DP.TimeUnixNano = histDP.GetTimeUnixNano() + sumPt.DP.Value = &metricsv1.NumberDataPoint_AsDouble{AsDouble: sum} + i++ + + bounds := histDP.GetExplicitBounds() + counts := histDP.GetBucketCounts() + + // Spec says counts is optional but if present it must have one more + // element than the bounds array. 
+ if len(counts) > 0 && len(counts) != len(bounds)+1 { + continue + } + + var le int64 + for j, c := range counts { + bound := infinityBoundSFxDimValue + if j < len(bounds) { + bound = float64ToDimValue(bounds[j]) + } + + dp := &out[i] + dp.Name = name + "_bucket" + dp.Type = typ + dp.DP.Attributes = append(stringAttrs, &commonv1.KeyValue{ + Key: upperBoundDimensionKey, + Value: &commonv1.AnyValue{ + Value: &commonv1.AnyValue_StringValue{ + StringValue: bound, + }, + }, + }) + dp.DP.TimeUnixNano = histDP.GetTimeUnixNano() + le += int64(c) + dp.DP.Value = &metricsv1.NumberDataPoint_AsInt{AsInt: le} + i++ + } + } + + return out[:i] +} + +func convertSummaryDataPoints( + in []*metricsv1.SummaryDataPoint, + name string, +) []SignalFxMetric { + biggestCount := 0 + for _, sumDP := range in { + c := len(sumDP.GetQuantileValues()) + if c > biggestCount { + biggestCount = c + } + } + out := make([]SignalFxMetric, len(in)*(2+biggestCount)) + + i := 0 + for _, inDp := range in { + stringAttrs := stringifyAttributes(inDp.GetAttributes()) + + countPt := &out[i] + countPt.Name = name + "_count" + countPt.Type = datapoint.Counter + c := int64(inDp.GetCount()) + countPt.DP.Attributes = stringAttrs + countPt.DP.TimeUnixNano = inDp.GetTimeUnixNano() + countPt.DP.Value = &metricsv1.NumberDataPoint_AsInt{AsInt: c} + i++ + + sumPt := &out[i] + sumPt.Name = name + "_sum" + sumPt.Type = datapoint.Counter + sumPt.DP.Attributes = stringAttrs + sumPt.DP.TimeUnixNano = inDp.GetTimeUnixNano() + sumPt.DP.Value = &metricsv1.NumberDataPoint_AsDouble{AsDouble: inDp.GetSum()} + i++ + + qvs := inDp.GetQuantileValues() + for _, qv := range qvs { + qPt := &out[i] + qPt.Name = name + "_quantile" + qPt.Type = datapoint.Gauge + qPt.DP.Attributes = append(stringAttrs, &commonv1.KeyValue{ + Key: "quantile", + Value: &commonv1.AnyValue{ + Value: &commonv1.AnyValue_StringValue{ + StringValue: strconv.FormatFloat(qv.GetQuantile(), 'f', -1, 64), + }, + }, + }) + qPt.DP.TimeUnixNano = inDp.GetTimeUnixNano() + 
qPt.DP.Value = &metricsv1.NumberDataPoint_AsDouble{AsDouble: qv.GetValue()} + i++ + } + } + return out[:i] +} + +func stringifyAttributes(attributes []*commonv1.KeyValue) []*commonv1.KeyValue { + out := make([]*commonv1.KeyValue, len(attributes)) + for i, kv := range attributes { + out[i] = &commonv1.KeyValue{ + Key: attributes[i].Key, + Value: &commonv1.AnyValue{ + Value: &commonv1.AnyValue_StringValue{ + StringValue: StringifyAnyValue(kv.GetValue()), + }, + }, + } + } + return out +} + +// StringifyAnyValue converts an AnyValue to a string. KVLists and Arrays get recursively JSON +// marshalled. +func StringifyAnyValue(a *commonv1.AnyValue) string { + var v string + if a == nil { + return "" + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BytesValue: + v = base64.StdEncoding.EncodeToString(a.GetBytesValue()) + + case *commonv1.AnyValue_BoolValue: + v = strconv.FormatBool(a.GetBoolValue()) + + case *commonv1.AnyValue_DoubleValue: + v = float64ToDimValue(a.GetDoubleValue()) + + case *commonv1.AnyValue_IntValue: + v = strconv.FormatInt(a.GetIntValue(), 10) + + case *commonv1.AnyValue_KvlistValue, *commonv1.AnyValue_ArrayValue: + jsonStr, _ := json.Marshal(anyValueToRaw(a)) + v = string(jsonStr) + } + + return v +} + +// nolint:gocyclo +func anyValueToRaw(a *commonv1.AnyValue) interface{} { + var v interface{} + if a == nil { + return nil + } + switch a.GetValue().(type) { + case *commonv1.AnyValue_StringValue: + v = a.GetStringValue() + + case *commonv1.AnyValue_BytesValue: + v = a.GetBytesValue() + + case *commonv1.AnyValue_BoolValue: + v = a.GetBoolValue() + + case *commonv1.AnyValue_DoubleValue: + v = a.GetDoubleValue() + + case *commonv1.AnyValue_IntValue: + v = a.GetIntValue() + + case *commonv1.AnyValue_KvlistValue: + kvl := a.GetKvlistValue() + tv := make(map[string]interface{}, len(kvl.Values)) + for _, kv := range kvl.Values { + tv[kv.Key] = anyValueToRaw(kv.Value) + } + v = tv + 
+ case *commonv1.AnyValue_ArrayValue: + av := a.GetArrayValue() + tv := make([]interface{}, len(av.Values)) + for i := range av.Values { + tv[i] = anyValueToRaw(av.Values[i]) + } + v = tv + } + return v +} + +// Is equivalent to strconv.FormatFloat(f, 'g', -1, 64), but hardcodes a few common cases for increased efficiency. +func float64ToDimValue(f float64) string { + // Parameters below are the same used by Prometheus + // see https://github.com/prometheus/common/blob/b5fe7d854c42dc7842e48d1ca58f60feae09d77b/expfmt/text_create.go#L450 + // SignalFx agent uses a different pattern + // https://github.com/signalfx/signalfx-agent/blob/5779a3de0c9861fa07316fd11b3c4ff38c0d78f0/internal/monitors/prometheusexporter/conversion.go#L77 + // The important issue here is consistency with the exporter, opting for the + // more common one used by Prometheus. + switch { + case f == 0: + return "0" + case f == 1: + return "1" + case math.IsInf(f, +1): + return "+Inf" + default: + return strconv.FormatFloat(f, 'g', -1, 64) + } +} + +func numberToSignalFxValue(in *metricsv1.NumberDataPoint) datapoint.Value { + v := in.GetValue() + switch n := v.(type) { + case *metricsv1.NumberDataPoint_AsDouble: + return datapoint.NewFloatValue(n.AsDouble) + case *metricsv1.NumberDataPoint_AsInt: + return datapoint.NewIntValue(n.AsInt) + } + return nil +} + +func mergeStringMaps(ms ...map[string]string) map[string]string { + out := make(map[string]string) + for _, m := range ms { + for k, v := range m { + out[k] = v + } + } + return out +} diff --git a/protocol/otlp/metrics_test.go b/protocol/otlp/metrics_test.go new file mode 100644 index 0000000..af297fe --- /dev/null +++ b/protocol/otlp/metrics_test.go @@ -0,0 +1,836 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp + +import ( + "math" + "testing" + "time" + + "github.com/signalfx/golib/v3/datapoint" + . "github.com/smartystreets/goconvey/convey" + metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1" +) + +const ( + unixSecs = int64(1574092046) + unixNSecs = int64(11 * time.Millisecond) +) + +var ts = time.Unix(unixSecs, unixNSecs) + +func Test_FromMetrics(t *testing.T) { + labelMap := map[string]string{ + "k0": "v0", + "k1": "v1", + } + + const doubleVal = 1234.5678 + makeDoublePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsDouble{AsDouble: doubleVal}, + } + } + + makeDoublePtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeDoublePt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + const int64Val = int64(123) + makeInt64Pt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: &metricsv1.NumberDataPoint_AsInt{AsInt: int64Val}, + } + } + + makeInt64PtWithLabels := func() *metricsv1.NumberDataPoint { + pt := makeInt64Pt() + pt.Attributes = stringMapToAttributeMap(labelMap) + return pt + } + + makeNilValuePt := func() *metricsv1.NumberDataPoint { + return &metricsv1.NumberDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Value: nil, + } + } + + 
histBounds := []float64{1, 2, 4} + histCounts := []uint64{4, 2, 3, 7} + + makeDoubleHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: float64Pointer(100.0), + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + doubleHistDP := makeDoubleHistDP() + + makeDoubleHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: float64Pointer(100.0), + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeIntHistDP := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: float64Pointer(100), + ExplicitBounds: histBounds, + BucketCounts: histCounts, + Attributes: stringMapToAttributeMap(labelMap), + } + } + intHistDP := makeIntHistDP() + + makeIntHistDPBadCounts := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Count: 16, + Sum: float64Pointer(100), + ExplicitBounds: histBounds, + BucketCounts: []uint64{4}, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + makeHistDPNoBuckets := func() *metricsv1.HistogramDataPoint { + return &metricsv1.HistogramDataPoint{ + Count: 2, + Sum: float64Pointer(10), + TimeUnixNano: uint64(ts.UnixNano()), + Attributes: stringMapToAttributeMap(labelMap), + } + } + histDPNoBuckets := makeHistDPNoBuckets() + + const summarySumVal = 123.4 + const summaryCountVal = 111 + + makeSummaryDP := func() *metricsv1.SummaryDataPoint { + summaryDP := &metricsv1.SummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Attributes: stringMapToAttributeMap(labelMap), + } + for i := 0; i < 4; i++ { + summaryDP.QuantileValues = 
append(summaryDP.QuantileValues, &metricsv1.SummaryDataPoint_ValueAtQuantile{ + Quantile: 0.25 * float64(i+1), + Value: float64(i), + }) + } + return summaryDP + } + + makeEmptySummaryDP := func() *metricsv1.SummaryDataPoint { + return &metricsv1.SummaryDataPoint{ + TimeUnixNano: uint64(ts.UnixNano()), + Sum: summarySumVal, + Count: summaryCountVal, + Attributes: stringMapToAttributeMap(labelMap), + } + } + + createRMS := func() (*metricsv1.ResourceMetrics, *[]*metricsv1.Metric) { + out := &metricsv1.ResourceMetrics{} + var metrics *[]*metricsv1.Metric + ilm := &metricsv1.ScopeMetrics{} + out.ScopeMetrics = append(out.ScopeMetrics, ilm) + metrics = &ilm.Metrics + + return out, metrics + } + tests := []struct { + name string + metricsFn func() []*metricsv1.ResourceMetrics + wantSfxDataPoints []*datapoint.Datapoint + }{ + { + name: "nil_node_nil_resources_no_dims", + metricsFn: func() []*metricsv1.ResourceMetrics { + out, metrics := createRMS() + + *metrics = []*metricsv1.Metric{ + { + Name: "gauge_double_with_no_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_int_with_no_dims", + Data: &metricsv1.Metric_Gauge{ + Gauge: &metricsv1.Gauge{ + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "cumulative_double_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "cumulative_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "delta_double_with_no_dims", + Data: 
&metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "delta_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: true, + AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "gauge_sum_double_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeDoublePt(), + }, + }, + }, + }, + { + Name: "gauge_sum_int_with_no_dims", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeInt64Pt(), + }, + }, + }, + }, + { + Name: "gauge_sum_int_with_nil_value", + Data: &metricsv1.Metric_Sum{ + Sum: &metricsv1.Sum{ + IsMonotonic: false, + DataPoints: []*metricsv1.NumberDataPoint{ + makeNilValuePt(), + }, + }, + }, + }, + { + Name: "nil_data", + Data: nil, + }, + } + + return []*metricsv1.ResourceMetrics{out} + }, + wantSfxDataPoints: []*datapoint.Datapoint{ + doubleSFxDataPoint("gauge_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_int_with_no_dims", datapoint.Gauge, nil, int64Val), + doubleSFxDataPoint("cumulative_double_with_no_dims", datapoint.Counter, nil, doubleVal), + int64SFxDataPoint("cumulative_int_with_no_dims", datapoint.Counter, nil, int64Val), + doubleSFxDataPoint("delta_double_with_no_dims", datapoint.Count, nil, doubleVal), + int64SFxDataPoint("delta_int_with_no_dims", datapoint.Count, nil, int64Val), + doubleSFxDataPoint("gauge_sum_double_with_no_dims", datapoint.Gauge, nil, doubleVal), + int64SFxDataPoint("gauge_sum_int_with_no_dims", datapoint.Gauge, nil, int64Val), + { + Metric: "gauge_sum_int_with_nil_value", + 
Timestamp: ts,
				Value:      nil,
				MetricType: datapoint.Gauge,
				Dimensions: map[string]string{},
			},
		},
	},
	{
		// All four number-metric shapes (gauge/cumulative x double/int)
		// carry datapoint-level labels that must surface as SFx dimensions.
		name: "nil_node_and_resources_with_dims",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()

			*metrics = []*metricsv1.Metric{
				{
					Name: "gauge_double_with_dims",
					Data: &metricsv1.Metric_Gauge{
						Gauge: &metricsv1.Gauge{
							DataPoints: []*metricsv1.NumberDataPoint{
								makeDoublePtWithLabels(),
							},
						},
					},
				},
				{
					Name: "gauge_int_with_dims",
					Data: &metricsv1.Metric_Gauge{
						Gauge: &metricsv1.Gauge{
							DataPoints: []*metricsv1.NumberDataPoint{
								makeInt64PtWithLabels(),
							},
						},
					},
				},
				{
					Name: "cumulative_double_with_dims",
					Data: &metricsv1.Metric_Sum{
						Sum: &metricsv1.Sum{
							IsMonotonic:            true,
							AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
							DataPoints: []*metricsv1.NumberDataPoint{
								makeDoublePtWithLabels(),
							},
						},
					},
				},
				{
					Name: "cumulative_int_with_dims",
					Data: &metricsv1.Metric_Sum{
						Sum: &metricsv1.Sum{
							IsMonotonic:            true,
							AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
							DataPoints: []*metricsv1.NumberDataPoint{
								makeInt64PtWithLabels(),
							},
						},
					},
				},
			}

			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: []*datapoint.Datapoint{
			doubleSFxDataPoint("gauge_double_with_dims", datapoint.Gauge, labelMap, doubleVal),
			int64SFxDataPoint("gauge_int_with_dims", datapoint.Gauge, labelMap, int64Val),
			doubleSFxDataPoint("cumulative_double_with_dims", datapoint.Counter, labelMap, doubleVal),
			int64SFxDataPoint("cumulative_int_with_dims", datapoint.Counter, labelMap, int64Val),
		},
	},
	{
		// Resource attributes (k_r*/k_n*) must be merged into every
		// datapoint's dimensions along with the point's own labels.
		name: "with_node_resources_dims",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()
			out.Resource = &resourcev1.Resource{
				Attributes: []*commonv1.KeyValue{
					{
						Key:   "k_r0",
						Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r0"}},
					},
					{
						Key:   "k_r1",
						Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_r1"}},
					},
					{
						Key:   "k_n0",
						Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n0"}},
					},
					{
						Key:   "k_n1",
						Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "v_n1"}},
					},
				},
			}

			*metrics = []*metricsv1.Metric{
				{
					Name: "gauge_double_with_dims",
					Data: &metricsv1.Metric_Gauge{
						Gauge: &metricsv1.Gauge{
							DataPoints: []*metricsv1.NumberDataPoint{
								makeDoublePtWithLabels(),
							},
						},
					},
				},
				{
					Name: "gauge_int_with_dims",
					Data: &metricsv1.Metric_Gauge{
						Gauge: &metricsv1.Gauge{
							DataPoints: []*metricsv1.NumberDataPoint{
								makeInt64PtWithLabels(),
							},
						},
					},
				},
			}
			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: []*datapoint.Datapoint{
			doubleSFxDataPoint(
				"gauge_double_with_dims",
				datapoint.Gauge,
				mergeStringMaps(map[string]string{
					"k_n0": "v_n0",
					"k_n1": "v_n1",
					"k_r0": "v_r0",
					"k_r1": "v_r1",
				}, labelMap),
				doubleVal),
			int64SFxDataPoint(
				"gauge_int_with_dims",
				datapoint.Gauge,
				mergeStringMaps(map[string]string{
					"k_n0": "v_n0",
					"k_n1": "v_n1",
					"k_r0": "v_r0",
					"k_r1": "v_r1",
				}, labelMap),
				int64Val),
		},
	},
	{
		// Histograms: cumulative vs. delta temporality, plus the
		// "bad counts" variants whose expected output is only the
		// _count and _sum points (no _bucket points).
		name: "histograms",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()

			*metrics = []*metricsv1.Metric{
				{
					Name: "int_histo",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeIntHistDP(),
							},
						},
					},
				},
				{
					Name: "int_delta_histo",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeIntHistDP(),
							},
						},
					},
				},
				{
					Name: "double_histo",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeDoubleHistDP(),
							},
						},
					},
				},
				{
					Name: "double_delta_histo",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							AggregationTemporality: metricsv1.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA,
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeDoubleHistDP(),
							},
						},
					},
				},
				{
					Name: "double_histo_bad_counts",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeDoubleHistDPBadCounts(),
							},
						},
					},
				},
				{
					Name: "int_histo_bad_counts",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeIntHistDPBadCounts(),
							},
						},
					},
				},
			}
			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: mergeDPs(
			expectedFromHistogram("int_histo", labelMap, intHistDP, false),
			expectedFromHistogram("int_delta_histo", labelMap, intHistDP, true),
			expectedFromHistogram("double_histo", labelMap, doubleHistDP, false),
			expectedFromHistogram("double_delta_histo", labelMap, doubleHistDP, true),
			[]*datapoint.Datapoint{
				int64SFxDataPoint("double_histo_bad_counts_count", datapoint.Counter, labelMap, int64(doubleHistDP.Count)),
				doubleSFxDataPoint("double_histo_bad_counts_sum", datapoint.Counter, labelMap, *doubleHistDP.Sum),
			},
			[]*datapoint.Datapoint{
				int64SFxDataPoint("int_histo_bad_counts_count", datapoint.Counter, labelMap, int64(intHistDP.Count)),
				doubleSFxDataPoint("int_histo_bad_counts_sum", datapoint.Counter, labelMap, *intHistDP.Sum),
			},
		),
	},
	{
		// A histogram datapoint with no buckets still produces the
		// expected _count/_sum points (see expectedFromHistogram).
		name: "distribution_no_buckets",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()

			*metrics = []*metricsv1.Metric{
				{
					Name: "no_bucket_histo",
					Data: &metricsv1.Metric_Histogram{
						Histogram: &metricsv1.Histogram{
							DataPoints: []*metricsv1.HistogramDataPoint{
								makeHistDPNoBuckets(),
							},
						},
					},
				},
			}
			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: expectedFromHistogram("no_bucket_histo", labelMap, histDPNoBuckets, false),
	},
	{
		name: "summaries",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()

			*metrics = []*metricsv1.Metric{
				{
					Name: "summary",
					Data: &metricsv1.Metric_Summary{
						Summary: &metricsv1.Summary{
							DataPoints: []*metricsv1.SummaryDataPoint{
								makeSummaryDP(),
							},
						},
					},
				},
			}
			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: expectedFromSummary("summary", labelMap, summaryCountVal, summarySumVal),
	},
	{
		// A summary with no quantile values yields only _count and _sum.
		name: "empty_summary",
		metricsFn: func() []*metricsv1.ResourceMetrics {
			out, metrics := createRMS()

			*metrics = []*metricsv1.Metric{
				{
					Name: "empty_summary",
					Data: &metricsv1.Metric_Summary{
						Summary: &metricsv1.Summary{
							DataPoints: []*metricsv1.SummaryDataPoint{
								makeEmptySummaryDP(),
							},
						},
					},
				},
			}
			return []*metricsv1.ResourceMetrics{out}
		},
		wantSfxDataPoints: expectedFromEmptySummary("empty_summary", labelMap, summaryCountVal, summarySumVal),
	},
}
for _, tt := range tests {
	Convey(tt.name, t, func() {
		rms := tt.metricsFn()
		// Whole-request conversion must match the expected set exactly.
		gotSfxDataPoints := FromOTLPMetricRequest(&metricsservicev1.ExportMetricsServiceRequest{ResourceMetrics: rms})
		So(tt.wantSfxDataPoints, ShouldResemble, gotSfxDataPoints)

		// Single-metric conversion of the first metric must also yield
		// at least one datapoint.
		firstMetric := rms[0].GetScopeMetrics()[0].Metrics[0]
		dpsFromMetric := FromMetric(firstMetric)
		So(dpsFromMetric, ShouldNotBeEmpty)
	})
}
}

// TestMetricTypeDerive verifies that deriveSignalFxMetricType panics when
// given a Metric whose Data payload is unset.
func TestMetricTypeDerive(t *testing.T) {
	Convey("deriveSignalFxMetricType panics if invalid metric", t, func() {
		So(func() { deriveSignalFxMetricType(&metricsv1.Metric{}) }, ShouldPanic)
	})
}

// TestAttributesToDimensions exercises stringifyAttributes/ToDatapoint for
// every OTLP AnyValue variant used below (string, bool, int, double, array,
// kvlist, bytes) plus nil values, and checks that non-string attributes
// handed directly to ToDatapoint cause a panic.
func TestAttributesToDimensions(t *testing.T) {
	Convey("stringifyAttributes", t, func() {
		attrs := []*commonv1.KeyValue{
			{
				Key: "a",
				Value: &commonv1.AnyValue{Value:
&commonv1.AnyValue_StringValue{StringValue: "s"}},
			},
			{
				Key:   "b",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: ""}},
			},
			{
				Key:   "c",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: true}},
			},
			{
				Key:   "d",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 44}},
			},
			{
				Key:   "e",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 45.1}},
			},
			{
				// Array value — expected to stringify as a JSON array.
				Key: "f",
				Value: &commonv1.AnyValue{
					Value: &commonv1.AnyValue_ArrayValue{
						ArrayValue: &commonv1.ArrayValue{
							Values: []*commonv1.AnyValue{
								{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}},
								{Value: &commonv1.AnyValue_StringValue{StringValue: "n2"}},
							},
						},
					},
				},
			},
			{
				// Kvlist value — expected to stringify as a JSON object.
				Key: "g",
				Value: &commonv1.AnyValue{
					Value: &commonv1.AnyValue_KvlistValue{
						KvlistValue: &commonv1.KeyValueList{
							Values: []*commonv1.KeyValue{
								{Key: "k1", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: "n1"}}},
								{Key: "k2", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: false}}},
								{Key: "k3", Value: nil},
								{Key: "k4", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 40.3}}},
								{Key: "k5", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_IntValue{IntValue: 41}}},
								{Key: "k6", Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BytesValue{BytesValue: []byte("n2")}}},
							},
						},
					},
				},
			},
			{
				Key:   "h",
				Value: nil,
			},
			{
				Key:   "i",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_DoubleValue{DoubleValue: 0}},
			},
			{
				Key:   "j",
				Value: &commonv1.AnyValue{Value: nil},
			},
			{
				// Bytes value — expected to stringify base64-encoded.
				Key:   "k",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BytesValue{BytesValue: []byte("to boldly go")}},
			},
		}

		dimKVs := stringifyAttributes(attrs)
		var m SignalFxMetric
		m.DP.Attributes = dimKVs
		So(m.ToDatapoint().Dimensions, ShouldResemble, map[string]string{
			"a": "s",
			"c": "true",
			"d": "44",
			"e": "45.1",
			"f": `["n1","n2"]`,
			"g": `{"k1":"n1","k2":false,"k3":null,"k4":40.3,"k5":41,"k6":"bjI="}`,
			"i": "0",
			// No entry for "j" because it's nil and would be skipped by ToDatapoint()
			"k": "dG8gYm9sZGx5IGdv",
		})
	})

	Convey("Non-string attributes in SignalFxMetric panic", t, func() {
		attrs := []*commonv1.KeyValue{
			{
				Key:   "a",
				Value: nil,
			},
			{
				Key:   "c",
				Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_BoolValue{BoolValue: true}},
			},
		}
		var m SignalFxMetric
		m.DP.Attributes = attrs
		So(func() { m.ToDatapoint() }, ShouldPanic)
	})
}

// doubleSFxDataPoint builds the expected SignalFx datapoint for a
// float-valued metric with the given type and dimensions. The dimension
// map is cloned so test cases cannot share mutable state.
func doubleSFxDataPoint(
	metric string,
	metricType datapoint.MetricType,
	dims map[string]string,
	val float64,
) *datapoint.Datapoint {
	return &datapoint.Datapoint{
		Metric:     metric,
		Timestamp:  ts,
		Value:      datapoint.NewFloatValue(val),
		MetricType: metricType,
		Dimensions: cloneStringMap(dims),
	}
}

// int64SFxDataPoint builds the expected SignalFx datapoint for an
// integer-valued metric with the given type and dimensions.
func int64SFxDataPoint(
	metric string,
	metricType datapoint.MetricType,
	dims map[string]string,
	val int64,
) *datapoint.Datapoint {
	return &datapoint.Datapoint{
		Metric:     metric,
		Timestamp:  ts,
		Value:      datapoint.NewIntValue(val),
		MetricType: metricType,
		Dimensions: cloneStringMap(dims),
	}
}

// expectedFromHistogram builds the expected datapoints for one histogram
// metric: a _count and a _sum point, then (when explicit bounds exist) one
// cumulative _bucket point per bound plus a final +Inf bucket. Delta
// histograms use the Count type; cumulative ones use Counter.
func expectedFromHistogram(
	metricName string,
	dims map[string]string,
	histDP *metricsv1.HistogramDataPoint,
	isDelta bool,
) []*datapoint.Datapoint {
	buckets := histDP.GetBucketCounts()

	dps := make([]*datapoint.Datapoint, 0)

	typ := datapoint.Counter
	if isDelta {
		typ = datapoint.Count
	}

	dps = append(dps,
		int64SFxDataPoint(metricName+"_count", typ, dims, int64(histDP.GetCount())),
		doubleSFxDataPoint(metricName+"_sum", typ, dims, histDP.GetSum()))

	explicitBounds := histDP.GetExplicitBounds()
	if explicitBounds == nil {
		// No buckets: only _count and _sum are expected.
		return dps
	}
	// Bucket points use "le" (cumulative) semantics, so keep a running sum.
	var le int64
	for i := 0; i < len(explicitBounds); i++ {
		dimsCopy := cloneStringMap(dims)
		dimsCopy[upperBoundDimensionKey] = float64ToDimValue(explicitBounds[i])
		le +=
int64(buckets[i])
		dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, le))
	}
	// Final implicit +Inf bucket closes out the cumulative counts.
	dimsCopy := cloneStringMap(dims)
	dimsCopy[upperBoundDimensionKey] = float64ToDimValue(math.Inf(1))
	le += int64(buckets[len(buckets)-1])
	dps = append(dps, int64SFxDataPoint(metricName+"_bucket", typ, dimsCopy, le))
	return dps
}

// expectedFromSummary builds the expected datapoints for a summary metric:
// _count and _sum counters, plus one _quantile gauge per quantile
// (0.25/0.5/0.75/1) whose value is the quantile's index — this mirrors the
// values produced by the makeSummaryDP fixture.
func expectedFromSummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint {
	countPt := int64SFxDataPoint(name+"_count", datapoint.Counter, labelMap, count)
	sumPt := doubleSFxDataPoint(name+"_sum", datapoint.Counter, labelMap, sumVal)
	out := []*datapoint.Datapoint{countPt, sumPt}
	quantileDimVals := []string{"0.25", "0.5", "0.75", "1"}
	for i := 0; i < 4; i++ {
		qDims := map[string]string{"quantile": quantileDimVals[i]}
		qPt := doubleSFxDataPoint(
			name+"_quantile",
			datapoint.Gauge,
			mergeStringMaps(labelMap, qDims),
			float64(i),
		)
		out = append(out, qPt)
	}
	return out
}

// expectedFromEmptySummary is like expectedFromSummary but for a summary
// with no quantile values: only the _count and _sum points are expected.
func expectedFromEmptySummary(name string, labelMap map[string]string, count int64, sumVal float64) []*datapoint.Datapoint {
	countPt := int64SFxDataPoint(name+"_count", datapoint.Counter, labelMap, count)
	sumPt := doubleSFxDataPoint(name+"_sum", datapoint.Counter, labelMap, sumVal)
	return []*datapoint.Datapoint{countPt, sumPt}
}

// mergeDPs concatenates several datapoint slices, preserving order.
func mergeDPs(dps ...[]*datapoint.Datapoint) []*datapoint.Datapoint {
	var out []*datapoint.Datapoint
	for i := range dps {
		out = append(out, dps[i]...)
	}
	return out
}

// cloneStringMap returns a shallow copy of m so expected datapoints never
// share (and cannot accidentally mutate) a common dimensions map.
func cloneStringMap(m map[string]string) map[string]string {
	out := make(map[string]string)
	for k, v := range m {
		out[k] = v
	}
	return out
}

// stringMapToAttributeMap converts a plain string map into OTLP string
// KeyValue attributes. Output order is unspecified (map iteration).
func stringMapToAttributeMap(m map[string]string) []*commonv1.KeyValue {
	ret := make([]*commonv1.KeyValue, 0, len(m))
	for k, v := range m {
		ret = append(ret, &commonv1.KeyValue{
			Key:   k,
			Value: &commonv1.AnyValue{Value: &commonv1.AnyValue_StringValue{StringValue: v}},
		})
	}
	return ret
}

// float64Pointer returns a pointer to f, for optional proto fields.
func float64Pointer(f float64) *float64 { return &f }