Skip to content

Commit

Permalink
Optimize query performance of series index (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
ButterBright authored Aug 3, 2024
1 parent b798035 commit 198129c
Show file tree
Hide file tree
Showing 19 changed files with 810 additions and 256 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Release Notes.
- Add the stream query trace.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.

### Bugs

Expand Down
23 changes: 3 additions & 20 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ package storage
import (
"context"
"path"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)

func (s *segment[T, O]) IndexDB() IndexDB {
Expand Down Expand Up @@ -200,24 +197,12 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
}

pl := seriesList.ToList()
if opts.Filter != nil && opts.Filter != logical.ENode {
if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
span.Tag("exp", opts.Filter.String())
var projectionStrBuilder strings.Builder
if len(opts.Projection) > 0 {
projectionStrBuilder.WriteString("[")
for i, p := range opts.Projection {
if i > 0 {
projectionStrBuilder.WriteString(", ")
}
projectionStrBuilder.WriteRune(rune(p.IndexRuleID))
}
projectionStrBuilder.WriteString("]")
span.Tagf("projection", "%s", projectionStrBuilder.String())
}
span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
Expand All @@ -228,9 +213,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
if plFilter, err = opts.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return s.store, nil
}, 0); err != nil {
if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil {
return
}
if plFilter == nil {
Expand Down
3 changes: 2 additions & 1 deletion banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
Expand Down Expand Up @@ -66,7 +67,7 @@ type SupplyTSDB[T TSTable] func() T

// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Filter index.Filter
Query *inverted.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
seriesFilter := roaring.NewPostingList()
for i := range segments {
sll, fieldResultList, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
Filter: mqo.Filter,
Query: mqo.Query,
Order: mqo.Order,
PreloadSize: preloadSize,
Projection: indexProjection,
Expand Down
4 changes: 2 additions & 2 deletions banyand/stream/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)

Expand Down Expand Up @@ -523,7 +523,7 @@ func (qr *queryResult) mergeByTimestamp() *model.StreamResult {
func indexSearch(sqo model.StreamQueryOptions,
tabs []*tsTable, seriesList pbv1.SeriesList,
) (posting.List, error) {
if sqo.Filter == nil || sqo.Filter == logical.ENode {
if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
return nil, nil
}
result := roaring.NewPostingList()
Expand Down
29 changes: 29 additions & 0 deletions pkg/convert/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package convert

import "encoding/json"

// JSONToString renders the output of marshaler.MarshalJSON as a string.
// When marshalling fails, the error's message is returned in place of the JSON text.
func JSONToString(marshaler json.Marshaler) string {
	encoded, marshalErr := marshaler.MarshalJSON()
	if marshalErr == nil {
		return string(encoded)
	}
	return marshalErr.Error()
}
22 changes: 16 additions & 6 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"io"

"github.com/blugelabs/bluge"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand Down Expand Up @@ -93,7 +95,7 @@ func (r RangeOpts) Between(value []byte) int {
return 0
}

// DocumentResult represents a document in a index.
// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
Expand Down Expand Up @@ -131,7 +133,7 @@ func (i *dummyIterator) Close() error {
return nil
}

// Document represents a document in a index.
// Document represents a document in an index.
type Document struct {
Fields []Field
EntityValues []byte
Expand All @@ -147,7 +149,7 @@ type Batch struct {
Documents Documents
}

// Writer allows writing fields and docID in a document to a index.
// Writer allows writing fields and docID in a document to an index.
type Writer interface {
Batch(batch Batch) error
}
Expand All @@ -167,15 +169,22 @@ type Searcher interface {
Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
}

// Store is an abstract of a index repository.
// Query is an abstract of an index query.
// It embeds bluge.Query and fmt.Stringer, so a value satisfies both
// interfaces; the String form is what the series index attaches to its
// "filter" trace span.
type Query interface {
bluge.Query
fmt.Stringer
// Query returns the bluge.Query to run; the inverted store's Execute
// passes this value to the bluge reader's Search.
Query() bluge.Query
}

// Store is an abstract of an index repository.
// It composes io.Closer, Writer and Searcher and adds a size report.
type Store interface {
io.Closer
Writer
Searcher
// SizeOnDisk reports how much space the index occupies on disk.
// NOTE(review): the inverted store returns a byte count — confirm other implementations use the same unit.
SizeOnDisk() int64
}

// Series represents a series in a index.
// Series represents a series in an index.
type Series struct {
EntityValues []byte
ID common.SeriesID
Expand All @@ -185,7 +194,7 @@ func (s Series) String() string {
return fmt.Sprintf("%s:%d", s.EntityValues, s.ID)
}

// SeriesDocument represents a series document in a index.
// SeriesDocument represents a series document in an index.
type SeriesDocument struct {
Fields map[string][]byte
Key Series
Expand All @@ -196,6 +205,7 @@ type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error)
Execute(context.Context, Query) (posting.List, error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
35 changes: 28 additions & 7 deletions pkg/index/inverted/inverted.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

// Package inverted implements a inverted index repository.
// Package inverted implements an inverted index repository.
package inverted

import (
Expand Down Expand Up @@ -62,10 +62,11 @@ var (
defaultProjection = []string{docIDField}
)

var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
// Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer.
var Analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer

func init() {
analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
Analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
databasev1.IndexRule_ANALYZER_KEYWORD: analyzer.NewKeywordAnalyzer(),
databasev1.IndexRule_ANALYZER_SIMPLE: analyzer.NewSimpleAnalyzer(),
databasev1.IndexRule_ANALYZER_STANDARD: analyzer.NewStandardAnalyzer(),
Expand All @@ -74,7 +75,7 @@ func init() {

var _ index.Store = (*store)(nil)

// StoreOpts wraps options to create a inverted index repository.
// StoreOpts wraps options to create an inverted index repository.
type StoreOpts struct {
Logger *logger.Logger
Path string
Expand Down Expand Up @@ -124,7 +125,7 @@ func (s *store) Batch(batch index.Batch) error {
tf.StoreValue()
}
if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer])
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000))
}
config := bluge.DefaultConfigWithIndexConfig(indexConfig)
config.DefaultSearchAnalyzer = analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.DefaultSearchAnalyzer = Analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
w, err := bluge.OpenWriter(config)
if err != nil {
Expand Down Expand Up @@ -271,7 +272,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
if err != nil {
return nil, err
}
analyzer := analyzers[fieldKey.Analyzer]
analyzer := Analyzers[fieldKey.Analyzer]
fk := fieldKey.Marshal()
query := bluge.NewBooleanQuery()
if fieldKey.HasSeriesID() {
Expand Down Expand Up @@ -309,6 +310,26 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}

// Execute runs query against a reader obtained from the store's writer and
// collects the DocID of every matching document into a posting list.
//
// The results are named on purpose: the deferred iterator Close assigns to
// err, and with unnamed results that assignment would happen after the return
// values had already been evaluated, silently dropping the Close error.
//
// It returns the posting list of matching DocIDs, or an error if the reader
// cannot be obtained, the search fails, or closing the iterator fails.
func (s *store) Execute(ctx context.Context, query index.Query) (list posting.List, err error) {
	reader, err := s.writer.Reader()
	if err != nil {
		return nil, err
	}
	documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query()))
	if err != nil {
		// The iterator that would have been handed the reader was never
		// created, so the reader has to be released here to avoid a leak.
		return nil, multierr.Append(err, reader.Close())
	}
	// From here on the reader is owned by the iterator (presumably released
	// by iter.Close — verify against newBlugeMatchIterator).
	iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
	defer func() {
		// Writes to the named result so a Close failure reaches the caller.
		err = multierr.Append(err, iter.Close())
	}()
	list = roaring.NewPostingList()
	for iter.Next() {
		list.Insert(iter.Val().DocID)
	}
	return list, err
}

func (s *store) SizeOnDisk() int64 {
_, bytes := s.writer.DirectoryStats()
return int64(bytes)
Expand Down
Loading

0 comments on commit 198129c

Please sign in to comment.