-
Notifications
You must be signed in to change notification settings - Fork 73
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Removed the prometheus/prometheus go library
It has been replaced with a lighter version based on buf.build
- Loading branch information
Showing
11 changed files
with
439 additions
and
2,158 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package remote | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"time" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
"github.com/golang/snappy" | ||
prompb "go.buf.build/grpc/go/prometheus/prometheus" | ||
) | ||
|
||
type HTTPConfig struct { | ||
Timeout time.Duration | ||
TLSConfig *tls.Config | ||
BasicAuth *BasicAuth | ||
Headers http.Header | ||
} | ||
|
||
type BasicAuth struct { | ||
User, Password string | ||
} | ||
|
||
// WriteClient is a client implementation of the Prometheus remote write protocol. | ||
// It follows the specs defined by the official design document: | ||
// https://docs.google.com/document/d/1LPhVRSFkGNSuU1fBd81ulhsCPR4hkSZyyBj1SZ8fWOM | ||
type WriteClient struct { | ||
hc *http.Client | ||
url *url.URL | ||
cfg *HTTPConfig | ||
} | ||
|
||
func NewWriteClient(endpoint string, cfg *HTTPConfig) (*WriteClient, error) { | ||
if cfg == nil { | ||
cfg = &HTTPConfig{} | ||
} | ||
u, err := url.Parse(endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
wc := &WriteClient{ | ||
hc: &http.Client{ | ||
Timeout: cfg.Timeout, | ||
}, | ||
url: u, | ||
cfg: cfg, | ||
} | ||
if cfg.TLSConfig != nil { | ||
wc.hc.Transport = &http.Transport{ | ||
TLSClientConfig: cfg.TLSConfig, | ||
} | ||
} | ||
return wc, nil | ||
} | ||
|
||
// Store sends a batch of samples to the HTTP endpoint, | ||
// the request is the proto marshaled and encoded. | ||
func (c *WriteClient) Store(ctx context.Context, series []*prompb.TimeSeries) error { | ||
b, err := c.requestBody(series) | ||
if err != nil { | ||
return err | ||
} | ||
req, err := http.NewRequestWithContext( | ||
ctx, http.MethodPost, c.url.String(), bytes.NewReader(b)) | ||
if err != nil { | ||
return fmt.Errorf("create new HTTP request failed: %w", err) | ||
} | ||
if c.cfg.BasicAuth != nil { | ||
req.SetBasicAuth(c.cfg.BasicAuth.User, c.cfg.BasicAuth.Password) | ||
} | ||
|
||
req.Header.Set("User-Agent", "k6-prometheus-rw-output") | ||
|
||
// They are mostly defined by the specs | ||
req.Header.Add("Content-Encoding", "snappy") | ||
req.Header.Set("Content-Type", "application/x-protobuf") | ||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") | ||
|
||
resp, err := c.hc.Do(req) | ||
if err != nil { | ||
return fmt.Errorf("HTTP POST request failed: %w", err) | ||
} | ||
defer resp.Body.Close() | ||
io.Copy(io.Discard, resp.Body) //nolint:errcheck | ||
|
||
if resp.StatusCode != http.StatusNoContent { | ||
return fmt.Errorf("got status code: %s instead expected: 204 No Content", resp.Status) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *WriteClient) requestBody(series []*prompb.TimeSeries) ([]byte, error) { | ||
b, err := proto.Marshal(&prompb.WriteRequest{ | ||
Timeseries: series, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("encoding series as protobuf write request failed: %w", err) | ||
} | ||
if snappy.MaxEncodedLen(len(b)) < 0 { | ||
return nil, fmt.Errorf("the protobuf message is too large to be handled by Snappy encoder; "+ | ||
"size: %d, limit: %d", len(b), 0xffffffff) | ||
} | ||
return snappy.Encode(nil, b), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package remote | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"net/http" | ||
"net/http/httptest" | ||
"net/url" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
prompb "go.buf.build/grpc/go/prometheus/prometheus" | ||
) | ||
|
||
func TestNewWrtiteClient(t *testing.T) { | ||
t.Parallel() | ||
t.Run("DefaultConfig", func(t *testing.T) { | ||
t.Parallel() | ||
wc, err := NewWriteClient("http://example.com/api/v1/write", nil) | ||
require.NoError(t, err) | ||
require.NotNil(t, wc) | ||
assert.Equal(t, wc.cfg, &HTTPConfig{}) | ||
}) | ||
|
||
t.Run("CustomConfig", func(t *testing.T) { | ||
t.Parallel() | ||
hc := &HTTPConfig{Timeout: time.Second} | ||
wc, err := NewWriteClient("http://example.com/api/v1/write", hc) | ||
require.NoError(t, err) | ||
require.NotNil(t, wc) | ||
assert.Equal(t, wc.cfg, hc) | ||
}) | ||
|
||
t.Run("InvalidURL", func(t *testing.T) { | ||
t.Parallel() | ||
wc, err := NewWriteClient("fake://bad url", nil) | ||
require.Error(t, err) | ||
assert.Nil(t, wc) | ||
}) | ||
} | ||
|
||
func TestClientStore(t *testing.T) { | ||
t.Parallel() | ||
h := func(rw http.ResponseWriter, r *http.Request) { | ||
assert.Equal(t, r.Header.Get("Content-Encoding"), "snappy") | ||
assert.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") | ||
assert.Equal(t, r.Header.Get("User-Agent"), "k6-prometheus-rw-output") | ||
assert.Equal(t, r.Header.Get("X-Prometheus-Remote-Write-Version"), "0.1.0") | ||
assert.NotEmpty(t, r.Header.Get("Content-Length")) | ||
|
||
b, err := io.ReadAll(r.Body) | ||
assert.NoError(t, err) | ||
assert.NotEmpty(t, len(b)) | ||
|
||
rw.WriteHeader(http.StatusNoContent) | ||
} | ||
ts := httptest.NewServer(http.HandlerFunc(h)) | ||
defer ts.Close() | ||
|
||
u, err := url.Parse(ts.URL) | ||
require.NoError(t, err) | ||
|
||
c := &WriteClient{ | ||
hc: ts.Client(), | ||
url: u, | ||
cfg: &HTTPConfig{}, | ||
} | ||
data := &prompb.TimeSeries{ | ||
Labels: []*prompb.Label{ | ||
{ | ||
Name: "label1", | ||
Value: "label1-val", | ||
}, | ||
}, | ||
Samples: []*prompb.Sample{ | ||
{ | ||
Value: 8.5, | ||
Timestamp: time.Now().UnixMilli(), | ||
}, | ||
}, | ||
} | ||
err = c.Store(context.Background(), []*prompb.TimeSeries{data}) | ||
assert.NoError(t, err) | ||
} | ||
|
||
func TestClientStoreHTTPError(t *testing.T) { | ||
t.Parallel() | ||
h := func(w http.ResponseWriter, r *http.Request) { | ||
http.Error(w, "bad bad", http.StatusUnauthorized) | ||
} | ||
ts := httptest.NewServer(http.HandlerFunc(h)) | ||
defer ts.Close() | ||
|
||
u, err := url.Parse(ts.URL) | ||
require.NoError(t, err) | ||
|
||
c := &WriteClient{ | ||
hc: ts.Client(), | ||
url: u, | ||
cfg: &HTTPConfig{}, | ||
} | ||
assert.Error(t, c.Store(context.Background(), nil)) | ||
} | ||
|
||
func TestClientStoreHTTPBasic(t *testing.T) { | ||
t.Parallel() | ||
h := func(w http.ResponseWriter, r *http.Request) { | ||
u, pwd, ok := r.BasicAuth() | ||
require.True(t, ok) | ||
assert.Equal(t, "usertest", u) | ||
assert.Equal(t, "pwdtest", pwd) | ||
} | ||
ts := httptest.NewServer(http.HandlerFunc(h)) | ||
defer ts.Close() | ||
|
||
u, err := url.Parse(ts.URL) | ||
require.NoError(t, err) | ||
|
||
c := &WriteClient{ | ||
hc: ts.Client(), | ||
url: u, | ||
cfg: &HTTPConfig{ | ||
BasicAuth: &BasicAuth{ | ||
User: "usertest", | ||
Password: "pwdtest", | ||
}, | ||
}, | ||
} | ||
assert.Error(t, c.Store(context.Background(), nil)) | ||
} |
Oops, something went wrong.