diff --git a/CHANGES.md b/CHANGES.md index 508844053..ba07da6fa 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,6 +22,7 @@ Release Notes. - Add the stream query trace. - Add the topN query trace. - Introduce the round-robin selector to Liaison Node. +- Optimize query performance of series index. ### Bugs diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index b8cd31cb0..d5950c151 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -20,20 +20,17 @@ package storage import ( "context" "path" - "strings" "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" - "github.com/apache/skywalking-banyandb/pkg/query/logical" ) func (s *segment[T, O]) IndexDB() IndexDB { @@ -200,24 +197,12 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In } pl := seriesList.ToList() - if opts.Filter != nil && opts.Filter != logical.ENode { + if opts.Query != nil { var plFilter posting.List func() { if tracer != nil { span, _ := tracer.StartSpan(ctx, "filter") - span.Tag("exp", opts.Filter.String()) - var projectionStrBuilder strings.Builder - if len(opts.Projection) > 0 { - projectionStrBuilder.WriteString("[") - for i, p := range opts.Projection { - if i > 0 { - projectionStrBuilder.WriteString(", ") - } - projectionStrBuilder.WriteRune(rune(p.IndexRuleID)) - } - projectionStrBuilder.WriteString("]") - span.Tagf("projection", "%s", projectionStrBuilder.String()) - } + span.Tag("exp", opts.Query.String()) defer func() { if err != nil { 
span.Error(err) @@ -228,9 +213,7 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In span.Stop() }() } - if plFilter, err = opts.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { - return s.store, nil - }, 0); err != nil { + if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil { return } if plFilter == nil { diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 1ccf03d7f..fb7e0af69 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -34,6 +34,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -66,7 +67,7 @@ type SupplyTSDB[T TSTable] func() T // IndexSearchOpts is the options for searching index. 
type IndexSearchOpts struct { - Filter index.Filter + Query *inverted.Query Order *model.OrderBy Projection []index.FieldKey PreloadSize int diff --git a/banyand/measure/query.go b/banyand/measure/query.go index ae37521aa..fa94091e4 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -194,7 +194,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m seriesFilter := roaring.NewPostingList() for i := range segments { sll, fieldResultList, err := segments[i].IndexDB().Search(ctx, series, storage.IndexSearchOpts{ - Filter: mqo.Filter, + Query: mqo.Query, Order: mqo.Order, PreloadSize: preloadSize, Projection: indexProjection, diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 9dff205f6..0a5ebdc35 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -37,7 +37,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" - "github.com/apache/skywalking-banyandb/pkg/query/logical" + logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -523,7 +523,7 @@ func (qr *queryResult) mergeByTimestamp() *model.StreamResult { func indexSearch(sqo model.StreamQueryOptions, tabs []*tsTable, seriesList pbv1.SeriesList, ) (posting.List, error) { - if sqo.Filter == nil || sqo.Filter == logical.ENode { + if sqo.Filter == nil || sqo.Filter == logicalstream.ENode { return nil, nil } result := roaring.NewPostingList() diff --git a/pkg/convert/json.go b/pkg/convert/json.go new file mode 100644 index 000000000..22b250d99 --- /dev/null +++ b/pkg/convert/json.go @@ -0,0 +1,29 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package convert + +import "encoding/json" + +// JSONToString converts a JSON marshaler to its JSON string representation. +func JSONToString(marshaler json.Marshaler) string { + bb, err := marshaler.MarshalJSON() + if err != nil { + return err.Error() + } + return string(bb) +} diff --git a/pkg/index/index.go b/pkg/index/index.go index 5d5bf91e4..8234f4965 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -24,6 +24,8 @@ import ( "fmt" "io" + "github.com/blugelabs/bluge" + "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -93,7 +95,7 @@ func (r RangeOpts) Between(value []byte) int { return 0 } -// DocumentResult represents a document in a index. +// DocumentResult represents a document in an index. type DocumentResult struct { Values map[string][]byte SortedValue []byte @@ -131,7 +133,7 @@ func (i *dummyIterator) Close() error { return nil } -// Document represents a document in a index. +// Document represents a document in an index. type Document struct { Fields []Field EntityValues []byte @@ -147,7 +149,7 @@ type Batch struct { Documents Documents } -// Writer allows writing fields and docID in a document to a index. +// Writer allows writing fields and docID in a document to an index. 
type Writer interface { Batch(batch Batch) error } @@ -167,7 +169,14 @@ type Searcher interface { Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error) } -// Store is an abstract of a index repository. +// Query is an abstract of an index query. +type Query interface { + bluge.Query + fmt.Stringer + Query() bluge.Query +} + +// Store is an abstract of an index repository. type Store interface { io.Closer Writer @@ -175,7 +184,7 @@ type Store interface { SizeOnDisk() int64 } -// Series represents a series in a index. +// Series represents a series in an index. type Series struct { EntityValues []byte ID common.SeriesID @@ -185,7 +194,7 @@ func (s Series) String() string { return fmt.Sprintf("%s:%d", s.EntityValues, s.ID) } -// SeriesDocument represents a series document in a index. +// SeriesDocument represents a series document in an index. type SeriesDocument struct { Fields map[string][]byte Key Series @@ -196,6 +205,7 @@ type SeriesStore interface { Store // Search returns a list of series that match the given matchers. Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error) + Execute(context.Context, Query) (posting.List, error) } // SeriesMatcherType represents the type of series matcher. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 4d831fb22..a481b7f3b 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// Package inverted implements a inverted index repository. +// Package inverted implements an inverted index repository. package inverted import ( @@ -62,10 +62,11 @@ var ( defaultProjection = []string{docIDField} ) -var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer +// Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer. 
+var Analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer func init() { - analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{ + Analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{ databasev1.IndexRule_ANALYZER_KEYWORD: analyzer.NewKeywordAnalyzer(), databasev1.IndexRule_ANALYZER_SIMPLE: analyzer.NewSimpleAnalyzer(), databasev1.IndexRule_ANALYZER_STANDARD: analyzer.NewStandardAnalyzer(), @@ -74,7 +75,7 @@ func init() { var _ index.Store = (*store)(nil) -// StoreOpts wraps options to create a inverted index repository. +// StoreOpts wraps options to create an inverted index repository. type StoreOpts struct { Logger *logger.Logger Path string @@ -124,7 +125,7 @@ func (s *store) Batch(batch index.Batch) error { tf.StoreValue() } if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED { - tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer]) + tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer]) } doc.AddField(tf) } @@ -153,7 +154,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) { WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000)) } config := bluge.DefaultConfigWithIndexConfig(indexConfig) - config.DefaultSearchAnalyzer = analyzers[databasev1.IndexRule_ANALYZER_KEYWORD] + config.DefaultSearchAnalyzer = Analyzers[databasev1.IndexRule_ANALYZER_KEYWORD] config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0) w, err := bluge.OpenWriter(config) if err != nil { @@ -271,7 +272,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, if err != nil { return nil, err } - analyzer := analyzers[fieldKey.Analyzer] + analyzer := Analyzers[fieldKey.Analyzer] fk := fieldKey.Marshal() query := bluge.NewBooleanQuery() if fieldKey.HasSeriesID() { @@ -309,6 +310,26 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti return } +func (s *store) Execute(ctx context.Context, query index.Query) (posting.List, error) { + reader, err := s.writer.Reader() + if err != nil { + return 
nil, err + } + documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query())) + if err != nil { + return nil, err + } + iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) + defer func() { + err = multierr.Append(err, iter.Close()) + }() + list := roaring.NewPostingList() + for iter.Next() { + list.Insert(iter.Val().DocID) + } + return list, err +} + func (s *store) SizeOnDisk() int64 { _, bytes := s.writer.DirectoryStats() return int64(bytes) diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go new file mode 100644 index 000000000..e3f90fda3 --- /dev/null +++ b/pkg/index/inverted/query.go @@ -0,0 +1,431 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package inverted + +import ( + "encoding/json" + "fmt" + "math" + "strings" + + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/search" + "github.com/pkg/errors" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/query/logical" +) + +var ( + minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0]) + maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0]) + minInf = "-inf" + maxInf = "+inf" +) + +// GlobalIndexError represents a index rule is "global". +// The local filter can't handle it. +type GlobalIndexError struct { + IndexRule *databasev1.IndexRule + Expr logical.LiteralExpr +} + +func (g GlobalIndexError) Error() string { return g.IndexRule.String() } + +// Query is a wrapper for bluge.Query. +type Query struct { + query bluge.Query + node +} + +// Searcher implements index.Query. +func (q *Query) Searcher(i search.Reader, options search.SearcherOptions) (search.Searcher, error) { + return q.query.Searcher(i, options) +} + +func (q *Query) String() string { + return q.node.String() +} + +// Query implements index.Query. +func (q *Query) Query() bluge.Query { + return q.query +} + +// BuildLocalQuery returns blugeQuery for local indices. 
+func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int, + entity []*modelv1.TagValue, +) (*Query, [][]*modelv1.TagValue, bool, error) { + if criteria == nil { + return nil, [][]*modelv1.TagValue{entity}, false, nil + } + switch criteria.GetExp().(type) { + case *modelv1.Criteria_Condition: + cond := criteria.GetCondition() + expr, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, false, err + } + if parsedEntity != nil { + return nil, parsedEntity, false, nil + } + if ok, indexRule := schema.IndexDefined(cond.Name); ok { + return parseConditionToQuery(cond, indexRule, expr, entity) + } + return nil, nil, false, errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s", cond) + case *modelv1.Criteria_Le: + le := criteria.GetLe() + if le.GetLeft() == nil && le.GetRight() == nil { + return nil, nil, false, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) + } + if le.GetLeft() == nil { + return BuildLocalQuery(le.Right, schema, entityDict, entity) + } + if le.GetRight() == nil { + return BuildLocalQuery(le.Left, schema, entityDict, entity) + } + left, leftEntities, leftIsMatchAllQuery, err := BuildLocalQuery(le.Left, schema, entityDict, entity) + if err != nil { + return nil, nil, false, err + } + right, rightEntities, rightIsMatchAllQuery, err := BuildLocalQuery(le.Right, schema, entityDict, entity) + if err != nil { + return nil, nil, false, err + } + entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities) + if entities == nil { + return nil, nil, false, nil + } + if left == nil && right == nil { + return nil, entities, false, nil + } + if leftIsMatchAllQuery && rightIsMatchAllQuery { + return &Query{ + query: bluge.NewMatchAllQuery(), + node: newMatchAllNode(), + }, entities, true, nil + } + switch le.Op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + 
query, node := bluge.NewBooleanQuery(), newMustNode() + if left != nil { + query.AddMust(left.query) + node.Append(left.node) + } + if right != nil { + query.AddMust(right.query) + node.Append(right.node) + } + return &Query{query, node}, entities, false, nil + case modelv1.LogicalExpression_LOGICAL_OP_OR: + if leftIsMatchAllQuery || rightIsMatchAllQuery { + return &Query{ + query: bluge.NewMatchAllQuery(), + node: newMatchAllNode(), + }, entities, true, nil + } + query, node := bluge.NewBooleanQuery(), newShouldNode() + query.SetMinShould(1) + if left != nil { + query.AddShould(left.query) + node.Append(left.node) + } + if right != nil { + query.AddShould(right.query) + node.Append(right.node) + } + return &Query{query, node}, entities, false, nil + } + } + return nil, nil, false, logical.ErrInvalidCriteriaType +} + +func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexRule, + expr logical.LiteralExpr, entity []*modelv1.TagValue, +) (*Query, [][]*modelv1.TagValue, bool, error) { + field := string(convert.Uint32ToBytes(indexRule.Metadata.Id)) + b := expr.Bytes() + if len(b) < 1 { + return &Query{ + query: bluge.NewMatchAllQuery(), + node: newMatchAllNode(), + }, [][]*modelv1.TagValue{entity}, true, nil + } + term, str := string(b[0]), expr.String() + switch cond.Op { + case modelv1.Condition_BINARY_OP_GT: + query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false, false).SetField(field) + node := newTermRangeInclusiveNode(str, maxInf, false, false, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_GE: + query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true, false).SetField(field) + node := newTermRangeInclusiveNode(str, maxInf, true, false, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_LT: + query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, false).SetField(field) + node := 
newTermRangeInclusiveNode(minInf, str, false, false, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_LE: + query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, true).SetField(field) + node := newTermRangeInclusiveNode(minInf, str, false, true, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_EQ: + query := bluge.NewTermQuery(term).SetField(field) + node := newTermNode(str, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_MATCH: + query := bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer]) + node := newMatchNode(str, indexRule) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_NE: + query, node := bluge.NewBooleanQuery(), newMustNotNode() + query.AddMustNot(bluge.NewTermQuery(term).SetField(field)) + node.SetSubNode(newTermNode(str, indexRule)) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_HAVING: + bb, elements := expr.Bytes(), expr.Elements() + query, node := bluge.NewBooleanQuery(), newMustNode() + for _, b := range bb { + query.AddMust(bluge.NewTermQuery(string(b)).SetField(field)) + } + for _, e := range elements { + node.Append(newTermNode(e, indexRule)) + } + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_NOT_HAVING: + bb, elements := expr.Bytes(), expr.Elements() + subQuery, subNode := bluge.NewBooleanQuery(), newMustNode() + for _, b := range bb { + subQuery.AddMust(bluge.NewTermQuery(string(b)).SetField(field)) + } + for _, e := range elements { + subNode.Append(newTermNode(e, indexRule)) + } + query, node := bluge.NewBooleanQuery(), newMustNotNode() + query.AddMustNot(subQuery) + node.SetSubNode(subNode) + return &Query{query, node}, 
[][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_IN: + bb, elements := expr.Bytes(), expr.Elements() + query, node := bluge.NewBooleanQuery(), newShouldNode() + query.SetMinShould(1) + for _, b := range bb { + query.AddShould(bluge.NewTermQuery(string(b)).SetField(field)) + } + for _, e := range elements { + node.Append(newTermNode(e, indexRule)) + } + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + case modelv1.Condition_BINARY_OP_NOT_IN: + bb, elements := expr.Bytes(), expr.Elements() + subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode() + subQuery.SetMinShould(1) + for _, b := range bb { + subQuery.AddShould(bluge.NewTermQuery(string(b)).SetField(field)) + } + for _, e := range elements { + subNode.Append(newTermNode(e, indexRule)) + } + query, node := bluge.NewBooleanQuery(), newMustNotNode() + query.AddMustNot(subQuery) + node.SetSubNode(subNode) + return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + } + return nil, nil, false, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v", cond) +} + +type node interface { + fmt.Stringer +} + +type mustNode struct { + subNodes []node +} + +func newMustNode() *mustNode { + return &mustNode{ + subNodes: make([]node, 0), + } +} + +func (m *mustNode) Append(subNode node) { + m.subNodes = append(m.subNodes, subNode) +} + +func (m *mustNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["must"] = m.subNodes + return json.Marshal(data) +} + +func (m *mustNode) String() string { + return convert.JSONToString(m) +} + +type shouldNode struct { + subNodes []node +} + +func newShouldNode() *shouldNode { + return &shouldNode{ + subNodes: make([]node, 0), + } +} + +func (s *shouldNode) Append(subNode node) { + s.subNodes = append(s.subNodes, subNode) +} + +func (s *shouldNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["should"] = s.subNodes + return 
json.Marshal(data) +} + +func (s *shouldNode) String() string { + return convert.JSONToString(s) +} + +type mustNotNode struct { + subNode node +} + +func newMustNotNode() *mustNotNode { + return &mustNotNode{} +} + +func (m *mustNotNode) SetSubNode(subNode node) { + m.subNode = subNode +} + +func (m *mustNotNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["mustNot"] = m.subNode + return json.Marshal(data) +} + +func (m *mustNotNode) String() string { + return convert.JSONToString(m) +} + +type matchAllNode struct{} + +func newMatchAllNode() *matchAllNode { + return &matchAllNode{} +} + +func (m *matchAllNode) String() string { + return "matchAll" +} + +type termRangeInclusiveNode struct { + indexRule *databasev1.IndexRule + min string + max string + minInclusive bool + maxInclusive bool +} + +func newTermRangeInclusiveNode(min, max string, minInclusive, maxInclusive bool, indexRule *databasev1.IndexRule) *termRangeInclusiveNode { + return &termRangeInclusiveNode{ + indexRule: indexRule, + min: min, + max: max, + minInclusive: minInclusive, + maxInclusive: maxInclusive, + } +} + +func (t *termRangeInclusiveNode) MarshalJSON() ([]byte, error) { + inner := make(map[string]interface{}, 1) + var builder strings.Builder + if t.minInclusive { + builder.WriteString("[") + } else { + builder.WriteString("(") + } + builder.WriteString(t.min + " ") + builder.WriteString(t.max) + if t.maxInclusive { + builder.WriteString("]") + } else { + builder.WriteString(")") + } + inner["range"] = builder.String() + inner["index"] = t.indexRule.Metadata.Name + ":" + t.indexRule.Metadata.Group + data := make(map[string]interface{}, 1) + data["termRangeInclusive"] = inner + return json.Marshal(data) +} + +func (t *termRangeInclusiveNode) String() string { + return convert.JSONToString(t) +} + +type termNode struct { + indexRule *databasev1.IndexRule + term string +} + +func newTermNode(term string, indexRule *databasev1.IndexRule) *termNode { + return 
&termNode{ + indexRule: indexRule, + term: term, + } +} + +func (t *termNode) MarshalJSON() ([]byte, error) { + inner := make(map[string]interface{}, 1) + inner["index"] = t.indexRule.Metadata.Name + ":" + t.indexRule.Metadata.Group + inner["value"] = t.term + data := make(map[string]interface{}, 1) + data["term"] = inner + return json.Marshal(data) +} + +func (t *termNode) String() string { + return convert.JSONToString(t) +} + +type matchNode struct { + indexRule *databasev1.IndexRule + match string +} + +func newMatchNode(match string, indexRule *databasev1.IndexRule) *matchNode { + return &matchNode{ + indexRule: indexRule, + match: match, + } +} + +func (m *matchNode) MarshalJSON() ([]byte, error) { + inner := make(map[string]interface{}, 1) + inner["index"] = m.indexRule.Metadata.Name + ":" + m.indexRule.Metadata.Group + inner["value"] = m.match + inner["analyzer"] = databasev1.IndexRule_Analyzer_name[int32(m.indexRule.Analyzer)] + data := make(map[string]interface{}, 1) + data["match"] = inner + return json.Marshal(data) +} + +func (m *matchNode) String() string { + return convert.JSONToString(m) +} diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 88b772201..d06129b9d 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -24,12 +24,17 @@ import ( ) var ( - errTagNotDefined = errors.New("tag is not defined") - errUnsupportedConditionOp = errors.New("unsupported condition operation") - errUnsupportedConditionValue = errors.New("unsupported condition value type") - errInvalidCriteriaType = errors.New("invalid criteria type") - errIndexNotDefined = errors.New("index is not define for the tag") - errIndexSortingUnsupported = errors.New("index does not support sorting") + // ErrUnsupportedConditionOp indicates an unsupported condition operation. + ErrUnsupportedConditionOp = errors.New("unsupported condition operation") + // ErrUnsupportedConditionValue indicates an unsupported condition value type. 
+ ErrUnsupportedConditionValue = errors.New("unsupported condition value type") + // ErrInvalidCriteriaType indicates an invalid criteria type. + ErrInvalidCriteriaType = errors.New("invalid criteria type") + // ErrInvalidLogicalExpression indicates an invalid logical expression. + ErrInvalidLogicalExpression = errors.New("invalid logical expression") + errTagNotDefined = errors.New("tag is not defined") + errIndexNotDefined = errors.New("index is not define for the tag") + errIndexSortingUnsupported = errors.New("index does not support sorting") ) // Tag represents the combination of tag family and tag name. diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go index ed0bdedad..fef36557e 100644 --- a/pkg/query/logical/expr.go +++ b/pkg/query/logical/expr.go @@ -55,6 +55,11 @@ func (f *TagRef) String() string { return fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(), f.Spec.Spec.GetType().String()) } +// Elements returns a slice containing the string representation of TagRef. +func (f *TagRef) Elements() []string { + return []string{fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(), f.Spec.Spec.GetType().String())} +} + // NewTagRef returns a new TagRef. func NewTagRef(familyName, tagName string) *TagRef { return &TagRef{ @@ -85,6 +90,11 @@ func (f *FieldRef) String() string { return fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(), f.Spec.Spec.GetFieldType().String()) } +// Elements returns a slice containing the string representation of FieldRef. +func (f *FieldRef) Elements() []string { + return []string{fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(), f.Spec.Spec.GetFieldType().String())} +} + // DataType shows the type of the filed's value. 
func (f *FieldRef) DataType() int32 { if f.Spec == nil { diff --git a/pkg/query/logical/expr_literal.go b/pkg/query/logical/expr_literal.go index 0a6b5fd91..c21fdd087 100644 --- a/pkg/query/logical/expr_literal.go +++ b/pkg/query/logical/expr_literal.go @@ -39,6 +39,12 @@ type int64Literal struct { int64 } +func newInt64Literal(val int64) *int64Literal { + return &int64Literal{ + int64: val, + } +} + func (i *int64Literal) Compare(other LiteralExpr) (int, bool) { if o, ok := other.(*int64Literal); ok { return int(i.int64 - o.int64), true @@ -88,6 +94,10 @@ func (i *int64Literal) String() string { return strconv.FormatInt(i.int64, 10) } +func (i *int64Literal) Elements() []string { + return []string{strconv.FormatInt(i.int64, 10)} +} + var ( _ LiteralExpr = (*int64ArrLiteral)(nil) _ ComparableExpr = (*int64ArrLiteral)(nil) @@ -97,6 +107,12 @@ type int64ArrLiteral struct { arr []int64 } +func newInt64ArrLiteral(val []int64) *int64ArrLiteral { + return &int64ArrLiteral{ + arr: val, + } +} + func (i *int64ArrLiteral) Compare(other LiteralExpr) (int, bool) { if o, ok := other.(*int64ArrLiteral); ok { return 0, slices.Equal(i.arr, o.arr) @@ -161,6 +177,14 @@ func (i *int64ArrLiteral) String() string { return fmt.Sprintf("%v", i.arr) } +func (i *int64ArrLiteral) Elements() []string { + var elements []string + for _, v := range i.arr { + elements = append(elements, strconv.FormatInt(v, 10)) + } + return elements +} + var ( _ LiteralExpr = (*strLiteral)(nil) _ ComparableExpr = (*strLiteral)(nil) @@ -223,6 +247,10 @@ func (s *strLiteral) String() string { return s.string } +func (s *strLiteral) Elements() []string { + return []string{s.string} +} + var ( _ LiteralExpr = (*strArrLiteral)(nil) _ ComparableExpr = (*strArrLiteral)(nil) @@ -232,6 +260,12 @@ type strArrLiteral struct { arr []string } +func newStrArrLiteral(val []string) *strArrLiteral { + return &strArrLiteral{ + arr: val, + } +} + func (s *strArrLiteral) Compare(other LiteralExpr) (int, bool) { if o, ok := 
other.(*strArrLiteral); ok { return 0, StringSlicesEqual(s.arr, o.arr) @@ -296,34 +330,49 @@ func (s *strArrLiteral) String() string { return fmt.Sprintf("%v", s.arr) } -type bytesLiteral struct { +func (s *strArrLiteral) Elements() []string { + return s.arr +} + +// BytesLiteral represents a wrapper for a slice of bytes. +type BytesLiteral struct { bb []byte } -func newBytesLiteral(bb []byte) *bytesLiteral { - return &bytesLiteral{bb: bb} +// NewBytesLiteral creates a new instance of BytesLiteral with the provided slice of bytes. +func NewBytesLiteral(bb []byte) *BytesLiteral { + return &BytesLiteral{bb: bb} } -func (b *bytesLiteral) Bytes() [][]byte { +// Bytes returns a 2D slice of bytes where the inner slice contains the byte slice stored in the BytesLiteral. +func (b *BytesLiteral) Bytes() [][]byte { return [][]byte{b.bb} } -func (b *bytesLiteral) Equal(expr Expr) bool { - if other, ok := expr.(*bytesLiteral); ok { +// Equal checks if the current BytesLiteral is equal to the provided Expr. +func (b *BytesLiteral) Equal(expr Expr) bool { + if other, ok := expr.(*BytesLiteral); ok { return bytes.Equal(other.bb, b.bb) } return false } -func (b *bytesLiteral) DataType() int32 { +// DataType returns the data type of BytesLiteral. +func (b *BytesLiteral) DataType() int32 { return int32(databasev1.TagType_TAG_TYPE_DATA_BINARY) } -func (b *bytesLiteral) String() string { +// String converts the BytesLiteral's slice of bytes to a string representation. +func (b *BytesLiteral) String() string { return hex.EncodeToString(b.bb) } +// Elements returns a slice containing the string representation of the byte slice. 
+func (b *BytesLiteral) Elements() []string { + return []string{hex.EncodeToString(b.bb)} +} + var ( _ LiteralExpr = (*nullLiteral)(nil) _ ComparableExpr = (*nullLiteral)(nil) @@ -332,6 +381,10 @@ var ( type nullLiteral struct{} +func newNullLiteral() *nullLiteral { + return nullLiteralExpr +} + func (s nullLiteral) Compare(_ LiteralExpr) (int, bool) { return 0, false } @@ -359,3 +412,7 @@ func (s nullLiteral) DataType() int32 { func (s nullLiteral) String() string { return "null" } + +func (s nullLiteral) Elements() []string { + return []string{"null"} +} diff --git a/pkg/query/logical/interface.go b/pkg/query/logical/interface.go index dc002cad4..0634473aa 100644 --- a/pkg/query/logical/interface.go +++ b/pkg/query/logical/interface.go @@ -37,6 +37,7 @@ type Plan interface { // Expr represents a predicate in criteria. type Expr interface { fmt.Stringer + Elements() []string DataType() int32 Equal(Expr) bool } diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index 619d49719..a833023fd 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -27,7 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" @@ -87,7 +87,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) // fill AnyEntry by default entity[idx] = pbv1.AnyTagValue } - filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, entityMap, entity, 
true) + query, entities, _, err := inverted.BuildLocalQuery(uis.criteria, s, entityMap, entity) if err != nil { return nil, err } @@ -100,7 +100,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) (logical.Plan, error) projectionTagsRefs: projTagsRefs, projectionFieldsRefs: projFieldRefs, metadata: uis.metadata, - filter: filter, + query: query, entities: entities, groupByEntity: uis.groupByEntity, uis: uis, @@ -114,7 +114,7 @@ var ( ) type localIndexScan struct { - filter index.Filter + query *inverted.Query schema logical.Schema uis *unresolvedIndexScan order *logical.OrderBy @@ -155,7 +155,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, - Filter: i.filter, + Query: i.query, OrderByType: orderByType, Order: orderBy, TagProjection: i.projectionTags, @@ -172,7 +172,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; order=%s;", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), - i.filter, logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.order) + i.query, logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.order) } func (i *localIndexScan) Children() []logical.Plan { diff --git a/pkg/query/logical/parser.go b/pkg/query/logical/parser.go new file mode 100644 index 000000000..68f8ad47c --- /dev/null +++ b/pkg/query/logical/parser.go @@ -0,0 +1,159 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logical + +import ( + "github.com/pkg/errors" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +// ParseExprOrEntity parses the condition and returns the literal expression or the entities. +func ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) { + entityIdx, ok := entityDict[cond.Name] + if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN { + return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond) + } + switch v := cond.Value.Value.(type) { + case *modelv1.TagValue_Str: + if ok { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = cond.Value + return nil, [][]*modelv1.TagValue{parsedEntity}, nil + } + return str(v.Str.GetValue()), nil, nil + case *modelv1.TagValue_StrArray: + if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { + entities := make([][]*modelv1.TagValue, len(v.StrArray.Value)) + for i, va := range v.StrArray.Value { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = &modelv1.TagValue{ + Value: 
&modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: va, + }, + }, + } + entities[i] = parsedEntity + } + return nil, entities, nil + } + return newStrArrLiteral(v.StrArray.GetValue()), nil, nil + case *modelv1.TagValue_Int: + if ok { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = cond.Value + return nil, [][]*modelv1.TagValue{parsedEntity}, nil + } + return newInt64Literal(v.Int.GetValue()), nil, nil + case *modelv1.TagValue_IntArray: + if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { + entities := make([][]*modelv1.TagValue, len(v.IntArray.Value)) + for i, va := range v.IntArray.Value { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: va, + }, + }, + } + entities[i] = parsedEntity + } + return nil, entities, nil + } + return newInt64ArrLiteral(v.IntArray.GetValue()), nil, nil + case *modelv1.TagValue_Null: + return newNullLiteral(), nil, nil + } + return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, "index filter parses %v", cond) +} + +// ParseEntities merges entities based on the logical operation. 
+func ParseEntities(op modelv1.LogicalExpression_LogicalOp, input []*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue { + count := len(input) + result := make([]*modelv1.TagValue, count) + anyEntity := func(entities [][]*modelv1.TagValue) bool { + for _, entity := range entities { + for _, entry := range entity { + if entry != pbv1.AnyTagValue { + return false + } + } + } + return true + } + leftAny := anyEntity(left) + rightAny := anyEntity(right) + + mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right)) + + switch op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + if leftAny && !rightAny { + return right + } + if !leftAny && rightAny { + return left + } + mergedEntities = append(mergedEntities, left...) + mergedEntities = append(mergedEntities, right...) + for i := 0; i < count; i++ { + entry := pbv1.AnyTagValue + for j := 0; j < len(mergedEntities); j++ { + e := mergedEntities[j][i] + if e == pbv1.AnyTagValue { + continue + } + if entry == pbv1.AnyTagValue { + entry = e + } else if pbv1.MustCompareTagValue(entry, e) != 0 { + return nil + } + } + result[i] = entry + } + case modelv1.LogicalExpression_LOGICAL_OP_OR: + if leftAny { + return left + } + if rightAny { + return right + } + mergedEntities = append(mergedEntities, left...) + mergedEntities = append(mergedEntities, right...) 
+ for i := 0; i < count; i++ { + entry := pbv1.AnyTagValue + for j := 0; j < len(mergedEntities); j++ { + e := mergedEntities[j][i] + if entry == pbv1.AnyTagValue { + entry = e + } else if pbv1.MustCompareTagValue(entry, e) != 0 { + return mergedEntities + } + } + result[i] = entry + } + } + return [][]*modelv1.TagValue{result} +} diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/stream/index_filter.go similarity index 66% rename from pkg/query/logical/index_filter.go rename to pkg/query/logical/stream/index_filter.go index 8450c94ed..533000f13 100644 --- a/pkg/query/logical/index_filter.go +++ b/pkg/query/logical/stream/index_filter.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package logical +package stream import ( "bytes" @@ -28,25 +28,15 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -var errInvalidLogicalExpression = errors.New("invalid logical expression") - -// GlobalIndexError represents a index rule is "global". -// The local filter can't handle it. -type GlobalIndexError struct { - IndexRule *databasev1.IndexRule - Expr LiteralExpr -} - -func (g GlobalIndexError) Error() string { return g.IndexRule.String() } - -// BuildLocalFilter returns a new index.Filter for local indices. -func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, - entity []*modelv1.TagValue, mandatoryIndexRule bool, +// buildLocalFilter returns a new index.Filter for local indices. 
+func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, + entityDict map[string]int, entity []*modelv1.TagValue, ) (index.Filter, [][]*modelv1.TagValue, error) { if criteria == nil { return nil, [][]*modelv1.TagValue{entity}, nil @@ -54,7 +44,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: cond := criteria.GetCondition() - expr, parsedEntity, err := parseExprOrEntity(entityDict, entity, cond) + expr, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) if err != nil { return nil, nil, err } @@ -62,31 +52,29 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return nil, parsedEntity, nil } if ok, indexRule := schema.IndexDefined(cond.Name); ok { - return parseCondition(cond, indexRule, expr, entity) - } else if mandatoryIndexRule { - return nil, nil, errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond) + return parseConditionToFilter(cond, indexRule, expr, entity) } return ENode, [][]*modelv1.TagValue{entity}, nil case *modelv1.Criteria_Le: le := criteria.GetLe() if le.GetLeft() == nil && le.GetRight() == nil { - return nil, nil, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) + return nil, nil, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } if le.GetLeft() == nil { - return BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + return buildLocalFilter(le.Right, schema, entityDict, entity) } if le.GetRight() == nil { - return BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + return buildLocalFilter(le.Left, schema, entityDict, entity) } - left, leftEntities, err := BuildLocalFilter(le.Left, schema, entityDict, entity, mandatoryIndexRule) + left, leftEntities, err := buildLocalFilter(le.Left, 
schema, entityDict, entity) if err != nil { return nil, nil, err } - right, rightEntities, err := BuildLocalFilter(le.Right, schema, entityDict, entity, mandatoryIndexRule) + right, rightEntities, err := buildLocalFilter(le.Right, schema, entityDict, entity) if err != nil { return nil, nil, err } - entities := parseEntities(le.Op, entity, leftEntities, rightEntities) + entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities) if entities == nil { return nil, nil, nil } @@ -110,10 +98,12 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[ return or, entities, nil } } - return nil, nil, errInvalidCriteriaType + return nil, nil, logical.ErrInvalidCriteriaType } -func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr LiteralExpr, entity []*modelv1.TagValue) (index.Filter, [][]*modelv1.TagValue, error) { +func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.IndexRule, + expr logical.LiteralExpr, entity []*modelv1.TagValue, +) (index.Filter, [][]*modelv1.TagValue, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: return newRange(indexRule, index.RangeOpts{ @@ -147,7 +137,7 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex } and := newAnd(l) for _, b := range bb { - and.append(newEq(indexRule, newBytesLiteral(b))) + and.append(newEq(indexRule, logical.NewBytesLiteral(b))) } return and, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_HAVING: @@ -158,7 +148,7 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex } and := newAnd(l) for _, b := range bb { - and.append(newEq(indexRule, newBytesLiteral(b))) + and.append(newEq(indexRule, logical.NewBytesLiteral(b))) } return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_IN: @@ -169,7 +159,7 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex } or := 
newOr(l) for _, b := range bb { - or.append(newEq(indexRule, newBytesLiteral(b))) + or.append(newEq(indexRule, logical.NewBytesLiteral(b))) } return or, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_IN: @@ -180,149 +170,11 @@ func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, ex } or := newOr(l) for _, b := range bb { - or.append(newEq(indexRule, newBytesLiteral(b))) + or.append(newEq(indexRule, logical.NewBytesLiteral(b))) } return newNot(indexRule, or), [][]*modelv1.TagValue{entity}, nil } - return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index filter parses %v", cond) -} - -func parseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) { - entityIdx, ok := entityDict[cond.Name] - if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != modelv1.Condition_BINARY_OP_IN { - return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "tag belongs to the entity only supports EQ or IN operation in condition(%v)", cond) - } - switch v := cond.Value.Value.(type) { - case *modelv1.TagValue_Str: - if ok { - parsedEntity := make([]*modelv1.TagValue, len(entity)) - copy(parsedEntity, entity) - parsedEntity[entityIdx] = cond.Value - return nil, [][]*modelv1.TagValue{parsedEntity}, nil - } - return str(v.Str.GetValue()), nil, nil - case *modelv1.TagValue_StrArray: - if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { - entities := make([][]*modelv1.TagValue, len(v.StrArray.Value)) - for i, va := range v.StrArray.Value { - parsedEntity := make([]*modelv1.TagValue, len(entity)) - copy(parsedEntity, entity) - parsedEntity[entityIdx] = &modelv1.TagValue{ - Value: &modelv1.TagValue_Str{ - Str: &modelv1.Str{ - Value: va, - }, - }, - } - entities[i] = parsedEntity - } - return nil, entities, nil - } - return &strArrLiteral{ - arr: v.StrArray.GetValue(), - }, nil, nil - case *modelv1.TagValue_Int: - if ok { - 
parsedEntity := make([]*modelv1.TagValue, len(entity)) - copy(parsedEntity, entity) - parsedEntity[entityIdx] = cond.Value - return nil, [][]*modelv1.TagValue{parsedEntity}, nil - } - return &int64Literal{ - int64: v.Int.GetValue(), - }, nil, nil - case *modelv1.TagValue_IntArray: - if ok && cond.Op == modelv1.Condition_BINARY_OP_IN { - entities := make([][]*modelv1.TagValue, len(v.IntArray.Value)) - for i, va := range v.IntArray.Value { - parsedEntity := make([]*modelv1.TagValue, len(entity)) - copy(parsedEntity, entity) - parsedEntity[entityIdx] = &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: va, - }, - }, - } - entities[i] = parsedEntity - } - return nil, entities, nil - } - return &int64ArrLiteral{ - arr: v.IntArray.GetValue(), - }, nil, nil - case *modelv1.TagValue_Null: - return nullLiteralExpr, nil, nil - } - return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, "index filter parses %v", cond) -} - -func parseEntities(op modelv1.LogicalExpression_LogicalOp, input []*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue { - count := len(input) - result := make([]*modelv1.TagValue, count) - anyEntity := func(entities [][]*modelv1.TagValue) bool { - for _, entity := range entities { - for _, entry := range entity { - if entry != pbv1.AnyTagValue { - return false - } - } - } - return true - } - leftAny := anyEntity(left) - rightAny := anyEntity(right) - - mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right)) - - switch op { - case modelv1.LogicalExpression_LOGICAL_OP_AND: - if leftAny && !rightAny { - return right - } - if !leftAny && rightAny { - return left - } - mergedEntities = append(mergedEntities, left...) - mergedEntities = append(mergedEntities, right...) 
- for i := 0; i < count; i++ { - entry := pbv1.AnyTagValue - for j := 0; j < len(mergedEntities); j++ { - e := mergedEntities[j][i] - if e == pbv1.AnyTagValue { - continue - } - if entry == pbv1.AnyTagValue { - entry = e - } else if pbv1.MustCompareTagValue(entry, e) != 0 { - return nil - } - } - result[i] = entry - } - case modelv1.LogicalExpression_LOGICAL_OP_OR: - if leftAny { - return left - } - if rightAny { - return right - } - mergedEntities = append(mergedEntities, left...) - mergedEntities = append(mergedEntities, right...) - for i := 0; i < count; i++ { - entry := pbv1.AnyTagValue - for j := 0; j < len(mergedEntities); j++ { - e := mergedEntities[j][i] - if entry == pbv1.AnyTagValue { - entry = e - } else if pbv1.MustCompareTagValue(entry, e) != 0 { - return mergedEntities - } - } - result[i] = entry - } - } - return [][]*modelv1.TagValue{result} + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v", cond) } type fieldKey struct { @@ -420,7 +272,7 @@ func (an *andNode) MarshalJSON() ([]byte, error) { } func (an *andNode) String() string { - return jsonToString(an) + return convert.JSONToString(an) } type orNode struct { @@ -465,13 +317,13 @@ func (on *orNode) MarshalJSON() ([]byte, error) { } func (on *orNode) String() string { - return jsonToString(on) + return convert.JSONToString(on) } type leaf struct { index.Filter Key fieldKey - Expr LiteralExpr + Expr logical.LiteralExpr } func (l *leaf) MarshalJSON() ([]byte, error) { @@ -518,14 +370,14 @@ func (n *not) MarshalJSON() ([]byte, error) { } func (n *not) String() string { - return jsonToString(n) + return convert.JSONToString(n) } type eq struct { *leaf } -func newEq(indexRule *databasev1.IndexRule, values LiteralExpr) *eq { +func newEq(indexRule *databasev1.IndexRule, values logical.LiteralExpr) *eq { return &eq{ leaf: &leaf{ Key: newFieldKey(indexRule), @@ -552,14 +404,14 @@ func (eq *eq) MarshalJSON() ([]byte, error) { } func (eq *eq) String() string { - 
return jsonToString(eq) + return convert.JSONToString(eq) } type match struct { *leaf } -func newMatch(indexRule *databasev1.IndexRule, values LiteralExpr) *match { +func newMatch(indexRule *databasev1.IndexRule, values logical.LiteralExpr) *match { return &match{ leaf: &leaf{ Key: newFieldKey(indexRule), @@ -591,7 +443,7 @@ func (match *match) MarshalJSON() ([]byte, error) { } func (match *match) String() string { - return jsonToString(match) + return convert.JSONToString(match) } type rangeOp struct { @@ -642,15 +494,7 @@ func (r *rangeOp) MarshalJSON() ([]byte, error) { } func (r *rangeOp) String() string { - return jsonToString(r) -} - -func jsonToString(marshaler json.Marshaler) string { - bb, err := marshaler.MarshalJSON() - if err != nil { - return err.Error() - } - return string(bb) + return convert.JSONToString(r) } var ( diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index a861dcfa3..e7acf94e7 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -55,7 +55,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) entity[idx] = pbv1.AnyTagValue } var err error - ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, s, entityDict, entity, false) + ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s, entityDict, entity) if err != nil { return nil, err } diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index 0936e1108..445ddfcc1 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" ) var errUnsupportedLogicalOperation = errors.New("unsupported logical operation") @@ -122,7 +123,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, 
entityDict map[string]int, index return or, nil } } - return nil, errInvalidCriteriaType + return nil, ErrInvalidCriteriaType } func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, error) { @@ -158,7 +159,7 @@ func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, error case modelv1.Condition_BINARY_OP_NOT_IN: return newNotTag(newInTag(cond.Name, expr)), nil default: - return nil, errors.WithMessagef(errUnsupportedConditionOp, "tag filter parses %v", cond) + return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter parses %v", cond) } } @@ -181,7 +182,7 @@ func parseExpr(value *modelv1.TagValue) (ComparableExpr, error) { case *modelv1.TagValue_Null: return nullLiteralExpr, nil } - return nil, errors.WithMessagef(errUnsupportedConditionValue, "tag filter parses %v", value) + return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag filter parses %v", value) } // DummyFilter matches any predicate. @@ -261,7 +262,7 @@ func (an *andLogicalNode) MarshalJSON() ([]byte, error) { } func (an *andLogicalNode) String() string { - return jsonToString(an) + return convert.JSONToString(an) } type orLogicalNode struct { @@ -296,7 +297,7 @@ func (on *orLogicalNode) MarshalJSON() ([]byte, error) { } func (on *orLogicalNode) String() string { - return jsonToString(on) + return convert.JSONToString(on) } type tagLeaf struct { @@ -338,7 +339,7 @@ func (n *notTag) MarshalJSON() ([]byte, error) { } func (n *notTag) String() string { - return jsonToString(n) + return convert.JSONToString(n) } type inTag struct { @@ -390,7 +391,7 @@ func (eq *eqTag) MarshalJSON() ([]byte, error) { } func (eq *eqTag) String() string { - return jsonToString(eq) + return convert.JSONToString(eq) } type rangeOpts struct { @@ -480,7 +481,7 @@ func (r *rangeTag) MarshalJSON() ([]byte, error) { } func (r *rangeTag) String() string { - return jsonToString(r) + return convert.JSONToString(r) } func tagExpr(accessor TagValueIndexAccessor, registry 
TagSpecRegistry, tagName string) (ComparableExpr, error) { @@ -520,5 +521,5 @@ func (h *havingTag) MarshalJSON() ([]byte, error) { } func (h *havingTag) String() string { - return jsonToString(h) + return convert.JSONToString(h) } diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 539508e4d..20e98659a 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -25,6 +25,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -72,7 +73,7 @@ const ( // MeasureQueryOptions is the options of a measure query. type MeasureQueryOptions struct { - Filter index.Filter + Query *inverted.Query TimeRange *timestamp.TimeRange Order *OrderBy Name string