Skip to content

Commit

Permalink
topsql: add backend of group by table or db (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Sep 23, 2024
1 parent f915f04 commit 78151ff
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 21 deletions.
90 changes: 90 additions & 0 deletions component/topsql/query/default_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ var (
sumMapP = sumMapPool{}
)

const (
// AggLevelQuery is the query level aggregation
AggLevelQuery = "query"
// AggLevelTable is the table level aggregation
AggLevelTable = "table"
// AggLevelDB is the db level aggregation
AggLevelDB = "db"
)

type DefaultQuery struct {
vmselectHandler http.HandlerFunc
documentDB *genji.DB
Expand Down Expand Up @@ -69,6 +78,52 @@ func (dq *DefaultQuery) Records(name string, startSecs, endSecs, windowSecs, top
})
}

func (dq *DefaultQuery) SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, aggBy string, fill *[]SummaryByItem) error {
if startSecs > endSecs {
return nil
}

// adjust start to make result align to end
alignStartSecs := endSecs - (endSecs-startSecs)/windowSecs*windowSecs

var recordsResponse recordsMetricRespV2
if err := dq.fetchRecordsFromTSDBBy(store.MetricNameCPUTime, alignStartSecs, endSecs, windowSecs, instance, instanceType, aggBy, top, &recordsResponse); err != nil {
return err
}
if len(recordsResponse.Data.Results) == 0 {
return nil
}

for _, result := range recordsResponse.Data.Results {
text := result.Metric[aggBy].(string)
sumItem := SummaryByItem{
Text: text,
}
if text == "other" {
sumItem.IsOther = true
}
valueSum := uint64(0)
for _, value := range result.Values {
if len(value) != 2 {
continue
}

ts := uint64(value[0].(float64))
v, err := strconv.ParseUint(value[1].(string), 10, 64)
if err != nil {
continue
}

valueSum += v
sumItem.TimestampSec = append(sumItem.TimestampSec, ts)
sumItem.CPUTimeMs = append(sumItem.CPUTimeMs, v)
}
sumItem.CPUTimeMsSum = valueSum
*fill = append(*fill, sumItem)
}
return nil
}

