Skip to content

Commit

Permalink
[querier] remove replica labels by query params
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric authored and lzf575 committed Oct 16, 2023
1 parent 692d3bb commit 6b33299
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
1 change: 1 addition & 0 deletions server/querier/app/prometheus/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Prometheus struct {
RequestQueryWithDebug bool `default:"false" yaml:"request-query-with-debug"`
ExternalTagCacheSize int `default:"1024" yaml:"external-tag-cache-size"`
ExternalTagLoadInterval int `default:"300" yaml:"external-tag-load-interval"`
ThanosReplicaLabels []string `yaml:"thanos-replica-labels"`
Cache PrometheusCache `yaml:"cache"`
}

Expand Down
19 changes: 10 additions & 9 deletions server/querier/app/prometheus/model/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
)

type PromQueryParams struct {
MetricsWithPrefix string
Promql string
StartTime string
EndTime string
Step string
Debug bool
Slimit string
Matchers []string
Context context.Context
MetricsWithPrefix string
Promql string
StartTime string
EndTime string
Step string
Debug bool
Slimit string
ThanosReplicaLabels []string
Matchers []string
Context context.Context
}

type PromQueryData struct {
Expand Down
5 changes: 2 additions & 3 deletions server/querier/app/prometheus/router/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc {
args.Slimit = c.Request.FormValue("slimit")
debug := c.Request.FormValue("debug")
args.Debug, _ = strconv.ParseBool(debug)

result, err := svc.PromInstantQueryService(&args, c.Request.Context())
if err != nil {
c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL})
Expand Down Expand Up @@ -73,7 +74,6 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc {
c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL})
return
}
//pp.Println(result)
c.JSON(200, result)
})
}
Expand All @@ -92,9 +92,8 @@ func promReader(svc *service.PrometheusService) gin.HandlerFunc {
c.JSON(500, err)
return
}
//pp.Println(req)

resp, err := svc.PromRemoteReadService(&req, c.Request.Context())
//pp.Println(resp)
if err != nil {
c.JSON(500, err)
return
Expand Down
28 changes: 19 additions & 9 deletions server/querier/app/prometheus/service/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,10 +592,25 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri

// merge and serialize all tags as map key
var deepflowNativeTagString, promTagJson string
var filterTagMap map[string]string
// merge prometheus tags
if tagIndex > -1 {
promTagJson = values[tagIndex].(string)
deepflowNativeTagString = promTagJson
tagMap := make(map[string]string)
json.Unmarshal([]byte(promTagJson), &tagMap)
filterTagMap = make(map[string]string, len(tagMap))
for k, v := range tagMap {
if k == "" || v == "" {
continue
}
// ignore replica labels if passed
if config.Cfg.Prometheus.ThanosReplicaLabels != nil && common.IsValueInSliceString(k, config.Cfg.Prometheus.ThanosReplicaLabels) {
continue
}
filterTagMap[k] = v
}
promFilterTagJson, _ := json.Marshal(filterTagMap)
deepflowNativeTagString = string(promFilterTagJson)
}

// merge deepflow autotagging tags
Expand All @@ -622,14 +637,9 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri

// tag label pair
var pairs []prompb.Label
if tagIndex > -1 {
tagMap := make(map[string]string)
json.Unmarshal([]byte(promTagJson), &tagMap)
pairs = make([]prompb.Label, 0, 1+len(tagMap)+len(allDeepFlowNativeTags))
for k, v := range tagMap {
if k == "" || v == "" {
continue
}
if tagIndex > -1 && filterTagMap != nil {
pairs = make([]prompb.Label, 0, 1+len(filterTagMap)+len(allDeepFlowNativeTags))
for k, v := range filterTagMap {
if prefix == prefixTag {
// prometheus tag for deepflow metrics
pairs = append(pairs, prompb.Label{
Expand Down
1 change: 1 addition & 0 deletions server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ querier:
request-query-with-debug: true
external-tag-cache-size: 1024
external-tag-load-interval: 300
thanos-replica-labels: [] # remove duplicate replica labels when query data
cache:
enabled: true
cache-item-size: 512000 # max size of cache item, unit: byte
Expand Down

0 comments on commit 6b33299

Please sign in to comment.