Skip to content

Commit

Permalink
Add the measure query trace system (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Jun 25, 2024
1 parent 7a12fd0 commit 065d7df
Show file tree
Hide file tree
Showing 23 changed files with 791 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Release Notes.

- Check unregistered nodes in background.
- Improve sorting performance of stream.
- Add the measure query trace.

### Bugs

Expand Down
2 changes: 2 additions & 0 deletions api/proto/banyandb/common/v1/trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ message Span {
string message = 5;
// children is a list of child spans of the span.
repeated Span children = 6;
// duration is the duration of the span.
int64 duration = 7;
}

// Tag is the key-value pair of a span.
Expand Down
15 changes: 12 additions & 3 deletions banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package dquery

import (
"context"
"errors"

"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
Expand All @@ -42,13 +44,14 @@ const (
var _ run.Service = (*queryService)(nil)

type queryService struct {
log *logger.Logger
metaService metadata.Repo
pipeline queue.Server
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
closer *run.Closer
pipeline queue.Server
nodeID string
}

// NewService return a new query service.
Expand Down Expand Up @@ -78,7 +81,13 @@ func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) PreRun(_ context.Context) error {
func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
}
node := val.(common.Node)
q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
q.mqp.measureService = measure.NewPortableRepository(q.metaService, q.log)
Expand Down
60 changes: 50 additions & 10 deletions banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package dquery

import (
"context"
"errors"
"fmt"
"time"

"github.com/apache/skywalking-banyandb/api/common"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
)
Expand All @@ -39,7 +42,8 @@ type measureQueryProcessor struct {

func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
now := time.Now().UnixNano()
n := time.Now()
now := n.UnixNano()
if !ok {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type"))
return
Expand Down Expand Up @@ -79,8 +83,28 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}

mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
span.Tag("plan", plan.String())
defer func() {
data := resp.Data()
switch d := data.(type) {
case *measurev1.QueryResponse:
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
span.Stop()
}()
}
mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
Broadcaster: p.broadcaster,
timeRange: queryCriteria.TimeRange,
}))
Expand All @@ -92,18 +116,34 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
defer func() {
if err = mIterator.Close(); err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
if span != nil {
span.Error(fmt.Errorf("fail to close the query plan: %w", err))
}
}
}()
result := make([]*measurev1.DataPoint, 0)
for mIterator.Next() {
current := mIterator.Current()
if len(current) > 0 {
result = append(result, current[0])
func() {
var r int
if tracer != nil {
iterSpan, _ := tracer.StartSpan(ctx, "iterator")
defer func() {
iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
iterSpan.Tag("size", fmt.Sprintf("%d", len(result)))
iterSpan.Stop()
}()
}
}
for mIterator.Next() {
r++
current := mIterator.Current()
if len(current) > 0 {
result = append(result, current[0])
}
}
}()
qr := &measurev1.QueryResponse{DataPoints: result}
if e := ml.Debug(); e.Enabled() {
e.RawJSON("ret", logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{DataPoints: result})
resp = bus.NewMessage(bus.MessageID(now), qr)
return
}
85 changes: 72 additions & 13 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

Expand Down Expand Up @@ -88,15 +90,26 @@ func (s *seriesIndex) Write(docs index.Documents) error {

var rangeOpts = index.RangeOpts{}

func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series) (sl pbv1.SeriesList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
var err error
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, err
}
}
tracer := query.GetTracer(ctx)
var span *query.Span
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
span.Tagf("matchers", "%v", seriesMatchers)
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
ss, err := s.store.Search(ctx, seriesMatchers)
if err != nil {
return nil, err
Expand All @@ -105,6 +118,9 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series)
if err != nil {
return nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
}
if span != nil {
span.Tagf("matched", "%d", len(result))
}
return result, nil
}

Expand Down Expand Up @@ -174,28 +190,54 @@ func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList
return seriesList, nil
}

func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error) {
func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (sl pbv1.SeriesList, err error) {
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
seriesList, err := s.searchPrimary(ctx, series)
if err != nil {
return nil, err
}

pl := seriesList.ToList()
if filter != nil {
if filter != nil && filter != logical.ENode {
var plFilter posting.List
// TODO: merge searchPrimary and filter
plFilter, err = filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return s.store, nil
}, 0)
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
span.Tag("exp", filter.String())
defer func() {
if err != nil {
span.Error(err)
} else {
span.Tagf("matched", "%d", plFilter.Len())
span.Tagf("total", "%d", pl.Len())
}
span.Stop()
}()
}
if plFilter, err = filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return s.store, nil
}, 0); err != nil {
return
}
if plFilter == nil {
return
}
err = pl.Intersect(plFilter)
}()
if err != nil {
return nil, err
}
if plFilter == nil {
return pbv1.SeriesList{}, nil
}
if err = pl.Intersect(plFilter); err != nil {
return nil, err
}
}

if order == nil || order.Index == nil {
Expand All @@ -205,6 +247,17 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
fieldKey := index.FieldKey{
IndexRuleID: order.Index.GetMetadata().Id,
}
var span *query.Span
if tracer != nil {
span, _ = tracer.StartSpan(ctx, "sort")
span.Tagf("preload", "%d", preloadSize)
defer func() {
if err != nil {
span.Error(err)
}
span.Stop()
}()
}
// TODO:// merge searchPrimary and sort
iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, preloadSize)
if err != nil {
Expand All @@ -215,7 +268,9 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
}()

var sortedSeriesList pbv1.SeriesList
var r int
for iter.Next() {
r++
docID := iter.Val().DocID
if !pl.Contains(docID) {
continue
Expand All @@ -225,6 +280,10 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter
return nil, err
}
}
if span != nil {
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sortedSeriesList))
}
return sortedSeriesList, err
}

Expand Down
37 changes: 27 additions & 10 deletions banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

Expand Down Expand Up @@ -145,21 +147,36 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er

var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: make([]*measurev1.DataPoint, 0)}

func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err)
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery, message)
if errQuery != nil {
return nil, errQuery
now := time.Now()
if req.Trace {
ctx := context.TODO()
tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "measure-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
} else {
span.AddSubTrace(resp.Trace)
resp.Trace = tracer.ToProto()
}
span.Stop()
}()
}
msg, errFeat := feat.Get()
if errFeat != nil {
if errors.Is(errFeat, io.EOF) {
feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(now.UnixNano()), req))
if err != nil {
return nil, err
}
msg, err := feat.Get()
if err != nil {
if errors.Is(err, io.EOF) {
return emptyMeasureQueryResponse, nil
}
return nil, errFeat
return nil, err
}
data := msg.Data()
switch d := data.(type) {
Expand Down
Loading

0 comments on commit 065d7df

Please sign in to comment.