Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support extra filter for promql query #8263

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/querier/app/prometheus/cache/keyGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type CacheKeyGenerator struct {
// generate key without query time (start/end) for cache query
func (k *CacheKeyGenerator) GenerateCacheKey(req *model.DeepFlowPromRequest) string {
return fmt.Sprintf(
"df:%s:%s:%s:%d:%d:%d:%s",
"df:%s:%s:%s:%s:%d:%d:%d:%s",
req.OrgID,
strings.Join(req.BlockTeamID, "-"),
req.ExtraFilters,
req.Query,
req.Step,
req.Start%int64(req.Step.Seconds()), // real interval for data
Expand Down
12 changes: 6 additions & 6 deletions server/querier/app/prometheus/cache/remoteread_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func NewRemoteReadQueryCache() *RemoteReadQueryCache {
return s
}

func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.ReadResponse, orgFilter string) *prompb.ReadResponse {
func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.ReadResponse, orgFilter string, extraFilters string) *prompb.ReadResponse {
if req == nil || len(req.Queries) == 0 {
return resp
}
Expand All @@ -320,7 +320,7 @@ func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.
return resp
}

key := promRequestToCacheKey(q, orgFilter)
key := promRequestToCacheKey(q, orgFilter, extraFilters)
start, end := GetPromRequestQueryTime(q)
start = timeAlign(start)

Expand Down Expand Up @@ -398,11 +398,11 @@ func copyResponse(cached *prompb.ReadResponse) *prompb.ReadResponse {
return resp
}

func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string) {
func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string, extraFilter string) {
if req == nil || len(req.Queries) == 0 {
return
}
key := promRequestToCacheKey(req.Queries[0], orgFilter)
key := promRequestToCacheKey(req.Queries[0], orgFilter, extraFilter)
s.lock.Lock()
defer s.lock.Unlock()

Expand All @@ -412,15 +412,15 @@ func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string)
}
}

