Skip to content

Commit

Permalink
Handwritten elasticsearch router, support gzipped body (#486)
Browse files Browse the repository at this point in the history
* Handwritten elasticsearch router, support gzipped body

* Rename tls package -> xtls
  • Loading branch information
vadimalekseev authored Aug 25, 2023
1 parent 82333a1 commit e34e03c
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 80 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/jackc/pgconn v1.14.1
github.com/jackc/pgproto3/v2 v2.3.2
github.com/jackc/pgx/v4 v4.18.1
github.com/klauspost/compress v1.16.7
github.com/minio/minio-go v6.0.14+incompatible
github.com/prometheus/client_golang v1.11.1
github.com/rjeczalik/notify v0.9.3
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.4 // indirect
Expand Down
5 changes: 3 additions & 2 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ const (
)

var (
SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s
SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s
SecondsBucketsDetailedNano = prometheus.ExponentialBuckets(0.000005, 2, 19) // covers range from 5ns to 1.3ms
SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s
SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s
)

type Ctl struct {
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (p *Pipeline) registerMetrics() {
p.wrongEventCRIFormatMetric = m.RegisterCounter("wrong_event_cri_format", "Wrong event CRI format counter")
p.maxEventSizeExceededMetric = m.RegisterCounter("max_event_size_exceeded", "Max event size exceeded counter")
p.eventPoolLatency = m.RegisterHistogram("event_pool_latency_seconds",
"How long we are wait an event from the pool", metric.SecondsBucketsDetailed).
"How long we are wait an event from the pool", metric.SecondsBucketsDetailedNano).
WithLabelValues()
}

Expand Down
1 change: 1 addition & 0 deletions plugin/action/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ var (
loggerByPipelineMu sync.Mutex
)

// return shared logger between concurrent running processors
func (p *Plugin) setupLogger(pipelineName string, parentLogger *zap.Logger, config *Config) {
loggerByPipelineMu.Lock()
defer loggerByPipelineMu.Unlock()
Expand Down
42 changes: 16 additions & 26 deletions plugin/input/http/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package http

import (
"net/http"

"go.uber.org/zap"
)

var info = []byte(`{
Expand Down Expand Up @@ -89,6 +87,15 @@ var xpack = []byte(`{
"tagline": "You know, for nothing"
}`)

var license = []byte(`{
"license": {
"mode": "basic",
"status": "active",
"type": "basic",
"uid": "e76d6ce9-f78c-44ff-8fd5-b5877357d649"
}
}`)

var result = []byte(`{
"took": 30,
"errors": false,
Expand All @@ -97,35 +104,18 @@ var result = []byte(`{

var empty = []byte(`{}`)

func (p *Plugin) elasticsearch(mux *http.ServeMux) {
mux.HandleFunc("/", p.serveElasticsearchInfo)
mux.HandleFunc("/_xpack", p.serveElasticsearchXPack)
mux.HandleFunc("/_bulk", p.serve)
mux.HandleFunc("/_template/", p.serveElasticsearchTemplate)
}

func (p *Plugin) serveElasticsearchXPack(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write(xpack)
if err != nil {
p.logger.Error("can't write response", zap.Error(err))
}
}

func (p *Plugin) serveElasticsearchTemplate(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write(empty)
if err != nil {
p.logger.Error("can't write response", zap.Error(err))
}
_, _ = w.Write(xpack)
}

func (p *Plugin) serveElasticsearchInfo(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && r.RequestURI == "/" {
_, err := w.Write(info)
if err != nil {
p.logger.Error("can't write response", zap.Error(err))
}
if r.Method == http.MethodGet {
_, _ = w.Write(info)
return
}
// otherwise return an empty response
}

p.logger.Error("unknown request", zap.String("uri", r.RequestURI), zap.String("method", r.Method))
func (p *Plugin) serveElasticsearchLicense(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write(license)
}
76 changes: 76 additions & 0 deletions plugin/input/http/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package http

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
)

func TestElasticsearchResponse(t *testing.T) {
t.Parallel()
r := require.New(t)

p := Plugin{}

tcs := []struct {
Name string
Handler http.HandlerFunc
Request *http.Request
ExpectedBody []byte
ExpectedCode int
}{
{
Name: "info",
Handler: p.serveElasticsearchInfo,
Request: httptest.NewRequest(http.MethodGet, "/", http.NoBody),
ExpectedBody: info,
ExpectedCode: http.StatusOK,
},
{
Name: "info head",
Handler: p.serveElasticsearchInfo,
Request: httptest.NewRequest(http.MethodHead, "/", http.NoBody),
ExpectedBody: nil,
ExpectedCode: http.StatusOK,
},
{
Name: "xpack",
Handler: p.serveElasticsearchXPack,
Request: httptest.NewRequest(http.MethodGet, "/", http.NoBody),
ExpectedBody: xpack,
ExpectedCode: http.StatusOK,
},
{
Name: "license",
Handler: p.serveElasticsearchLicense,
Request: httptest.NewRequest(http.MethodGet, "/", http.NoBody),
ExpectedBody: license,
ExpectedCode: http.StatusOK,
},
}

for _, tc := range tcs {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()

rec := httptest.NewRecorder()

tc.Handler.ServeHTTP(rec, tc.Request)

r.Equal(http.StatusOK, rec.Code)

body := rec.Body.Bytes()
r.Equal(tc.ExpectedBody, body)

// check body is valid json
if len(body) > 0 {
m := map[string]any{}
r.NoError(json.Unmarshal(rec.Body.Bytes(), &m))
}
})
}
}
Loading

0 comments on commit e34e03c

Please sign in to comment.