Skip to content

Commit

Permalink
feat(metrics): support https
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Nov 1, 2023
1 parent 4de0afe commit 0c8e712
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 66 deletions.
17 changes: 8 additions & 9 deletions client/client_4.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import (
var _ client = &cluster4x{}

type cluster4x struct {
username string
password string
version string
client *fasthttp.Client
version string
client *fasthttp.Client
uri *fasthttp.URI
}

func (n *cluster4x) getVersion() string {
Expand All @@ -33,7 +32,7 @@ func (n *cluster4x) getLicense() (lic *collector.LicenseInfo, err error) {
}
Code int
}{}
data, statusCode, err := callHTTPGet(n.client, "/api/v4/license", n.username, n.password)
data, statusCode, err := callHTTPGet(n.client, n.uri, "/api/v4/license")
if statusCode == http.StatusNotFound {
// open source version doesn't support license api
err = nil
Expand Down Expand Up @@ -82,7 +81,7 @@ func (n *cluster4x) getClusterStatus() (cluster collector.ClusterStatus, err err
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/nodes", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/nodes", &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -123,7 +122,7 @@ func (n *cluster4x) getBrokerMetrics() (metrics *collector.Broker, err error) {
}
Code int
}{}
data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics", n.username, n.password)
data, statusCode, err := callHTTPGet(n.client, n.uri, "/api/v4/monitor/current_metrics")
if statusCode == http.StatusNotFound {
// open source version doesn't support this api
err = nil
Expand Down Expand Up @@ -178,7 +177,7 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/rules?_limit=10000", &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -236,7 +235,7 @@ func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error)
}
Code int
}{}
err = callHTTPGetWithResp(n.client, "/api/v4/resources", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/resources", &resp)
if err != nil {
return
}
Expand Down
31 changes: 15 additions & 16 deletions client/client_5.x.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (
var _ client = &cluster5x{}

type cluster5x struct {
username string
password string
version string
edition edition
client *fasthttp.Client
version string
edition edition
client *fasthttp.Client
uri *fasthttp.URI
}

func (n *cluster5x) getVersion() string {
Expand All @@ -32,7 +31,7 @@ func (n *cluster5x) getLicense() (lic *collector.LicenseInfo, err error) {
MaxConnections int64 `json:"max_connections"`
ExpiryAt string `json:"expiry_at"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/license", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/license", &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -63,7 +62,7 @@ func (n *cluster5x) getClusterStatus() (cluster collector.ClusterStatus, err err
Load5 any `json:"load5"`
Load15 any `json:"load15"`
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/nodes", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/nodes", &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -109,7 +108,7 @@ func (n *cluster5x) getBrokerMetrics() (metrics *collector.Broker, err error) {
SentMsgRate int64 `json:"sent_msg_rate"`
ReceivedMsgRate int64 `json:"received_msg_rate"`
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/monitor_current", &resp)
if err != nil {
return
}
Expand All @@ -129,7 +128,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/rules?limit=10000", &resp)
if err != nil {
return
}
Expand Down Expand Up @@ -157,7 +156,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err
}
} `json:"node_metrics"`
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), n.username, n.password, &metricsResp)
err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), &metricsResp)
if err != nil {
return
}
Expand Down Expand Up @@ -190,7 +189,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Type string
Status string
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/bridges", n.username, n.password, &bridgesResp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/bridges", &bridgesResp)
if err != nil {
return
}
Expand All @@ -214,7 +213,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error)
Dropped int64
}
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), n.username, n.password, &metricsResp)
err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), &metricsResp)
if err != nil {
return
}
Expand All @@ -233,7 +232,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
Backend string
Enable bool
}{{}}
err = callHTTPGetWithResp(n.client, "/api/v5/authentication", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/authentication", &resp)
if err != nil {
return
}
Expand All @@ -257,7 +256,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), n.username, n.password, &status)
err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), &status)
if err != nil {
return
}
Expand Down Expand Up @@ -296,7 +295,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
Enable bool
}
}{}
err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", n.username, n.password, &resp)
err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/authorization/sources", &resp)
if err != nil {
return
}
Expand All @@ -320,7 +319,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc
} `json:"node_metrics"`
Status string
}{}
err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), n.username, n.password, &status)
err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), &status)
if err != nil {
return
}
Expand Down
13 changes: 6 additions & 7 deletions client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster {
c := &cluster{}

go func() {
httpClient := getHTTPClient(metrics.Target)
httpClient := getHTTPClient(metrics)
uri := getURI(metrics)
for {
client4 := &cluster4x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
client: httpClient,
uri: uri,
}
if _, err := client4.getClusterStatus(); err == nil {
c.client = client4
Expand All @@ -38,9 +38,8 @@ func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster {
}

client5 := &cluster5x{
username: metrics.APIKey,
password: metrics.APISecret,
client: httpClient,
client: httpClient,
uri: uri,
}
if _, err := client5.getClusterStatus(); err == nil {
c.client = client5
Expand Down
44 changes: 23 additions & 21 deletions client/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package client

import (
"encoding/base64"
"emqx-exporter/config"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -29,52 +29,54 @@ func cutNodeName(nodeName string) string {
return slice[1]
}

func getHTTPClient(host string) *fasthttp.Client {
func getURI(metrics *config.Metrics) *fasthttp.URI {
uri := &fasthttp.URI{}
uri.SetUsername(metrics.APIKey)
uri.SetPassword(metrics.APISecret)
uri.SetScheme(metrics.Scheme)
uri.SetHost(metrics.Target)
return uri
}

func getHTTPClient(metrics *config.Metrics) *fasthttp.Client {
return &fasthttp.Client{
Name: "EMQX-Exporter", //User-Agent
MaxConnsPerHost: 5,
MaxIdleConnDuration: 30 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
MaxConnWaitTimeout: 5 * time.Second,
ConfigureClient: func(hc *fasthttp.HostClient) error {
hc.Addr = host
return nil
},
TLSConfig: metrics.TLSClientConfig.ToTLSConfig(),
}
}

func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data []byte, statusCode int, err error) {
func callHTTPGet(client *fasthttp.Client, uri *fasthttp.URI, requestURI string) (data []byte, statusCode int, err error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)

req.SetRequestURI(uri)
req.SetURI(uri)
req.URI().SetPath(requestURI)
req.Header.SetMethod(http.MethodGet)
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
// for fasthttp, must set host, otherwise will panic
// but it doesn't matter what value is set
// the host will be replaced by the real host in fasthttp.Client.ConfigureClient
req.Header.SetHost("emqx-exporter")

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)

err = client.Do(req, resp)
if err != nil {
err = fmt.Errorf("request %s failed. %w", uri, err)
err = fmt.Errorf("request %s failed. %w", req.URI().String(), err)
return
}

statusCode = resp.StatusCode()

if resp.StatusCode() != http.StatusOK {
err = fmt.Errorf("%s: %s", uri, http.StatusText(resp.StatusCode()))
err = fmt.Errorf("%s: %s", req.URI().String(), http.StatusText(resp.StatusCode()))
return
}

data = resp.Body()
if len(data) == 0 {
err = fmt.Errorf("get nothing from api %s", uri)
err = fmt.Errorf("get nothing from api %s", req.URI().String())
return
}
if !jsoniter.Valid(data) {
Expand All @@ -87,12 +89,12 @@ func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data
if code.ValueType() == jsoniter.NumberValue {
// for emqx 4.4, it will return integer type code if occurred error
if code.ToInt() != 0 {
errMsg = fmt.Sprintf("%s: %d", uri, code.ToInt())
errMsg = fmt.Sprintf("%s: %d", req.URI().String(), code.ToInt())
}
} else if code.ValueType() == jsoniter.StringValue {
// for emqx 5, it will return string type code if occurred error
if code.ToString() != "" {
errMsg = fmt.Sprintf("%s: %s", uri, code.ToString())
errMsg = fmt.Sprintf("%s: %s", req.URI().String(), code.ToString())
}
}

Expand All @@ -106,15 +108,15 @@ func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data
return
}

func callHTTPGetWithResp(client *fasthttp.Client, uri, username, password string, respData interface{}) (err error) {
data, _, err := callHTTPGet(client, uri, username, password)
func callHTTPGetWithResp(client *fasthttp.Client, uri *fasthttp.URI, requestURI string, respData interface{}) (err error) {
data, _, err := callHTTPGet(client, uri, requestURI)
if err != nil {
return
}

err = jsoniter.Unmarshal(data, respData)
if err != nil {
err = fmt.Errorf("unmarshal api resp failed: %s, %s", uri, err.Error())
err = fmt.Errorf("unmarshal api resp failed: %s, %s", requestURI, err.Error())
return
}
return
Expand Down
35 changes: 28 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ type Config struct {
}

type Metrics struct {
Target string `yaml:"target"`
APIKey string `yaml:"api_key"`
APISecret string `yaml:"api_secret"`
APIKey string `yaml:"api_key"`
APISecret string `yaml:"api_secret"`
Target string `yaml:"target"`
Scheme string `yaml:"scheme,omitempty"`
TLSClientConfig *TLSClientConfig `yaml:"tls_config,omitempty"`
}

type Probe struct {
Expand Down Expand Up @@ -103,16 +105,32 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) {
}

if c.Metrics != nil {
if c.Metrics.Target == "" {
return fmt.Errorf("metrics.target is required")
}
if c.Metrics.APIKey == "" {
return fmt.Errorf("metrics.api_key is required")
}

if c.Metrics.APISecret == "" {
return fmt.Errorf("metrics.api_secret is required")
}
if c.Metrics.Target == "" {
return fmt.Errorf("metrics.target is required")
}
if c.Metrics.TLSClientConfig != nil {
if c.Metrics.Scheme == "" {
c.Metrics.Scheme = "https"
}
if c.Metrics.TLSClientConfig.CAData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.CAData, c.Metrics.TLSClientConfig.CAFile); err != nil {
return fmt.Errorf("metrics.ssl_config.ca_data: %s", err)
}
if c.Metrics.TLSClientConfig.CertData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.CertData, c.Metrics.TLSClientConfig.CertFile); err != nil {
return fmt.Errorf("metrics.ssl_config.cert_data: %s", err)
}
if c.Metrics.TLSClientConfig.KeyData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.KeyData, c.Metrics.TLSClientConfig.KeyFile); err != nil {
return fmt.Errorf("metrics.ssl_config.key_data: %s", err)
}
}
if c.Metrics.Scheme == "" {
c.Metrics.Scheme = "http"
}
}

for index, probe := range c.Probes {
Expand Down Expand Up @@ -153,6 +171,9 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) {
}

func (conf *TLSClientConfig) ToTLSConfig() *tls.Config {
if conf == nil {
return nil
}
certpool := x509.NewCertPool()
certpool.AppendCertsFromPEM(conf.CAData)
clientKeyPair, _ := tls.X509KeyPair(conf.CertData, conf.KeyData)
Expand Down
9 changes: 3 additions & 6 deletions config/example/config.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
metrics:
## EMQX API
api_key: "some_api_key" ## EMQX API key
api_secret: "some_api_secret" ## EMQX API secret
target: 127.0.0.1:18083
## EMQX API key
api_key: "some_api_key"
## EMQX API secret
api_secret: "some_api_secret"
probes:
- target: 127.0.0.1:1883 ## MQTT broker address
scheme: ## tcp, default is tcp
scheme: ## mqtt | tcp | mqtts | ssl | tls | ws | wss, default is tcp
client_id:
username:
password:
Expand Down
Loading

0 comments on commit 0c8e712

Please sign in to comment.