func (s *RemoteReadQueryCache) Get(req *prompb.Query, start int64, end int64, orgFilter string) (*CacheItem, CacheHit, int64, int64) {
func (s *RemoteReadQueryCache) Get(req *prompb.Query, start int64, end int64, orgFilter string, extraFilters string) (*CacheItem, CacheHit, int64, int64) {
if req.Hints.Func == "series" {
// for series api, don't use cache
// not count cache miss here
return nil, CacheMiss, start, end
}

// for query api, cache query samples
key := promRequestToCacheKey(req, orgFilter)
key := promRequestToCacheKey(req, orgFilter, extraFilters)
start = timeAlign(start)

// lock for concurrency key reading
Expand Down
5 changes: 4 additions & 1 deletion server/querier/app/prometheus/cache/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ func GetPromRequestQueryTime(q *prompb.Query) (int64, int64) {
return q.Hints.StartMs / 1000, endTime
}

func promRequestToCacheKey(q *prompb.Query, orgFilter string) string {
func promRequestToCacheKey(q *prompb.Query, orgFilter string, extraFilters string) string {
matcher := &strings.Builder{}
if len(orgFilter) > 0 {
matcher.WriteString(orgFilter + "-")
}
if len(extraFilters) > 0 {
matcher.WriteString(extraFilters + "-")
}
for i := 0; i < len(q.Matchers); i++ {
matcher.WriteString(q.Matchers[i].GetName() + q.Matchers[i].Type.String() + q.Matchers[i].GetValue())
matcher.WriteByte('-')
Expand Down
40 changes: 21 additions & 19 deletions server/querier/app/prometheus/model/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import (
)

type PromQueryParams struct {
Debug bool
Offloading bool
Slimit int
Promql string
StartTime string
EndTime string
Step string
OrgID string
Matchers []string
BlockTeamID []string
Context context.Context
Debug bool
Offloading bool
Slimit int
Promql string
StartTime string
EndTime string
Step string
OrgID string
ExtraFilters string
Matchers []string
BlockTeamID []string
Context context.Context
}

type PromQueryData struct {
Expand Down Expand Up @@ -81,12 +82,13 @@ type WrapHistorySeries struct {
}

type DeepFlowPromRequest struct {
Slimit int
Start int64
End int64
Step time.Duration
Query string
OrgID string
BlockTeamID []string
Matchers []string
Slimit int
Start int64
End int64
Step time.Duration
Query string
OrgID string
ExtraFilters string
BlockTeamID []string
Matchers []string
}
2 changes: 2 additions & 0 deletions server/querier/app/prometheus/router/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc {
debug := c.Request.FormValue("debug")
block_team_id := c.Request.FormValue("block-team-id") // when parsed, block all team in query
offloading := c.Request.FormValue("operator-offloading")
args.ExtraFilters = c.Request.FormValue("extra-filters")
setRouterArgs(slimit, &args.Slimit, config.Cfg.Prometheus.SeriesLimit, strconv.Atoi)
setRouterArgs(debug, &args.Debug, config.Cfg.Prometheus.RequestQueryWithDebug, strconv.ParseBool)
setRouterArgs(offloading, &args.Offloading, config.Cfg.Prometheus.OperatorOffloading, strconv.ParseBool)
Expand Down Expand Up @@ -185,6 +186,7 @@ func promSeriesReader(svc *service.PrometheusService) gin.HandlerFunc {
debug := c.Request.FormValue("debug")
block_team_id := c.Request.FormValue("block-team-id")
offloading := c.Request.FormValue("operator-offloading")
args.ExtraFilters = c.Request.FormValue("extra-filters")
setRouterArgs(debug, &args.Debug, config.Cfg.Prometheus.RequestQueryWithDebug, strconv.ParseBool)
setRouterArgs(offloading, &args.Offloading, config.Cfg.Prometheus.OperatorOffloading, strconv.ParseBool)
err := setRouterArgs(block_team_id, &args.BlockTeamID, nil, splitStrings)
Expand Down
13 changes: 11 additions & 2 deletions server/querier/app/prometheus/service/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ func (p *prometheusReader) promReaderTransToSQL(ctx context.Context, req *prompb
if len(p.blockTeamID) > 0 {
filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ",")))
}
if len(p.extraFilters) > 0 {
filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters))
}

sql := parseToQuerierSQL(ctx, db, table, metricsArray, filters, groupBy, orderBy)
return ctx, sql, db, dataPrecision, queryMetric, err
Expand Down Expand Up @@ -839,7 +842,7 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri
continue
}

if metricsType != "Int" && metricsType != "Float64" {
if metricsType != "Int" && metricsType != "Float64" && metricsType != "UInt64" {
return nil, fmt.Errorf("unknown metrics type %s, value = %v ", metricsType, values[columnIndexes[METRICS_INDEX]])
}

Expand Down Expand Up @@ -917,7 +920,7 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri
return resp, nil
}

func convertTo[T int | float64](val interface{}) (v T, b bool) {
// convertTo asserts val to the concrete numeric type T, reporting whether
// the assertion succeeded. It is a strict type assertion with no numeric
// coercion: an int value will not convert to float64, and vice versa.
func convertTo[T int | float64 | uint64](val interface{}) (v T, b bool) {
	typed, ok := val.(T)
	return typed, ok
}
Expand All @@ -935,6 +938,9 @@ func parseValue(valueType string, val interface{}) (v float64, b bool) {
return float64(metricsValueInt), ok
}
return metricsValueFloat, ok
case "UInt64":
metricsValueUint, ok := convertTo[uint64](val)
return float64(metricsValueUint), ok
default:
return 0, false
}
Expand Down Expand Up @@ -1094,6 +1100,9 @@ func (p *prometheusReader) parseQueryRequestToSQL(ctx context.Context, queryReq
if len(p.blockTeamID) > 0 {
filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ",")))
}
if len(p.extraFilters) > 0 {
filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters))
}
sql := parseToQuerierSQL(ctx, chCommon.DB_NAME_PROMETHEUS, queryReq.GetMetric(), selection, filters, groupBy, orderBy)
return sql
}
Expand Down
35 changes: 21 additions & 14 deletions server/querier/app/prometheus/service/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P
orgID: args.OrgID,
slimit: args.Slimit,
blockTeamID: args.BlockTeamID,
extraFilters: args.ExtraFilters,
getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag,
addExternalTagToCache: p.addExtraLabelsToCache,
}
Expand Down Expand Up @@ -201,6 +202,7 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo
slimit: args.Slimit,
orgID: args.OrgID,
blockTeamID: args.BlockTeamID,
extraFilters: args.ExtraFilters,
getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag,
addExternalTagToCache: p.addExtraLabelsToCache,
}
Expand Down Expand Up @@ -266,18 +268,20 @@ func (p *prometheusExecutor) offloadRangeQueryExecute(ctx context.Context, args
slimit: args.Slimit,
orgID: args.OrgID,
blockTeamID: args.BlockTeamID,
extraFilters: args.ExtraFilters,
getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag,
addExternalTagToCache: p.addExtraLabelsToCache,
}
queryRequests := analyzer.parsePromQL(args.Promql, start, end, step)
promRequest := &model.DeepFlowPromRequest{
Slimit: args.Slimit,
Start: start.UnixMilli(),
End: end.UnixMilli(),
Step: step,
Query: args.Promql,
OrgID: args.OrgID,
BlockTeamID: args.BlockTeamID,
Slimit: args.Slimit,
Start: start.UnixMilli(),
End: end.UnixMilli(),
Step: step,
Query: args.Promql,
OrgID: args.OrgID,
BlockTeamID: args.BlockTeamID,
ExtraFilters: args.ExtraFilters,
}

