Skip to content

Commit

Permalink
Add slow query logging (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Sep 3, 2024
1 parent c1e8e29 commit 2bad32a
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 55 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Release Notes.
- Optimize query performance of series index.
- Add liaison, remote queue, storage(rotation), time-series tables, metadata cache and scheduler metrics.
- Add HTTP health check endpoint for the data node.
- Add slow query log for the distributed query and local query.

### Bugs

Expand Down Expand Up @@ -60,7 +61,7 @@ Release Notes.
- Update CI to publish linux/amd64 and linux/arm64 Docker images.
- Make the build system compiles the binary based on the platform which is running on.
- Push "skywalking-banyandb-test" image for e2e and stress test. This image contains bydbctl to do a health check.
- Set etcd-client log level to "error" and etcd-server log level to "warn".
- Set etcd-client log level to "error" and etcd-server log level to "error".

## 0.6.1

Expand Down
12 changes: 12 additions & 0 deletions banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dquery
import (
"context"
"errors"
"time"

"go.uber.org/multierr"

Expand Down Expand Up @@ -60,6 +61,7 @@ type queryService struct {
tqp *topNQueryProcessor
closer *run.Closer
nodeID string
slowQuery time.Duration
}

// NewService return a new query service.
Expand Down Expand Up @@ -90,6 +92,16 @@ func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("distributed-query")
fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow query threshold, 0 means no slow query log")
return fs
}

func (q *queryService) Validate() error {
return nil
}

func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
Expand Down
8 changes: 7 additions & 1 deletion banyand/dquery/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
timeRange: queryCriteria.TimeRange,
}))
if err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err))
return
}
Expand Down Expand Up @@ -144,5 +144,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), qr)
if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow query")
}
}
return
}
14 changes: 9 additions & 5 deletions banyand/dquery/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
Expand Down Expand Up @@ -80,9 +79,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
ctx := context.Background()
var tracer *query.Tracer
var span *query.Span
if queryCriteria.Trace {
var tracer *query.Tracer
var span *query.Span
tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
span, ctx = tracer.StartSpan(ctx, "distributed-%s", p.queryService.nodeID)
span.Tag("plan", plan.String())
Expand All @@ -93,7 +92,7 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(bus.MessageID(now), &measurev1.QueryResponse{Trace: tracer.ToProto()})
resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
Expand All @@ -113,6 +112,11 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}

resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})

if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow query")
}
}
return
}
34 changes: 32 additions & 2 deletions banyand/dquery/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package dquery

import (
"context"
"errors"
"time"

"go.uber.org/multierr"
Expand All @@ -30,7 +32,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
pkgquery "github.com/apache/skywalking-banyandb/pkg/query"
)

const defaultTopNQueryTimeout = 10 * time.Second
Expand All @@ -46,6 +50,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
t.log.Warn().Msg("invalid event data type")
return
}
n := time.Now()
now := bus.MessageID(request.TimeRange.Begin.Nanos)
if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
resp = bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
Expand All @@ -56,7 +61,25 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
if e := t.log.Debug(); e.Enabled() {
e.Stringer("req", request).Msg("received a topN query event")
e.RawJSON("req", logger.Proto(request)).Msg("received a topN query event")
}
if request.Trace {
tracer, ctx := pkgquery.NewTracer(context.TODO(), n.Format(time.RFC3339Nano))
span, _ := tracer.StartSpan(ctx, "distributed-client")
span.Tag("request", convert.BytesToString(logger.Proto(request)))
defer func() {
data := resp.Data()
switch d := data.(type) {
case *measurev1.TopNResponse:
d.Trace = tracer.ToProto()
case common.Error:
span.Error(errors.New(d.Msg()))
resp = bus.NewMessage(now, &measurev1.TopNResponse{Trace: tracer.ToProto()})
default:
panic("unexpected data type")
}
span.Stop()
}()
}
agg := request.Agg
request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
Expand Down Expand Up @@ -103,9 +126,16 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
resp = bus.NewMessage(now, &measurev1.TopNResponse{})
return
}
lists := aggregator.Val(tags)
resp = bus.NewMessage(now, &measurev1.TopNResponse{
Lists: aggregator.Val(tags),
Lists: lists,
})
if !request.Trace && t.slowQuery > 0 {
latency := time.Since(n)
if latency > t.slowQuery {
t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(lists)).Msg("top_n slow query")
}
}
return
}

