From 2a3e6a34c66c5535cfc710f5356488cf15a8bcdc Mon Sep 17 00:00:00 2001 From: taloric Date: Thu, 10 Oct 2024 15:45:07 +0800 Subject: [PATCH] feat: support extra filter for promql query --- .../app/prometheus/cache/keyGenerator.go | 3 +- .../app/prometheus/cache/remoteread_cache.go | 12 +++--- server/querier/app/prometheus/cache/utils.go | 5 ++- .../app/prometheus/model/prometheus.go | 40 ++++++++++--------- .../app/prometheus/router/prometheus.go | 2 + .../app/prometheus/service/converters.go | 13 +++++- .../querier/app/prometheus/service/promql.go | 35 +++++++++------- .../app/prometheus/service/remote_read.go | 9 +++-- 8 files changed, 72 insertions(+), 47 deletions(-) diff --git a/server/querier/app/prometheus/cache/keyGenerator.go b/server/querier/app/prometheus/cache/keyGenerator.go index afd66b5f3bf..e00c09459d8 100644 --- a/server/querier/app/prometheus/cache/keyGenerator.go +++ b/server/querier/app/prometheus/cache/keyGenerator.go @@ -53,9 +53,10 @@ type CacheKeyGenerator struct { // generate key without query time (start/end) for cache query func (k *CacheKeyGenerator) GenerateCacheKey(req *model.DeepFlowPromRequest) string { return fmt.Sprintf( - "df:%s:%s:%s:%d:%d:%d:%s", + "df:%s:%s:%s:%s:%d:%d:%d:%s", req.OrgID, strings.Join(req.BlockTeamID, "-"), + req.ExtraFilters, req.Query, req.Step, req.Start%int64(req.Step.Seconds()), // real interval for data diff --git a/server/querier/app/prometheus/cache/remoteread_cache.go b/server/querier/app/prometheus/cache/remoteread_cache.go index 77cb2c0c0ed..46997fd48f6 100644 --- a/server/querier/app/prometheus/cache/remoteread_cache.go +++ b/server/querier/app/prometheus/cache/remoteread_cache.go @@ -308,7 +308,7 @@ func NewRemoteReadQueryCache() *RemoteReadQueryCache { return s } -func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.ReadResponse, orgFilter string) *prompb.ReadResponse { +func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb.ReadResponse, orgFilter string, extraFilters string) *prompb.ReadResponse { if req == nil || len(req.Queries) == 0 { return resp } @@ -320,7 +320,7 @@ func (s *RemoteReadQueryCache) AddOrMerge(req *prompb.ReadRequest, resp *prompb. return resp } - key := promRequestToCacheKey(q, orgFilter) + key := promRequestToCacheKey(q, orgFilter, extraFilters) start, end := GetPromRequestQueryTime(q) start = timeAlign(start) @@ -398,11 +398,11 @@ func copyResponse(cached *prompb.ReadResponse) *prompb.ReadResponse { return resp } -func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string) { +func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string, extraFilter string) { if req == nil || len(req.Queries) == 0 { return } - key := promRequestToCacheKey(req.Queries[0], orgFilter) + key := promRequestToCacheKey(req.Queries[0], orgFilter, extraFilter) s.lock.Lock() defer s.lock.Unlock() @@ -412,7 +412,7 @@ func (s *RemoteReadQueryCache) Remove(req *prompb.ReadRequest, orgFilter string) } } -func (s *RemoteReadQueryCache) Get(req *prompb.Query, start int64, end int64, orgFilter string) (*CacheItem, CacheHit, int64, int64) { +func (s *RemoteReadQueryCache) Get(req *prompb.Query, start int64, end int64, orgFilter string, extraFilters string) (*CacheItem, CacheHit, int64, int64) { if req.Hints.Func == "series" { // for series api, don't use cache // not count cache miss here @@ -420,7 +420,7 @@ func (s *RemoteReadQueryCache) Get(req *prompb.Query, start int64, end int64, or } // for query api, cache query samples - key := promRequestToCacheKey(req, orgFilter) + key := promRequestToCacheKey(req, orgFilter, extraFilters) start = timeAlign(start) // lock for concurrency key reading diff --git a/server/querier/app/prometheus/cache/utils.go b/server/querier/app/prometheus/cache/utils.go index 9d793f56c40..ab2998ff6ae 100644 --- a/server/querier/app/prometheus/cache/utils.go +++ b/server/querier/app/prometheus/cache/utils.go @@ -55,11 +55,14 @@ func GetPromRequestQueryTime(q *prompb.Query) (int64, int64) { return q.Hints.StartMs / 1000, endTime } -func promRequestToCacheKey(q *prompb.Query, orgFilter string) string { +func promRequestToCacheKey(q *prompb.Query, orgFilter string, extraFilters string) string { matcher := &strings.Builder{} if len(orgFilter) > 0 { matcher.WriteString(orgFilter + "-") } + if len(extraFilters) > 0 { + matcher.WriteString(extraFilters + "-") + } for i := 0; i < len(q.Matchers); i++ { matcher.WriteString(q.Matchers[i].GetName() + q.Matchers[i].Type.String() + q.Matchers[i].GetValue()) matcher.WriteByte('-') diff --git a/server/querier/app/prometheus/model/prometheus.go b/server/querier/app/prometheus/model/prometheus.go index c48df849577..2d86c9e405d 100644 --- a/server/querier/app/prometheus/model/prometheus.go +++ b/server/querier/app/prometheus/model/prometheus.go @@ -24,17 +24,18 @@ import ( ) type PromQueryParams struct { - Debug bool - Offloading bool - Slimit int - Promql string - StartTime string - EndTime string - Step string - OrgID string - Matchers []string - BlockTeamID []string - Context context.Context + Debug bool + Offloading bool + Slimit int + Promql string + StartTime string + EndTime string + Step string + OrgID string + ExtraFilters string + Matchers []string + BlockTeamID []string + Context context.Context } type PromQueryData struct { @@ -81,12 +82,13 @@ type WrapHistorySeries struct { } type DeepFlowPromRequest struct { - Slimit int - Start int64 - End int64 - Step time.Duration - Query string - OrgID string - BlockTeamID []string - Matchers []string + Slimit int + Start int64 + End int64 + Step time.Duration + Query string + OrgID string + ExtraFilters string + BlockTeamID []string + Matchers []string } diff --git a/server/querier/app/prometheus/router/prometheus.go b/server/querier/app/prometheus/router/prometheus.go index aa7c53fc9ae..5875979fdf1 100644 --- a/server/querier/app/prometheus/router/prometheus.go +++ b/server/querier/app/prometheus/router/prometheus.go @@ -54,6 +54,7 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc { debug := c.Request.FormValue("debug") block_team_id := c.Request.FormValue("block-team-id") // when parsed, block all team in query offloading := c.Request.FormValue("operator-offloading") + args.ExtraFilters = c.Request.FormValue("extra-filters") setRouterArgs(slimit, &args.Slimit, config.Cfg.Prometheus.SeriesLimit, strconv.Atoi) setRouterArgs(debug, &args.Debug, config.Cfg.Prometheus.RequestQueryWithDebug, strconv.ParseBool) setRouterArgs(offloading, &args.Offloading, config.Cfg.Prometheus.OperatorOffloading, strconv.ParseBool) @@ -185,6 +186,7 @@ func promSeriesReader(svc *service.PrometheusService) gin.HandlerFunc { debug := c.Request.FormValue("debug") block_team_id := c.Request.FormValue("block-team-id") offloading := c.Request.FormValue("operator-offloading") + args.ExtraFilters = c.Request.FormValue("extra-filters") setRouterArgs(debug, &args.Debug, config.Cfg.Prometheus.RequestQueryWithDebug, strconv.ParseBool) setRouterArgs(offloading, &args.Offloading, config.Cfg.Prometheus.OperatorOffloading, strconv.ParseBool) err := setRouterArgs(block_team_id, &args.BlockTeamID, nil, splitStrings) diff --git a/server/querier/app/prometheus/service/converters.go b/server/querier/app/prometheus/service/converters.go index fb8a01ce47d..814bbb77db5 100644 --- a/server/querier/app/prometheus/service/converters.go +++ b/server/querier/app/prometheus/service/converters.go @@ -362,6 +362,9 @@ func (p *prometheusReader) promReaderTransToSQL(ctx context.Context, req *prompb if len(p.blockTeamID) > 0 { filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ","))) } + if len(p.extraFilters) > 0 { + filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters)) + } sql := parseToQuerierSQL(ctx, db, table, metricsArray, filters, groupBy, orderBy) return ctx, sql, db, dataPrecision, queryMetric, err @@ -839,7 +842,7 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri continue } - if metricsType != "Int" && metricsType != "Float64" { + if metricsType != "Int" && metricsType != "Float64" && metricsType != "UInt64" { return nil, fmt.Errorf("unknown metrics type %s, value = %v ", metricsType, values[columnIndexes[METRICS_INDEX]]) } @@ -917,7 +920,7 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri return resp, nil } -func convertTo[T int | float64](val interface{}) (v T, b bool) { +func convertTo[T int | float64 | uint64](val interface{}) (v T, b bool) { v, b = val.(T) return } @@ -935,6 +938,9 @@ func parseValue(valueType string, val interface{}) (v float64, b bool) { return float64(metricsValueInt), ok } return metricsValueFloat, ok + case "UInt64": + metricsValueUint, ok := convertTo[uint64](val) + return float64(metricsValueUint), ok default: return 0, false } @@ -1094,6 +1100,9 @@ func (p *prometheusReader) parseQueryRequestToSQL(ctx context.Context, queryReq if len(p.blockTeamID) > 0 { filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ","))) } + if len(p.extraFilters) > 0 { + filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters)) + } sql := parseToQuerierSQL(ctx, chCommon.DB_NAME_PROMETHEUS, queryReq.GetMetric(), selection, filters, groupBy, orderBy) return sql } diff --git a/server/querier/app/prometheus/service/promql.go b/server/querier/app/prometheus/service/promql.go index f5a11dcc781..908e549028e 100644 --- a/server/querier/app/prometheus/service/promql.go +++ b/server/querier/app/prometheus/service/promql.go @@ -138,6 +138,7 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P orgID: args.OrgID, slimit: args.Slimit, blockTeamID: args.BlockTeamID, + extraFilters: args.ExtraFilters, getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag, addExternalTagToCache: p.addExtraLabelsToCache, } @@ -201,6 +202,7 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo slimit: args.Slimit, orgID: args.OrgID, blockTeamID: args.BlockTeamID, + extraFilters: args.ExtraFilters, getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag, addExternalTagToCache: p.addExtraLabelsToCache, } @@ -266,18 +268,20 @@ func (p *prometheusExecutor) offloadRangeQueryExecute(ctx context.Context, args slimit: args.Slimit, orgID: args.OrgID, blockTeamID: args.BlockTeamID, + extraFilters: args.ExtraFilters, getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag, addExternalTagToCache: p.addExtraLabelsToCache, } queryRequests := analyzer.parsePromQL(args.Promql, start, end, step) promRequest := &model.DeepFlowPromRequest{ - Slimit: args.Slimit, - Start: start.UnixMilli(), - End: end.UnixMilli(), - Step: step, - Query: args.Promql, - OrgID: args.OrgID, - BlockTeamID: args.BlockTeamID, + Slimit: args.Slimit, + Start: start.UnixMilli(), + End: end.UnixMilli(), + Step: step, + Query: args.Promql, + OrgID: args.OrgID, + BlockTeamID: args.BlockTeamID, + ExtraFilters: args.ExtraFilters, } var cached promql.Result @@ -398,6 +402,7 @@ func (p *prometheusExecutor) offloadInstantQueryExecute(ctx context.Context, arg slimit: args.Slimit, orgID: args.OrgID, blockTeamID: args.BlockTeamID, + extraFilters: args.ExtraFilters, getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag, addExternalTagToCache: p.addExtraLabelsToCache, } @@ -415,13 +420,14 @@ func (p *prometheusExecutor) offloadInstantQueryExecute(ctx context.Context, arg } promRequest := &model.DeepFlowPromRequest{ - Slimit: args.Slimit, - Start: minStart, - End: maxEnd, - Step: 1 * time.Second, - Query: args.Promql, - OrgID: args.OrgID, - BlockTeamID: args.BlockTeamID, + Slimit: args.Slimit, + Start: minStart, + End: maxEnd, + Step: 1 * time.Second, + Query: args.Promql, + OrgID: args.OrgID, + BlockTeamID: args.BlockTeamID, + ExtraFilters: args.ExtraFilters, } var cached promql.Result @@ -690,6 +696,7 @@ func (p *prometheusExecutor) series(ctx context.Context, args *model.PromQueryPa slimit: config.Cfg.Prometheus.SeriesLimit, orgID: args.OrgID, blockTeamID: args.BlockTeamID, + extraFilters: args.ExtraFilters, getExternalTagFromCache: p.convertExternalTagToQuerierAllowTag, addExternalTagToCache: p.addExtraLabelsToCache, } diff --git a/server/querier/app/prometheus/service/remote_read.go b/server/querier/app/prometheus/service/remote_read.go index 523cc103080..c90aed27d43 100644 --- a/server/querier/app/prometheus/service/remote_read.go +++ b/server/querier/app/prometheus/service/remote_read.go @@ -41,6 +41,7 @@ import ( type prometheusReader struct { slimit int orgID string + extraFilters string blockTeamID []string interceptPrometheusExpr func(func(e *parser.AggregateExpr) error) error getExternalTagFromCache func(string, string) string @@ -62,7 +63,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re // when error occurs, means query not finished yet, remove the first query placeholder // if error is nil, means query finished, don't clean key if err != nil || response == nil { - cache.PromReadResponseCache().Remove(r, cacheOrgFilterKey) + cache.PromReadResponseCache().Remove(r, cacheOrgFilterKey, p.extraFilters) } }(req) @@ -72,7 +73,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re if cacheAvailable { var hit cache.CacheHit var cacheItem *cache.CacheItem - cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey) + cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey, p.extraFilters) if cacheItem != nil { response = cacheItem.Data() } @@ -86,7 +87,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re log.Infof("req [%s:%d-%d] wait 10 seconds to get cache result", metricName, start, end) return response, "", "", 0, errors.New("query timeout, retry to get response! ") case <-loadCompleted: - cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey) + cacheItem, hit, start, end = cache.PromReadResponseCache().Get(req.Queries[0], start, end, cacheOrgFilterKey, p.extraFilters) if cacheItem != nil { response = cacheItem.Data() } @@ -182,7 +183,7 @@ func (p *prometheusReader) promReaderExecute(ctx context.Context, req *prompb.Re if cacheAvailable { // merge result into cache - response = cache.PromReadResponseCache().AddOrMerge(req, resp, cacheOrgFilterKey) + response = cache.PromReadResponseCache().AddOrMerge(req, resp, cacheOrgFilterKey, p.extraFilters) } else { // not using cache, query result would be real result response = resp