Skip to content

Commit

Permalink
sq: 从tunnel取指标信息
Browse files Browse the repository at this point in the history
---
up TODO

if tunnel.Status != "CONNECT" {

+filterkey_node

+labels

NewRequest(http.MethodGet > tp.TextToMetricFamilies(resp.Body)

func tunMerge: req-uds

up

merge: clear format notes

s
  • Loading branch information
sam@zm4210 authored and sam#gemmi-win10 committed Dec 10, 2022
1 parent 1993a5d commit 56baa01
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 31 deletions.
148 changes: 119 additions & 29 deletions merger/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"golang.org/x/sync/errgroup"

"math/rand"
// "net/http"
"net/http"
"strings"
"net"
// "net/http/httptest"
// "net/url"
// "strconv"
Expand All @@ -32,13 +34,11 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error {
source := source
g.Go(func() error {
//请求source, 取得body
resp, err := m.client.Get(source.url)
resp, err := m.client.Get(source.url) //if uds
if err != nil {
fmt.Println("[ERR] cli.Get")
// fmt.Println("[ERR] get url: %s", source.url)
fmt.Println("[ERR] get url: %s", source.url)
//return errors.Wrap(err, fmt.Sprintf("get url: %s", source.url))
// xiechen-func内? continue不可用
return nil
return nil //xiechen-func内? continue不可用
}
defer resp.Body.Close()

Expand All @@ -47,16 +47,14 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error {
//prom2json.FetchMetricFamilies: https://github.com/LyridInc/prom2lyrid/blob/b2ee0cdb0b4d173bce7ffa6e0ac8993a16733c23/model/endpoint.go
out, err := tp.TextToMetricFamilies(resp.Body)
if err != nil {
fmt.Println("[ERR] TextToMetricFamilies")
// fmt.Println("[ERR] parse url: %s", source.url)
fmt.Println("[ERR] parse body: %s", source.url)
// return errors.Wrap(err, fmt.Sprintf("parse url: %s", source.url))
return nil
}

// 按out依次解析name
mu.Lock()
defer mu.Unlock()
// var metricFamily2 *prom.MetricFamily
for name, metricFamily := range out {
// fmt.Println("name: "+name)
// append labels
Expand All @@ -71,43 +69,28 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error {
} else {
result[name] = metricFamily //不存在时,按name加1条
}
// metricFamily2= metricFamily
}

// TODO result: 每个source下面增加一条 filter_$type=0的metric (用于filter标识);
// ===> filter匹配: type|dash1,dash2,dashId3,..
/* _= &prom.MetricFamily{
"",
"",
nil,
nil,
} */

//+DO: 每个source下面增加一条 filterkey_$type=0的metric (用于filter标识); ==> filter匹配: type|dash1,dash2,dashId3,..
if ""==source.filter {
source.filter= "filter_nontype"
source.filter= "filterkey_nontype"
}
metricFamily2, _:= filterMetric(source)
//name来自metricKeyName; 但这里的设定不会在http输出生效
// result[source.filter] = metricFamily2 //lazy 直接用最后一个 metricFamily

// append metrics //同key 不同label的merge?
if mfResult, ok := result[source.filter]; ok { //key可获得value值时(*prom.MetricFamily),value追加
mfResult.Metric = append(mfResult.Metric, metricFamily2.Metric...)
} else {
//result[name], name来自metricKeyName; 但这里的设定不会在http输出生效
result[source.filter] = metricFamily2 //不存在时,按name加1条
}
return nil
})
}

// TODO 解析tunService.cmap中status=CONNECT的指标;
detailsMap := m.ReverseTunnelService.GetTunnelDetailsMap()
for item := range detailsMap.IterBuffered() {
tunnel := item.Val.(*chserver.TunnelDetails)
fmt.Println(tunnel.Meta.Target)
}

m.tunMerge(result)

// wait to process all routines
if err := g.Wait(); err != nil { //并发结束
return err
Expand Down Expand Up @@ -138,6 +121,113 @@ func (m *merger) merge(ctx context.Context, w io.Writer) error {
return nil
}

func (m *merger) tunMerge(result map[string]*prom.MetricFamily) error {
mu := &sync.Mutex{}
detailsMap := m.ReverseTunnelService.GetTunnelDetailsMap()
for item := range detailsMap.IterBuffered() {
epID:= item.Key
tunnel := item.Val.(*chserver.TunnelDetails)
fmt.Println("tun-Meta: ", tunnel.Meta)
if tunnel.Status != "CONNECT" {
fmt.Println("[ERR] tunnel.Status: %s", tunnel.Status)
return nil //err
}

// resp, err := m.client.Get(source.url) //if uds
requestURL := fmt.Sprintf("http://localhost/metrics")
req, err := http.NewRequest(http.MethodGet, requestURL, nil)
if err != nil {
fmt.Println("[ERR] get url(req): %s", requestURL)
return err
}

// Transport tr: unixMode
rUds:= strings.ReplaceAll(tunnel.Meta.LocalUds, "/", "-") //chclient/poll.go: createTunnel
sockpath := "/tmp/chserver-sock/"+string(epID)+rUds //"-tmp-node-exporter1.sock"
tr := newSocketTransport(sockpath)

httpClient := &http.Client{
Transport: tr,
Timeout: 10 * time.Second,
}
resp, err := httpClient.Do(req)
if err != nil {
fmt.Println("[ERR] get url(cli.Do): %s", sockpath, err)
return err
}
defer resp.Body.Close()
defer httpClient.CloseIdleConnections()
// fmt.Printf("resp.Body: %v\n", resp.Body) //obj's addr


var labels []*prom.LabelPair
k:= "mtarget"
v:= fmt.Sprintf("%s-%s", tunnel.Meta.Desc, tunnel.Meta.Target)
labels = append(labels, &prom.LabelPair{Name: &k, Value: &v})
fmt.Printf("[INFO] add url: tun-%s with labels: %v\n", tunnel.Meta.Target, labels)


//====func (body, labels)================================
//解析到out下 //map[string]*prom.MetricFamily
tp := new(expfmt.TextParser)
//prom2json.FetchMetricFamilies: https://github.com/LyridInc/prom2lyrid/blob/b2ee0cdb0b4d173bce7ffa6e0ac8993a16733c23/model/endpoint.go
out, err := tp.TextToMetricFamilies(resp.Body)
if err != nil {
fmt.Println("[ERR] parse body: %s", sockpath)
return nil
}
// fmt.Println("out: ", out)

// 按out依次解析name
mu.Lock()
defer mu.Unlock()
for name, metricFamily := range out {
// append labels
if len(labels) > 0 {
for _, metric := range metricFamily.Metric {
metric.Label = append(metric.Label, labels...)
}
}

// append metrics
if mfResult, ok := result[name]; ok { //key可获得value值时(*prom.MetricFamily),value追加
mfResult.Metric = append(mfResult.Metric, metricFamily.Metric...)
} else {
result[name] = metricFamily //不存在时,按name加1条
}
}

//
//+DO: 每个source下面增加一条 filterkey_$type=0的metric (用于filter标识); ==> filter匹配: type|dash1,dash2,dashId3,..
source:= &source{}
source.labels= labels
if ""==source.filter {
source.filter= "filterkey_node" //"filter_nontype"
}
metricFamily2, _:= filterMetric(source)
// append metrics //同key 不同label的merge?
if mfResult, ok := result[source.filter]; ok { //key可获得value值时(*prom.MetricFamily),value追加
mfResult.Metric = append(mfResult.Metric, metricFamily2.Metric...)
} else {
//result[name], name来自metricKeyName; 但这里的设定不会在http输出生效
result[source.filter] = metricFamily2 //不存在时,按name加1条
}
}

return nil
}

func newSocketTransport(socketPath string) *http.Transport {
defaultTimeout := 10 * time.Second

tr := new(http.Transport)
tr.DisableCompression = true
tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.DialTimeout("unix", socketPath, defaultTimeout)
}
return tr
}

func addStaticFilterByType(mtype string, source *source, result map[string]*prom.MetricFamily) error {
if ""==source.filter {
source.filter= "filter_nontype"
Expand Down
5 changes: 3 additions & 2 deletions sam-custom.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
**TODO**

- merger ~~ep节点容错~~; ~~追加dash-id配置参数指标~~
- ~~chserver,取tunMap信息~~, 获取node_exp指标(unixDomainSock)
- ~~chserver,取tunMap信息~~, ~~获取node_exp指标(unixDomainSock) 22.12.10~~
-
- merger.json动态加载; conf:支持读取unixDomainSock
- node_exporter > mids_exporter绑定到本地uds
- ~~node_exporter~~, flag参数; > mids_exporter绑定到本地uds

```bash
# metricFamily.Metric
https://github.com/prometheus/client_model #v1.x 旧版本

# chisel-uds
curl -fSL --unix-socket /tmp/chserver-sock/10002-tmp-node-exporter1.sock http://localhost/metrics
```

Expand Down

0 comments on commit 56baa01

Please sign in to comment.