Skip to content

Commit

Permalink
ozontech#487 request size metric added
Browse files Browse the repository at this point in the history
  • Loading branch information
romanchechyotkin committed Jul 23, 2024
1 parent 77f96e2 commit df0ad37
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
1 change: 1 addition & 0 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
SecondsBucketsDetailedNano = prometheus.ExponentialBuckets(0.000005, 2, 19) // covers range from 5ns to 1.3ms
SecondsBucketsDetailed = prometheus.ExponentialBuckets(0.0005, 2, 16) // covers range from 500us to 16.384s
SecondsBucketsLong = prometheus.ExponentialBuckets(0.005, 2, 16) // covers range from 5ms to 163.84s
RequestsSize = prometheus.ExponentialBuckets(100, 10, 5) // covers range from 100B to 1MB
)

type Ctl struct {
Expand Down
31 changes: 30 additions & 1 deletion plugin/input/http/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"bytes"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Plugin struct {
bulkRequestsDoneTotal prometheus.Counter
requestsInProgress prometheus.Gauge
processBulkSeconds prometheus.Observer
requestsSize prometheus.Observer

metaTemplater *metadata.MetaTemplater
}
Expand Down Expand Up @@ -254,10 +256,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
}

func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "")
p.bulkRequestsDoneTotal = ctl.RegisterCounter("bulk_requests_done_total", "Total http bulk requests done")
p.requestsInProgress = ctl.RegisterGauge("requests_in_progress", "")
p.processBulkSeconds = ctl.RegisterHistogram("process_bulk_seconds", "", metric.SecondsBucketsDetailed)
p.errorsTotal = ctl.RegisterCounter("input_http_errors", "Total http errors")
p.requestsSize = ctl.RegisterHistogram("requests_size", "", metric.RequestsSize)

if p.config.Auth.Strategy_ != StrategyDisabled {
httpAuthTotal := ctl.RegisterCounterVec("http_auth_success_total", "", "secret_name")
Expand Down Expand Up @@ -425,6 +428,8 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata.MetaData) {
var err error

if r.Method != http.MethodPost {
http.Error(w, "", http.StatusMethodNotAllowed)
return
Expand All @@ -433,6 +438,14 @@ func (p *Plugin) serveBulk(w http.ResponseWriter, r *http.Request, meta metadata
start := time.Now()
p.requestsInProgress.Inc()

r.Body, err = p.calculateRequestSize(r.Body)
if err != nil {
p.errorsTotal.Inc()
p.logger.Error("can't read body", zap.Error(err))
http.Error(w, "can't read body", http.StatusBadRequest)
return
}

reader := io.Reader(r.Body)
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := p.acquireGzipReader(reader)
Expand Down Expand Up @@ -592,6 +605,22 @@ func (p *Plugin) putGzipReader(reader *gzip.Reader) {
p.gzipReaderPool.Put(reader)
}

func (p *Plugin) calculateRequestSize(reqBody io.Reader) (io.ReadCloser, error) {
bodyBytes, err := io.ReadAll(reqBody)
if err != nil {
return nil, err
}

bodySize := len(bodyBytes)
p.requestsSize.Observe(float64(bodySize))

return io.NopCloser(bytes.NewBuffer(bodyBytes)), nil
}

func resetRequestBody() {

}

func getUserIP(r *http.Request) net.IP {
var userIP string
switch {
Expand Down

0 comments on commit df0ad37

Please sign in to comment.