Skip to content

Commit

Permalink
[querier] add more debuginfo for promql query
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Jul 1, 2023
1 parent 1da95e4 commit a2db320
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 28 deletions.
15 changes: 11 additions & 4 deletions server/querier/app/prometheus/model/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type PromQueryParams struct {
StartTime string
EndTime string
Step string
Debug bool
Matchers []string
Context context.Context
}
Expand All @@ -38,10 +39,11 @@ type PromQueryData struct {
}

type PromQueryResponse struct {
Status string `json:"status"`
Data interface{} `json:"data,omitempty"`
ErrorType errorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Status string `json:"status"`
Data interface{} `json:"data,omitempty"`
ErrorType errorType `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Stats PromQueryStats `json:"stats,omitempty"`
}

type PromMetaParams struct {
Expand All @@ -51,4 +53,9 @@ type PromMetaParams struct {
Context context.Context
}

type PromQueryStats struct {
SQL []string `json:"sql,omitempty"`
QueryTime []float64 `json:"query_time,omitempty"`
}

type errorType string
7 changes: 5 additions & 2 deletions server/querier/app/prometheus/router/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package router
import (
"context"
"io/ioutil"
"strconv"
"strings"

"github.com/gin-gonic/gin"
Expand All @@ -42,6 +43,8 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc {
// ref: https://github.com/prometheus/prometheus/blob/main/prompb/types.proto#L157
args.StartTime = c.Request.FormValue("time")
args.EndTime = c.Request.FormValue("time")
debug := c.Request.FormValue("debug")
args.Debug, _ = strconv.ParseBool(debug)
result, err := svc.PromInstantQueryService(&args, c.Request.Context())
if err != nil {
c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL})
Expand All @@ -59,8 +62,8 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc {
args.StartTime = c.Request.FormValue("start")
args.EndTime = c.Request.FormValue("end")
args.Step = c.Request.FormValue("step")
//pp.Println(c.Request.Header.Get("Accept"))
//pp.Println(args.Promql)
debug := c.Request.FormValue("debug")
args.Debug, _ = strconv.ParseBool(debug)

result, err := svc.PromRangeQueryService(&args, c.Request.Context())
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions server/querier/app/prometheus/service/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
. "github.com/smartystreets/goconvey/convey"

"github.com/deepflowio/deepflow/server/libs/datastructure"
cfg "github.com/deepflowio/deepflow/server/querier/app/prometheus/config"
"github.com/deepflowio/deepflow/server/querier/config"
)

Expand Down Expand Up @@ -63,11 +64,11 @@ func TestMain(m *testing.M) {
// init runtime objects for tests
QPSLeakyBucket = new(datastructure.LeakyBucket)
QPSLeakyBucket.Init(1e9)

config.Cfg = &config.QuerierConfig{Limit: "10000"}
config.Cfg = &config.QuerierConfig{Limit: "10000", Prometheus: cfg.Prometheus{AutoTaggingPrefix: "df_"}}

// run for test
m.Run()
QPSLeakyBucket.Close()
}

func TestParseMetric(t *testing.T) {
Expand Down
25 changes: 18 additions & 7 deletions server/querier/app/prometheus/service/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P

// instant query will hint default query range:
// query.lookback-delta: https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go#L398
qry, err := engine.NewInstantQuery(&RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}, nil, args.Promql, queryTime)
queriable := &RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}
qry, err := engine.NewInstantQuery(queriable, nil, args.Promql, queryTime)
if qry == nil || err != nil {
log.Error(err)
return nil, err
Expand All @@ -106,10 +107,14 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P
log.Error(res.Err)
return nil, res.Err
}
return &model.PromQueryResponse{
result = &model.PromQueryResponse{
Data: &model.PromQueryData{ResultType: res.Value.Type(), Result: res.Value},
Status: _SUCCESS,
}, err
}
if args.Debug {
result.Stats = model.PromQueryStats{SQL: queriable.sql, QueryTime: queriable.query_time}
}
return result, err
}

func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *model.PromQueryParams, engine *promql.Engine) (result *model.PromQueryResponse, err error) {
Expand Down Expand Up @@ -142,7 +147,8 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo
defer span.End()
}

qry, err := engine.NewRangeQuery(&RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}, nil, args.Promql, start, end, step)
queriable := &RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}
qry, err := engine.NewRangeQuery(queriable, nil, args.Promql, start, end, step)
if qry == nil || err != nil {
log.Error(err)
return nil, err
Expand All @@ -152,15 +158,20 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo
log.Error(res.Err)
return nil, res.Err
}
return &model.PromQueryResponse{
result = &model.PromQueryResponse{
Data: &model.PromQueryData{ResultType: res.Value.Type(), Result: res.Value},
Status: _SUCCESS,
}, err
}
if args.Debug {
// if query with `debug` parmas, return sql & query time
result.Stats = model.PromQueryStats{SQL: queriable.sql, QueryTime: queriable.query_time}
}
return result, err
}

func (p *prometheusExecutor) promRemoteReadExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prompb.ReadResponse, err error) {
// analysis for ReadRequest
result, err := promReaderExecute(ctx, req)
result, _, _, err := promReaderExecute(ctx, req, false)
return result, err
}

Expand Down
14 changes: 13 additions & 1 deletion server/querier/app/prometheus/service/queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type RemoteReadQuerierable struct {
Args *model.PromQueryParams
Ctx context.Context
MatchMetricNameFunc func(*[]*labels.Matcher) string
sql []string
query_time []float64
}

func (q *RemoteReadQuerierable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
Expand Down Expand Up @@ -81,11 +83,21 @@ func (q *RemoteReadQuerier) Select(sortSeries bool, hints *storage.SelectHints,
Queries: []*prompb.Query{prompbQuery},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}
resp, err := promReaderExecute(q.Ctx, req)
resp, sql, query_time, err := promReaderExecute(q.Ctx, req, q.Args.Debug)
if err != nil {
log.Error(err)
return storage.ErrSeriesSet(err)
}
if q.Args.Debug {
if q.Querierable.sql == nil {
q.Querierable.sql = make([]string, 0)
}
if q.Querierable.query_time == nil {
q.Querierable.query_time = make([]float64, 0)
}
q.Querierable.sql = append(q.Querierable.sql, sql)
q.Querierable.query_time = append(q.Querierable.query_time, query_time)
}
return remote.FromQueryResult(sortSeries, resp.Results[0])
}

Expand Down
39 changes: 27 additions & 12 deletions server/querier/app/prometheus/service/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,28 @@ import (
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse"
)

func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prompb.ReadResponse, err error) {
func promReaderExecute(ctx context.Context, req *prompb.ReadRequest, debug bool) (resp *prompb.ReadResponse, sql string, duration float64, err error) {
// promrequest trans to sql
// pp.Println(req)
ctx, sql, db, datasource, err := promReaderTransToSQL(ctx, req)
// fmt.Println(sql, db)
if err != nil {
return nil, err
return nil, "", 0, err
}
if db == "" {
db = "prometheus"
}
query_uuid := uuid.New()
// mark query comes from promql
//lint:ignore SA1029 use string as context key, ensure no `type` reference to app/prometheus
ctx = context.WithValue(ctx, "remote_read", true)
// if `api` pass `debug` or config debug, get debug info from querier
debugQuerier := debug || config.Cfg.Prometheus.RequestQueryWithDebug
args := common.QuerierParams{
DB: db,
Sql: sql,
DataSource: datasource,
Debug: strconv.FormatBool(config.Cfg.Prometheus.RequestQueryWithDebug),
Debug: strconv.FormatBool(debugQuerier),
QueryUUID: query_uuid.String(),
Context: ctx,
}
Expand All @@ -68,17 +73,20 @@ func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prom

ckEngine := &clickhouse.CHEngine{DB: args.DB, DataSource: args.DataSource}
ckEngine.Init()
result, debug, err := ckEngine.ExecuteQuery(&args)
result, debugInfo, err := ckEngine.ExecuteQuery(&args)
if err != nil {
// TODO
log.Errorf("ExecuteQuery failed, debug info = %v, err info = %v", debug, err)
return nil, err
log.Errorf("ExecuteQuery failed, debug info = %v, err info = %v", debugInfo, err)
return nil, "", 0, err
}

if debugQuerier {
duration = extractQueryTimeFromQueryResponse(debugInfo)
sql = extractQuerySQLFromQueryResponse(debugInfo)
}

if config.Cfg.Prometheus.RequestQueryWithDebug {
// inject query_time for current span
query_time := extractDebugInfoFromQueryResponse(debug)
span.SetAttributes(attribute.Float64("query_time", query_time))
span.SetAttributes(attribute.Float64("query_time", duration))

// inject labels for parent span
targetLabels := make([]string, 0, len(result.Schemas))
Expand All @@ -103,14 +111,14 @@ func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prom
resp, err = respTransToProm(ctx, result)
if err != nil {
log.Error(err)
return nil, err
return nil, "", 0, err
}
// pp.Println(resp)
return resp, nil
return resp, sql, duration, nil
}

// extract query_time from query debug(map[string]interface{}) infos
func extractDebugInfoFromQueryResponse(debug map[string]interface{}) float64 {
func extractQueryTimeFromQueryResponse(debug map[string]interface{}) float64 {
if debug["query_time"] != nil {
query_time_str := strings.ReplaceAll(debug["query_time"].(string), "s", "")
query_time, _ := strconv.ParseFloat(query_time_str, 64)
Expand All @@ -119,6 +127,13 @@ func extractDebugInfoFromQueryResponse(debug map[string]interface{}) float64 {
return 0
}

func extractQuerySQLFromQueryResponse(debug map[string]interface{}) string {
if debug["sql"] != nil {
return debug["sql"].(string)
}
return ""
}

func queryDataExecute(ctx context.Context, sql string, db string, ds string) (*common.Result, error) {
query_uuid := uuid.New()
args := common.QuerierParams{
Expand Down

0 comments on commit a2db320

Please sign in to comment.