diff --git a/server/querier/app/prometheus/model/prometheus.go b/server/querier/app/prometheus/model/prometheus.go index e23e623400a..d26e91a5e7b 100644 --- a/server/querier/app/prometheus/model/prometheus.go +++ b/server/querier/app/prometheus/model/prometheus.go @@ -28,6 +28,7 @@ type PromQueryParams struct { StartTime string EndTime string Step string + Debug bool Matchers []string Context context.Context } @@ -38,10 +39,11 @@ type PromQueryData struct { } type PromQueryResponse struct { - Status string `json:"status"` - Data interface{} `json:"data,omitempty"` - ErrorType errorType `json:"errorType,omitempty"` - Error string `json:"error,omitempty"` + Status string `json:"status"` + Data interface{} `json:"data,omitempty"` + ErrorType errorType `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` + Stats PromQueryStats `json:"stats,omitempty"` } type PromMetaParams struct { @@ -51,4 +53,9 @@ type PromMetaParams struct { Context context.Context } +type PromQueryStats struct { + SQL []string `json:"sql,omitempty"` + QueryTime []float64 `json:"query_time,omitempty"` +} + type errorType string diff --git a/server/querier/app/prometheus/router/prometheus.go b/server/querier/app/prometheus/router/prometheus.go index 5d15f1cac6c..c3a1e0f30b3 100644 --- a/server/querier/app/prometheus/router/prometheus.go +++ b/server/querier/app/prometheus/router/prometheus.go @@ -19,6 +19,7 @@ package router import ( "context" "io/ioutil" + "strconv" "strings" "github.com/gin-gonic/gin" @@ -42,6 +43,8 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc { // ref: https://github.com/prometheus/prometheus/blob/main/prompb/types.proto#L157 args.StartTime = c.Request.FormValue("time") args.EndTime = c.Request.FormValue("time") + debug := c.Request.FormValue("debug") + args.Debug, _ = strconv.ParseBool(debug) result, err := svc.PromInstantQueryService(&args, c.Request.Context()) if err != nil { c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: 
_STATUS_FAIL}) @@ -59,8 +62,8 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc { args.StartTime = c.Request.FormValue("start") args.EndTime = c.Request.FormValue("end") args.Step = c.Request.FormValue("step") - //pp.Println(c.Request.Header.Get("Accept")) - //pp.Println(args.Promql) + debug := c.Request.FormValue("debug") + args.Debug, _ = strconv.ParseBool(debug) result, err := svc.PromRangeQueryService(&args, c.Request.Context()) if err != nil { diff --git a/server/querier/app/prometheus/service/converters_test.go b/server/querier/app/prometheus/service/converters_test.go index ece4b7b111f..e336a975de6 100644 --- a/server/querier/app/prometheus/service/converters_test.go +++ b/server/querier/app/prometheus/service/converters_test.go @@ -26,6 +26,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/deepflowio/deepflow/server/libs/datastructure" + cfg "github.com/deepflowio/deepflow/server/querier/app/prometheus/config" "github.com/deepflowio/deepflow/server/querier/config" ) @@ -63,11 +64,11 @@ func TestMain(m *testing.M) { // init runtime objects for tests QPSLeakyBucket = new(datastructure.LeakyBucket) QPSLeakyBucket.Init(1e9) - - config.Cfg = &config.QuerierConfig{Limit: "10000"} + config.Cfg = &config.QuerierConfig{Limit: "10000", Prometheus: cfg.Prometheus{AutoTaggingPrefix: "df_"}} // run for test m.Run() + QPSLeakyBucket.Close() } func TestParseMetric(t *testing.T) { diff --git a/server/querier/app/prometheus/service/promql.go b/server/querier/app/prometheus/service/promql.go index 8c00b8ad568..69d512c47bb 100644 --- a/server/querier/app/prometheus/service/promql.go +++ b/server/querier/app/prometheus/service/promql.go @@ -96,7 +96,8 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P // instant query will hint default query range: // query.lookback-delta: https://github.com/prometheus/prometheus/blob/main/cmd/prometheus/main.go#L398 - qry, err := 
engine.NewInstantQuery(&RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}, nil, args.Promql, queryTime) + queriable := &RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName} + qry, err := engine.NewInstantQuery(queriable, nil, args.Promql, queryTime) if qry == nil || err != nil { log.Error(err) return nil, err @@ -106,10 +107,14 @@ func (p *prometheusExecutor) promQueryExecute(ctx context.Context, args *model.P log.Error(res.Err) return nil, res.Err } - return &model.PromQueryResponse{ + result = &model.PromQueryResponse{ Data: &model.PromQueryData{ResultType: res.Value.Type(), Result: res.Value}, Status: _SUCCESS, - }, err + } + if args.Debug { + result.Stats = model.PromQueryStats{SQL: queriable.sql, QueryTime: queriable.query_time} + } + return result, err } func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *model.PromQueryParams, engine *promql.Engine) (result *model.PromQueryResponse, err error) { @@ -142,7 +147,8 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo defer span.End() } - qry, err := engine.NewRangeQuery(&RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName}, nil, args.Promql, start, end, step) + queriable := &RemoteReadQuerierable{Args: args, Ctx: ctx, MatchMetricNameFunc: p.matchMetricName} + qry, err := engine.NewRangeQuery(queriable, nil, args.Promql, start, end, step) if qry == nil || err != nil { log.Error(err) return nil, err @@ -152,15 +158,20 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo log.Error(res.Err) return nil, res.Err } - return &model.PromQueryResponse{ + result = &model.PromQueryResponse{ Data: &model.PromQueryData{ResultType: res.Value.Type(), Result: res.Value}, Status: _SUCCESS, - }, err + } + if args.Debug { + // if query with `debug` params, return sql & query time + result.Stats = model.PromQueryStats{SQL: queriable.sql, QueryTime: 
queriable.query_time} + } + return result, err } func (p *prometheusExecutor) promRemoteReadExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prompb.ReadResponse, err error) { // analysis for ReadRequest - result, err := promReaderExecute(ctx, req) + result, _, _, err := promReaderExecute(ctx, req, false) return result, err } diff --git a/server/querier/app/prometheus/service/queryable.go b/server/querier/app/prometheus/service/queryable.go index 3b60b3ef847..d147326c533 100644 --- a/server/querier/app/prometheus/service/queryable.go +++ b/server/querier/app/prometheus/service/queryable.go @@ -36,6 +36,8 @@ type RemoteReadQuerierable struct { Args *model.PromQueryParams Ctx context.Context MatchMetricNameFunc func(*[]*labels.Matcher) string + sql []string + query_time []float64 } func (q *RemoteReadQuerierable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { @@ -81,11 +83,21 @@ func (q *RemoteReadQuerier) Select(sortSeries bool, hints *storage.SelectHints, Queries: []*prompb.Query{prompbQuery}, AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, } - resp, err := promReaderExecute(q.Ctx, req) + resp, sql, query_time, err := promReaderExecute(q.Ctx, req, q.Args.Debug) if err != nil { log.Error(err) return storage.ErrSeriesSet(err) } + if q.Args.Debug { + if q.Querierable.sql == nil { + q.Querierable.sql = make([]string, 0) + } + if q.Querierable.query_time == nil { + q.Querierable.query_time = make([]float64, 0) + } + q.Querierable.sql = append(q.Querierable.sql, sql) + q.Querierable.query_time = append(q.Querierable.query_time, query_time) + } return remote.FromQueryResult(sortSeries, resp.Results[0]) } diff --git a/server/querier/app/prometheus/service/remote_read.go b/server/querier/app/prometheus/service/remote_read.go index 5feee2d55af..3ef0dddaf2a 100644 --- a/server/querier/app/prometheus/service/remote_read.go +++ b/server/querier/app/prometheus/service/remote_read.go @@ -32,23 
+32,28 @@ import ( "github.com/deepflowio/deepflow/server/querier/engine/clickhouse" ) -func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prompb.ReadResponse, err error) { +func promReaderExecute(ctx context.Context, req *prompb.ReadRequest, debug bool) (resp *prompb.ReadResponse, sql string, duration float64, err error) { // promrequest trans to sql // pp.Println(req) ctx, sql, db, datasource, err := promReaderTransToSQL(ctx, req) // fmt.Println(sql, db) if err != nil { - return nil, err + return nil, "", 0, err } if db == "" { db = "prometheus" } query_uuid := uuid.New() + // mark that the query comes from promql + //lint:ignore SA1029 use string as context key, ensure no `type` reference to app/prometheus + ctx = context.WithValue(ctx, "remote_read", true) + // if the API passes `debug` or debug is enabled in config, get debug info from querier + debugQuerier := debug || config.Cfg.Prometheus.RequestQueryWithDebug args := common.QuerierParams{ DB: db, Sql: sql, DataSource: datasource, - Debug: strconv.FormatBool(config.Cfg.Prometheus.RequestQueryWithDebug), + Debug: strconv.FormatBool(debugQuerier), QueryUUID: query_uuid.String(), Context: ctx, } @@ -68,17 +73,20 @@ func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prom ckEngine := &clickhouse.CHEngine{DB: args.DB, DataSource: args.DataSource} ckEngine.Init() - result, debug, err := ckEngine.ExecuteQuery(&args) + result, debugInfo, err := ckEngine.ExecuteQuery(&args) if err != nil { - // TODO - log.Errorf("ExecuteQuery failed, debug info = %v, err info = %v", debug, err) - return nil, err + log.Errorf("ExecuteQuery failed, debug info = %v, err info = %v", debugInfo, err) + return nil, "", 0, err + } + + if debugQuerier { + duration = extractQueryTimeFromQueryResponse(debugInfo) + sql = extractQuerySQLFromQueryResponse(debugInfo) } if config.Cfg.Prometheus.RequestQueryWithDebug { // inject query_time for current span - query_time := extractDebugInfoFromQueryResponse(debug) - 
span.SetAttributes(attribute.Float64("query_time", query_time)) + span.SetAttributes(attribute.Float64("query_time", duration)) // inject labels for parent span targetLabels := make([]string, 0, len(result.Schemas)) @@ -103,14 +111,14 @@ func promReaderExecute(ctx context.Context, req *prompb.ReadRequest) (resp *prom resp, err = respTransToProm(ctx, result) if err != nil { log.Error(err) - return nil, err + return nil, "", 0, err } // pp.Println(resp) - return resp, nil + return resp, sql, duration, nil } // extract query_time from query debug(map[string]interface{}) infos -func extractDebugInfoFromQueryResponse(debug map[string]interface{}) float64 { +func extractQueryTimeFromQueryResponse(debug map[string]interface{}) float64 { if debug["query_time"] != nil { query_time_str := strings.ReplaceAll(debug["query_time"].(string), "s", "") query_time, _ := strconv.ParseFloat(query_time_str, 64) @@ -119,6 +127,13 @@ func extractDebugInfoFromQueryResponse(debug map[string]interface{}) float64 { return 0 } +func extractQuerySQLFromQueryResponse(debug map[string]interface{}) string { + if debug["sql"] != nil { + return debug["sql"].(string) + } + return "" +} + func queryDataExecute(ctx context.Context, sql string, db string, ds string) (*common.Result, error) { query_uuid := uuid.New() args := common.QuerierParams{