Skip to content

Commit

Permalink
revert otel decoder fe53d28 (#94)
Browse files Browse the repository at this point in the history
Co-authored-by: jgheewala <[email protected]>
  • Loading branch information
jgheewala and jgheewala authored Apr 27, 2023
1 parent 112e21c commit 7c02344
Show file tree
Hide file tree
Showing 6 changed files with 1,441 additions and 1 deletion.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ require (
github.com/smartystreets/assertions v1.13.0
github.com/smartystreets/goconvey v1.7.2
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/proto/otlp v0.19.0
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
)

require (
Expand All @@ -30,6 +32,7 @@ require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -43,6 +46,5 @@ require (
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -566,6 +568,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BMKEyvcZ5+IM0AwDrnlkEc=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 h1:/sDbPb60SusIXjiJGYLUoS/rAQurQmvGWmwn2bBPM9c=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1/go.mod h1:G+WkljZi4mflcqVxYSgvt8MNctRQHjEH8ubKtt1Ka3w=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
Expand Down Expand Up @@ -1228,6 +1233,8 @@ go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48
go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4=
go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
53 changes: 53 additions & 0 deletions protocol/otlp/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package otlp

import (
"bytes"
"context"
"net/http"
"sync"

"github.com/signalfx/golib/v3/datapoint/dpsink"
"github.com/signalfx/golib/v3/log"
"github.com/signalfx/ingest-protocols/logkey"
"github.com/signalfx/ingest-protocols/protocol"
"github.com/signalfx/ingest-protocols/protocol/signalfx"
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
"google.golang.org/protobuf/proto"
)

type httpMetricDecoder struct {
sink dpsink.Sink
logger log.Logger
buffs sync.Pool
}

// NewHTTPMetricDecoder decodes OTLP metrics and puts them onto the provided sink.
func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader {
return &httpMetricDecoder{
sink: sink,
logger: log.NewContext(logger).With(logkey.Protocol, "otlp"),
buffs: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}

func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) {
jeff := d.buffs.Get().(*bytes.Buffer)
defer d.buffs.Put(jeff)
jeff.Reset()
if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil {
return err
}
var msg metricsservicev1.ExportMetricsServiceRequest
if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil {
return err
}
dps := FromOTLPMetricRequest(&msg)
if len(dps) > 0 {
err = d.sink.AddDatapoints(ctx, dps)
}
return err
}
99 changes: 99 additions & 0 deletions protocol/otlp/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package otlp

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"sync"
"testing"

"github.com/signalfx/golib/v3/datapoint/dptest"
"github.com/signalfx/golib/v3/log"
. "github.com/smartystreets/goconvey/convey"
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
)

var errRead = errors.New("could not read")

type errorReader struct{}

func (errorReader *errorReader) Read([]byte) (int, error) {
return 0, errRead
}

func TestDecoder(t *testing.T) {
Convey("httpMetricDecoder", t, func() {
sendTo := dptest.NewBasicSink()
decoder := NewHTTPMetricDecoder(sendTo, log.Discard)

Convey("Bad request reading", func() {
req := &http.Request{
Body: io.NopCloser(&errorReader{}),
}
req.ContentLength = 1
ctx := context.Background()
So(decoder.Read(ctx, req), ShouldEqual, errRead)
})

Convey("Bad request content", func() {
req := &http.Request{
Body: io.NopCloser(bytes.NewBufferString("asdf")),
}
req.ContentLength = 4
ctx := context.Background()
So(decoder.Read(ctx, req), ShouldNotBeNil)
})

Convey("Good request", func(c C) {
var msg metricsservicev1.ExportMetricsServiceRequest
msg.ResourceMetrics = []*metricsv1.ResourceMetrics{
{
ScopeMetrics: []*metricsv1.ScopeMetrics{
{
Metrics: []*metricsv1.Metric{
{
Name: "test",
Data: &metricsv1.Metric_Gauge{
Gauge: &metricsv1.Gauge{
DataPoints: []*metricsv1.NumberDataPoint{
{
Attributes: []*commonv1.KeyValue{},
StartTimeUnixNano: 1000,
TimeUnixNano: 1000,
Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4},
},
},
},
},
},
},
},
},
},
}
b, _ := proto.Marshal(&msg)
req := &http.Request{
Body: io.NopCloser(bytes.NewBuffer(b)),
}
req.ContentLength = int64(len(b))
ctx := context.Background()

var wg sync.WaitGroup
wg.Add(1)
go func() {
dp := <-sendTo.PointsChan
c.So(dp, ShouldNotBeNil)
wg.Done()
}()

So(decoder.Read(ctx, req), ShouldBeNil)

wg.Wait()
})
})
}
Loading

0 comments on commit 7c02344

Please sign in to comment.