func (dq *DefaultQuery) Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error {
if startSecs > endSecs {
return nil
Expand Down Expand Up @@ -303,6 +358,41 @@ func (dq *DefaultQuery) fetchRecordsFromTSDB(name string, startSecs int, endSecs
return json.Unmarshal(respR.Body.Bytes(), metricResponse)
}

func (dq *DefaultQuery) fetchRecordsFromTSDBBy(name string, startSecs int, endSecs int, windowSecs int, instance, instanceType, by string, top int, metricResponse *recordsMetricRespV2) error {
if dq.vmselectHandler == nil {
return fmt.Errorf("empty query handler")
}

bufResp := bytesP.Get()
header := headerP.Get()

defer bytesP.Put(bufResp)
defer headerP.Put(header)

req, err := http.NewRequest("GET", "/api/v1/query_range", nil)
if err != nil {
return err
}
reqQuery := req.URL.Query()
reqQuery.Set("query", fmt.Sprintf("topk_avg(%d, sum(sum_over_time(%s{instance=\"%s\", instance_type=\"%s\"}[%d])) by (%s), \"%s=other\")", top, name, instance, instanceType, windowSecs, by, by))
reqQuery.Set("start", strconv.Itoa(startSecs))
reqQuery.Set("end", strconv.Itoa(endSecs))
reqQuery.Set("step", strconv.Itoa(windowSecs))
reqQuery.Set("nocache", "1")
req.URL.RawQuery = reqQuery.Encode()
req.Header.Set("Accept", "application/json")

respR := utils.NewRespWriter(bufResp, header)
dq.vmselectHandler(&respR, req)

if statusOK := respR.Code >= 200 && respR.Code < 300; !statusOK {
errStr := respR.Body.String()
log.Warn("failed to fetch timeseries db", zap.String("error", errStr))
return errors.New(errStr)
}
return json.Unmarshal(respR.Body.Bytes(), metricResponse)
}

func (dq *DefaultQuery) fetchInstancesFromTSDB(startSecs, endSecs int, fill *[]InstanceItem) error {
if dq.vmselectHandler == nil {
return fmt.Errorf("empty query handler")
Expand Down
23 changes: 23 additions & 0 deletions component/topsql/query/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ type RecordPlanItem struct {
SQLDurationCount []uint64 `json:"sql_duration_count,omitempty"`
}

type SummaryByItem struct {
Text string `json:"text"`
TimestampSec []uint64 `json:"timestamp_sec"`
CPUTimeMs []uint64 `json:"cpu_time_ms,omitempty"`
CPUTimeMsSum uint64 `json:"cpu_time_ms_sum"`
IsOther bool `json:"is_other"`
}

type SummaryItem struct {
SQLDigest string `json:"sql_digest"`
SQLText string `json:"sql_text"`
Expand Down Expand Up @@ -76,6 +84,21 @@ type recordsMetricRespDataResultMetric struct {
PlanDigest string `json:"plan_digest"`
}

type recordsMetricRespV2 struct {
Status string `json:"status"`
Data recordsMetricRespDataV2 `json:"data"`
}

type recordsMetricRespDataV2 struct {
ResultType string `json:"resultType"`
Results []recordsMetricRespDataResultV2 `json:"result"`
}

type recordsMetricRespDataResultV2 struct {
Metric map[string]interface{} `json:"metric"`
Values []recordsMetricRespDataResultValue `json:"values"`
}

type recordsMetricRespDataResultValue = []interface{}

type instancesMetricResp struct {
Expand Down
1 change: 1 addition & 0 deletions component/topsql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
type Query interface {
Records(name string, startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]RecordItem) error
Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error
SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, by string, fill *[]SummaryByItem) error
Instances(startSecs, endSecs int, fill *[]InstanceItem) error
Close()
}
83 changes: 65 additions & 18 deletions component/topsql/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (

var (
recordsP = recordsPool{}
summaryP = summaryPool{}
summaryBySqlP = summarySQLPool{}
summaryByItemP = summaryByItemPool{}
instanceItemsP = InstanceItemsPool{}

metricNames = []string{
Expand Down Expand Up @@ -79,34 +80,79 @@ func (s *Service) metricHandler(name string) gin.HandlerFunc {
}

func (s *Service) summaryHandler(c *gin.Context) {
start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c)
start, end, windowSecs, top, instance, instanceType, groupBy, err := parseAllParams(c)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"status": "error",
"message": err.Error(),
})
return
}

items := summaryP.Get()
defer summaryP.Put(items)
err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items)
if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"status": "error",
"message": err.Error(),
switch groupBy {
case query.AggLevelTable:
if instanceType == "tidb" {
c.JSON(http.StatusBadRequest, gin.H{
"status": "error",
"message": "table summary is not supported for tidb",
})
return
}
items := summaryByItemP.Get()
defer summaryByItemP.Put(items)
err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, query.AggLevelTable, items)
if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"status": "error",
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"data_by": items,
})
case query.AggLevelDB:
if instanceType == "tidb" {
c.JSON(http.StatusBadRequest, gin.H{
"status": "error",
"message": "db summary is not supported for tidb",
})
return
}
items := summaryByItemP.Get()
defer summaryByItemP.Put(items)
err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, query.AggLevelDB, items)
if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"status": "error",
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"data_by": items,
})
default:
items := summaryBySqlP.Get()
defer summaryBySqlP.Put(items)
err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items)
if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"status": "error",
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"data": items,
})
return
}

c.JSON(http.StatusOK, gin.H{
"status": "ok",
"data": items,
})
}

func (s *Service) queryMetric(c *gin.Context, name string) {
start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c)
start, end, windowSecs, top, instance, instanceType, _, err := parseAllParams(c)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"status": "error",
Expand All @@ -132,7 +178,7 @@ func (s *Service) queryMetric(c *gin.Context, name string) {
})
}

func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, err error) {
func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, groupBy string, err error) {
instance = c.Query("instance")
if len(instance) == 0 {
err = errors.New("no instance")
Expand Down Expand Up @@ -174,6 +220,7 @@ func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance,
}
windowSecs = int(duration.Seconds())

groupBy = c.Query("group_by")
return
}

Expand Down
23 changes: 20 additions & 3 deletions component/topsql/service/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,40 @@ func (tip *recordsPool) Put(ti *[]query.RecordItem) {
tip.p.Put(ti)
}

type summaryPool struct {
type summarySQLPool struct {
p sync.Pool
}

func (sp *summaryPool) Get() *[]query.SummaryItem {
func (sp *summarySQLPool) Get() *[]query.SummaryItem {
sv := sp.p.Get()
if sv == nil {
return &[]query.SummaryItem{}
}
return sv.(*[]query.SummaryItem)
}

func (sp *summaryPool) Put(s *[]query.SummaryItem) {
func (sp *summarySQLPool) Put(s *[]query.SummaryItem) {
*s = (*s)[:0]
sp.p.Put(s)
}

type summaryByItemPool struct {
p sync.Pool
}

func (tp *summaryByItemPool) Get() *[]query.SummaryByItem {
tv := tp.p.Get()
if tv == nil {
return &[]query.SummaryByItem{}
}
return tv.(*[]query.SummaryByItem)
}

func (tp *summaryByItemPool) Put(t *[]query.SummaryByItem) {
*t = (*t)[:0]
tp.p.Put(t)
}

type InstanceItemsPool struct {
p sync.Pool
}
Expand Down
Loading

0 comments on commit 78151ff

Please sign in to comment.