Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize query performance of series index #491

Merged
merged 21 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Release Notes.
- Add the stream query trace.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.

### Bugs

Expand Down
23 changes: 3 additions & 20 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ package storage
import (
"context"
"path"
"strings"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)

func (s *segment[T, O]) IndexDB() IndexDB {
Expand Down Expand Up @@ -200,24 +197,12 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
}

pl := seriesList.ToList()
if opts.Filter != nil && opts.Filter != logical.ENode {
if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
span.Tag("exp", opts.Filter.String())
var projectionStrBuilder strings.Builder
if len(opts.Projection) > 0 {
projectionStrBuilder.WriteString("[")
for i, p := range opts.Projection {
if i > 0 {
projectionStrBuilder.WriteString(", ")
}
projectionStrBuilder.WriteRune(rune(p.IndexRuleID))
}
projectionStrBuilder.WriteString("]")
span.Tagf("projection", "%s", projectionStrBuilder.String())
}
span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
Expand All @@ -228,9 +213,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
if plFilter, err = opts.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return s.store, nil
}, 0); err != nil {
if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil {
return
}
if plFilter == nil {
Expand Down
3 changes: 2 additions & 1 deletion banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
Expand Down Expand Up @@ -66,7 +67,7 @@ type SupplyTSDB[T TSTable] func() T

// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Filter index.Filter
Query *inverted.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m
seriesFilter := roaring.NewPostingList()
for i := range segments {
sll, fieldResultList, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{
Filter: mqo.Filter,
Query: mqo.Query,
Order: mqo.Order,
PreloadSize: preloadSize,
Projection: indexProjection,
Expand Down
4 changes: 2 additions & 2 deletions banyand/stream/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)

Expand Down Expand Up @@ -523,7 +523,7 @@ func (qr *queryResult) mergeByTimestamp() *model.StreamResult {
func indexSearch(sqo model.StreamQueryOptions,
tabs []*tsTable, seriesList pbv1.SeriesList,
) (posting.List, error) {
if sqo.Filter == nil || sqo.Filter == logical.ENode {
if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
return nil, nil
}
result := roaring.NewPostingList()
Expand Down
29 changes: 29 additions & 0 deletions pkg/convert/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package convert

import "encoding/json"

// JSONToString renders marshaler as a string by invoking its MarshalJSON method.
// When marshaling fails, the error's message is returned in place of the JSON text,
// so the result is always printable (e.g. for logs or trace tags).
func JSONToString(marshaler json.Marshaler) string {
	encoded, marshalErr := marshaler.MarshalJSON()
	if marshalErr == nil {
		return string(encoded)
	}
	return marshalErr.Error()
}
22 changes: 16 additions & 6 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"io"

"github.com/blugelabs/bluge"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand Down Expand Up @@ -93,7 +95,7 @@ func (r RangeOpts) Between(value []byte) int {
return 0
}

// DocumentResult represents a document in a index.
// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
Expand Down Expand Up @@ -131,7 +133,7 @@ func (i *dummyIterator) Close() error {
return nil
}

// Document represents a document in a index.
// Document represents a document in an index.
type Document struct {
Fields []Field
EntityValues []byte
Expand All @@ -147,7 +149,7 @@ type Batch struct {
Documents Documents
}

// Writer allows writing fields and docID in a document to a index.
// Writer allows writing fields and docID in a document to an index.
type Writer interface {
Batch(batch Batch) error
}
Expand All @@ -167,15 +169,22 @@ type Searcher interface {
Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
}

// Store is an abstract of a index repository.
// Query is an abstraction of an index query.
// It embeds bluge.Query, so a Query can be used wherever a bluge.Query is accepted,
// and fmt.Stringer, so it can be rendered as text.
type Query interface {
bluge.Query
fmt.Stringer
// Query returns the underlying bluge.Query.
// NOTE(review): presumably this is the query that is actually executed against the
// index (the inverted store passes it to bluge.NewAllMatches) — confirm with implementers.
Query() bluge.Query
}

// Store is an abstraction of an index repository.
// It combines the write path (Writer), the read path (Searcher) and
// lifecycle management (io.Closer) behind a single interface.
type Store interface {
io.Closer
Writer
Searcher
// SizeOnDisk reports the size the store occupies on disk.
// NOTE(review): presumably in bytes — the inverted implementation returns the
// byte count from the writer's DirectoryStats; confirm for other implementations.
SizeOnDisk() int64
}

// Series represents a series in a index.
// Series represents a series in an index.
type Series struct {
EntityValues []byte
ID common.SeriesID
Expand All @@ -185,7 +194,7 @@ func (s Series) String() string {
return fmt.Sprintf("%s:%d", s.EntityValues, s.ID)
}

// SeriesDocument represents a series document in a index.
// SeriesDocument represents a series document in an index.
type SeriesDocument struct {
Fields map[string][]byte
Key Series
Expand All @@ -196,6 +205,7 @@ type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error)
Execute(context.Context, Query) (posting.List, error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
35 changes: 28 additions & 7 deletions pkg/index/inverted/inverted.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

// Package inverted implements a inverted index repository.
// Package inverted implements an inverted index repository.
package inverted

import (
Expand Down Expand Up @@ -62,10 +62,11 @@ var (
defaultProjection = []string{docIDField}
)

var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
// Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer.
var Analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer

func init() {
analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
Analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
databasev1.IndexRule_ANALYZER_KEYWORD: analyzer.NewKeywordAnalyzer(),
databasev1.IndexRule_ANALYZER_SIMPLE: analyzer.NewSimpleAnalyzer(),
databasev1.IndexRule_ANALYZER_STANDARD: analyzer.NewStandardAnalyzer(),
Expand All @@ -74,7 +75,7 @@ func init() {

var _ index.Store = (*store)(nil)

// StoreOpts wraps options to create a inverted index repository.
// StoreOpts wraps options to create an inverted index repository.
type StoreOpts struct {
Logger *logger.Logger
Path string
Expand Down Expand Up @@ -124,7 +125,7 @@ func (s *store) Batch(batch index.Batch) error {
tf.StoreValue()
}
if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer])
tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
}
doc.AddField(tf)
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000))
}
config := bluge.DefaultConfigWithIndexConfig(indexConfig)
config.DefaultSearchAnalyzer = analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.DefaultSearchAnalyzer = Analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
w, err := bluge.OpenWriter(config)
if err != nil {
Expand Down Expand Up @@ -271,7 +272,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
if err != nil {
return nil, err
}
analyzer := analyzers[fieldKey.Analyzer]
analyzer := Analyzers[fieldKey.Analyzer]
fk := fieldKey.Marshal()
query := bluge.NewBooleanQuery()
if fieldKey.HasSeriesID() {
Expand Down Expand Up @@ -309,6 +310,26 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}

func (s *store) Execute(ctx context.Context, query index.Query) (posting.List, error) {
reader, err := s.writer.Reader()
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query()))
if err != nil {
return nil, err
}
iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
defer func() {
err = multierr.Append(err, iter.Close())
}()
list := roaring.NewPostingList()
for iter.Next() {
list.Insert(iter.Val().DocID)
}
return list, err
}

func (s *store) SizeOnDisk() int64 {
_, bytes := s.writer.DirectoryStats()
return int64(bytes)
Expand Down
Loading
Loading