Skip to content

Commit

Permalink
Fix the index mode failure on multi-segments (#558)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Nov 16, 2024
1 parent 4293184 commit 0794832
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 5 deletions.
3 changes: 3 additions & 0 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
if err != nil {
return nil, nil, nil, err
}
if len(ss) == 0 {
return nil, nil, nil, nil
}
sl, fields, tss, err = convertIndexSeriesToSeriesList(ss, len(projection) > 0)
if err != nil {
return nil, nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
Expand Down
38 changes: 34 additions & 4 deletions banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
checkDoneEvery = 128
)

var nilResult = model.MeasureQueryResult(nil)

// Query allow to retrieve measure data points.
type Query interface {
LoadGroup(name string) (resourceSchema.Group, bool)
Expand Down Expand Up @@ -91,7 +93,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
tsdb := db.(storage.TSDB[*tsTable, option])
segments := tsdb.SelectSegments(*mqo.TimeRange)
if len(segments) < 1 {
return nil, nil
return nilResult, nil
}

if s.schema.IndexMode {
Expand All @@ -106,7 +108,7 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
for i := range segments {
segments[i].DecRef()
}
return nil, nil
return nilResult, nil
}
result := queryResult{
ctx: ctx,
Expand Down Expand Up @@ -256,7 +258,7 @@ func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, m

func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Series, mqo model.MeasureQueryOptions,
segments []storage.Segment[*tsTable, option],
) (*indexSortResult, error) {
) (model.MeasureQueryResult, error) {
defer func() {
for i := range segments {
segments[i].DecRef()
Expand Down Expand Up @@ -300,7 +302,7 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
PreloadSize: preloadSize,
Projection: indexProjection,
}

seriesFilter := roaring.NewPostingList()
for i := range segments {
if mqo.TimeRange.Include(segments[i].GetTimeRange()) {
opts.TimeRange = nil
Expand All @@ -312,8 +314,22 @@ func (s *measure) buildIndexQueryResult(ctx context.Context, series []*pbv1.Seri
if err != nil {
return nil, err
}
for j := 0; j < len(sr.sll); j++ {
if seriesFilter.Contains(uint64(sr.sll[j].ID)) {
sr.remove(j)
j--
continue
}
seriesFilter.Insert(uint64(sr.sll[j].ID))
}
if len(sr.sll) < 1 {
continue
}
r.segResults = append(r.segResults, sr)
}
if len(r.segResults) < 1 {
return nilResult, nil
}
heap.Init(&r.segResults)
return r, nil
}
Expand Down Expand Up @@ -804,10 +820,24 @@ type segResult struct {
i int
}

func (sr *segResult) remove(i int) {
sr.sll = append(sr.sll[:i], sr.sll[i+1:]...)
if sr.frl != nil {
sr.frl = append(sr.frl[:i], sr.frl[i+1:]...)
}
sr.timestamps = append(sr.timestamps[:i], sr.timestamps[i+1:]...)
if sr.sortedValues != nil {
sr.sortedValues = append(sr.sortedValues[:i], sr.sortedValues[i+1:]...)
}
}

type segResultHeap []*segResult

func (h segResultHeap) Len() int { return len(h) }
func (h segResultHeap) Less(i, j int) bool {
if h[i].sortedValues == nil {
return h[i].sll[h[i].i].ID < h[j].sll[h[j].i].ID
}
return bytes.Compare(h[i].sortedValues[h[i].i], h[j].sortedValues[h[j].i]) < 0
}
func (h segResultHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
Expand Down
4 changes: 3 additions & 1 deletion banyand/stream/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (

const checkDoneEvery = 128

var nilResult = model.StreamQueryResult(nil)

func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and series are required")
Expand All @@ -53,7 +55,7 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
}
db := s.databaseSupplier.SupplyTSDB()
if db == nil {
return sqr, nil
return nilResult, nil
}
var result queryResult
tsdb := db.(storage.TSDB[*tsTable, option])
Expand Down
1 change: 1 addition & 0 deletions test/cases/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func Initialize(addr string, now time.Time) {
casesstreamdata.Write(conn, "duplicated", now, 0)
// // measure
interval = time.Minute
casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data_old.json", now.AddDate(0, 0, -1), interval)
casesmeasuredata.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
Expand Down
30 changes: 30 additions & 0 deletions test/cases/measure/data/input/index_mode_none.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: "service_traffic"
groups: [ "sw_metric" ]
tagProjection:
tagFamilies:
- name: "default"
tags: [ "id", "service_id", "name", "short_name", "service_group", "layer" ]
criteria:
condition:
name: "layer"
op: "BINARY_OP_EQ"
value:
int:
value: "-1"
154 changes: 154 additions & 0 deletions test/cases/measure/data/testdata/service_traffic_data_old.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
[
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "1"
}
},
{
"str": {
"value": "service_1_expired"
}
},
{
"str": {
"value": "service_name_1_expired"
}
},
{
"str": {
"value": "service_short_name_1_expired"
}
},
{
"str": {
"value": "group1_expired"
}
},
{
"int": {
"value": 1
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "2"
}
},
{
"str": {
"value": "service_2_expired"
}
},
{
"str": {
"value": "service_name_2_expired"
}
},
{
"str": {
"value": "service_short_name_2_expired"
}
},
{
"str": {
"value": "group1"
}
},
{
"int": {
"value": 2
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "3"
}
},
{
"str": {
"value": "service_3_expired"
}
},
{
"str": {
"value": "service_name_3_expired"
}
},
{
"str": {
"value": "service_short_name_3_expired"
}
},
{
"str": {
"value": "group1_expired"
}
},
{
"int": {
"value": 1
}
}
]
}
]
},
{
"tag_families": [
{
"tags": [
{
"str": {
"value": "4"
}
},
{
"str": {
"value": "service_4"
}
},
{
"str": {
"value": "service_name_4"
}
},
{
"str": {
"value": "service_short_name_4"
}
},
{
"str": {
"value": "group4"
}
},
{
"int": {
"value": 3
}
}
]
}
]
}
]
Loading

0 comments on commit 0794832

Please sign in to comment.