var cached promql.Result
Expand Down Expand Up @@ -398,6 +402,7 @@ func (p *prometheusExecutor) offloadInstantQueryExecute(ctx context.Context, arg
slimit: args.Slimit,
orgID: args.OrgID,
blockTeamID: args.BlockTeamID,
extraFilters: args.ExtraFilters,
getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag,
addExternalTagToCache: p.addExtraLabelsToCache,
}
Expand All @@ -415,13 +420,14 @@ func (p *prometheusExecutor) offloadInstantQueryExecute(ctx context.Context, arg
}

promRequest := &model.DeepFlowPromRequest{
Slimit: args.Slimit,
Start: minStart,
End: maxEnd,
Step: 1 * time.Second,
Query: args.Promql,
OrgID: args.OrgID,
BlockTeamID: args.BlockTeamID,
Slimit: args.Slimit,
Start: minStart,
End: maxEnd,
Step: 1 * time.Second,
Query: args.Promql,
OrgID: args.OrgID,
BlockTeamID: args.BlockTeamID,
ExtraFilters: args.ExtraFilters,
}

var cached promql.Result
Expand Down Expand Up @@ -690,6 +696,7 @@ func (p *prometheusExecutor) series(ctx context.Context, args *model.PromQueryPa
slimit: config.Cfg.Prometheus.SeriesLimit,
orgID: args.OrgID,
blockTeamID: args.BlockTeamID,
extraFilters: args.ExtraFilters,
getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag,
addExternalTagToCache: p.addExtraLabelsToCache,
}
Expand Down
9 changes: 5 additions & 4 deletions server/querier/app/prometheus/service/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
type prometheusReader struct {
slimit int
orgID string
extraFilters string
blockTeamID []string
interceptPrometheusExpr func(func(e *parser.AggregateExpr) error) error
getExternalTagFromCache func(string, string) string
Expand All @@ -62,7 +63,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re
// when error occurs, means query not finished yet, remove the first query placeholder
// if error is nil, means query finished, don't clean key
if err != nil || response == nil {
cache.PromReadResponseCache().Remove(r, cacheOrgFilterKey)
cache.PromReadResponseCache().Remove(r, cacheOrgFilterKey, p.extraFilters)
}
}(req)

Expand All @@ -72,7 +73,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re
if cacheAvailable {
var hit cache.CacheHit
var cacheItem *cache.CacheItem
cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey)
cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey, p.extraFilters)
if cacheItem != nil {
response = cacheItem.Data()
}
Expand All @@ -86,7 +87,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re
log.Infof("req [%s:%d-%d] wait 10 seconds to get cache result", metricName, start, end)
return response, "", "", 0, errors.New("query timeout, retry to get response! ")
case <-loadCompleted:
cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey)
cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey, p.extraFilters)
if cacheItem != nil {
response = cacheItem.Data()
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re

if cacheAvailable {
// merge result into cache
response = cache.PromReadResponseCache().AddOrMerge(req, resp, cacheOrgFilterKey)
response = cache.PromReadResponseCache().AddOrMerge(req, resp, cacheOrgFilterKey, p.extraFilters)
} else {
// not using cache, query result would be real result
response = resp
Expand Down
Loading