Expand Down
2 changes: 1 addition & 1 deletion banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
}
ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "measure", "write")
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send response")
logger.Debug().Err(errResp).Msg("failed to send measure write response")
ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "measure", "write")
}
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
}
s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, "stream", "write")
if errResp := stream.Send(&streamv1.WriteResponse{Metadata: metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send response")
logger.Debug().Err(errResp).Msg("failed to send stream write response")
s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, "stream", "write")
}
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/embeddedetcd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewServer(options ...Option) (Server, error) {
for _, opt := range options {
opt(conf)
}
zapCfg := logger.GetLogger("etcd-server").DefaultLevel(zerolog.WarnLevel).ToZapConfig()
zapCfg := logger.GetLogger("etcd-server").DefaultLevel(zerolog.ErrorLevel).ToZapConfig()

var l *zap.Logger
var err error
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/schema/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ OUTER:
for {
select {
case <-w.closer.CloseNotify():
w.l.Warn().Msgf("watcher closed")
w.l.Info().Msgf("watcher closed")
return
case watchResp, ok := <-wch:
if !ok {
Expand Down
50 changes: 14 additions & 36 deletions banyand/query/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,11 @@ import (
"runtime/debug"
"time"

"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand All @@ -55,16 +50,6 @@ var (
_ bus.MessageListener = (*topNQueryProcessor)(nil)
)

type queryService struct {
metaService metadata.Repo
pipeline queue.Server
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
nodeID string
}

type streamQueryProcessor struct {
streamService stream.Service
*queryService
Expand Down Expand Up @@ -147,6 +132,12 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {

resp = bus.NewMessage(bus.MessageID(now), &streamv1.QueryResponse{Elements: entities})

if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(entities)).Msg("stream slow query")
}
}
return
}

Expand Down Expand Up @@ -227,13 +218,13 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {

mIterator, err := plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx, ec))
if err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to execute the query plan for measure %s: %v", meta.GetName(), err))
return
}
defer func() {
if err = mIterator.Close(); err != nil {
ml.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
ml.Error().Err(err).Dur("latency", time.Since(n)).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to close the query plan")
if span != nil {
span.Error(fmt.Errorf("fail to close the query plan: %w", err))
}
Expand Down Expand Up @@ -264,24 +255,11 @@ func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
resp = bus.NewMessage(bus.MessageID(now), qr)
return
}

func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
if !queryCriteria.Trace && p.slowQuery > 0 {
latency := time.Since(n)
if latency > p.slowQuery {
p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(result)).Msg("measure slow query")
}
}
node := val.(common.Node)
q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
)
return
}
6 changes: 6 additions & 0 deletions banyand/query/processor_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}()

resp = bus.NewMessage(bus.MessageID(now), toTopNResponse(result))
if !request.Trace && t.slowQuery > 0 {
latency := time.Since(n)
if latency > t.slowQuery {
t.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(request)).Int("resp_count", len(result)).Msg("top_n slow query")
}
}
return
}

Expand Down
47 changes: 47 additions & 0 deletions banyand/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,32 @@ package query

import (
"context"
"errors"
"time"

"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)

type queryService struct {
metaService metadata.Repo
pipeline queue.Server
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
nodeID string
slowQuery time.Duration
}

// NewService return a new query service.
func NewService(_ context.Context, streamService stream.Service, measureService measure.Service,
metaService metadata.Repo, pipeline queue.Server,
Expand All @@ -53,3 +71,32 @@ func NewService(_ context.Context, streamService stream.Service, measureService
}
return svc, nil
}

func (q *queryService) Name() string {
return moduleName
}

func (q *queryService) PreRun(ctx context.Context) error {
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
}
node := val.(common.Node)
q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
)
}

func (q *queryService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("query")
fs.DurationVar(&q.slowQuery, "slow-query", 0, "slow query threshold, 0 means no slow query log")
return fs
}

func (q *queryService) Validate() error {
return nil
}
Loading

0 comments on commit 2bad32a

Please sign in to comment.