Support duplicated data for Stream (#485)
* Support duplicated data

* Bump storage versions

---------

Signed-off-by: Gao Hongtao <[email protected]>
hanahmily authored Jul 14, 2024
1 parent b12cf80 commit 8993456
Showing 60 changed files with 2,002 additions and 1,099 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -4,6 +4,10 @@ Release Notes.

## 0.7.0

+### File System Changes
+
+- Bump up the version of the file system to 1.1.0, which is not compatible with the previous version.
+
### Features

- Check unregistered nodes in background.
@@ -20,6 +24,7 @@ Release Notes.
- Fix the bug that the data node can't re-register to etcd when the connection is lost.
- Fix memory leak in sorting the stream by the inverted index.
- Fix the wrong array flags parsing in command line. The array flags should be parsed by "StringSlice" instead of "StringArray".
+- Fix a bug that the Stream module didn't support duplicated data in index-based filtering and sorting.

## 0.6.1

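The changelog entry above is terse, so here is the gist of "duplicated data": index-based filtering used to resolve matches by timestamp, which collapses two elements that share a timestamp into a single result; filtering by element ID keeps both. A self-contained sketch with hypothetical types, not the project's actual API:

```go
package main

import "fmt"

// row is a hypothetical stand-in for a stream element: rows may share a
// timestamp within a series, but each carries a unique element ID.
type row struct {
	elementID uint64
	timestamp int64
}

func main() {
	rows := []row{{elementID: 1, timestamp: 100}, {elementID: 2, timestamp: 100}}

	// Old behavior: each filtered timestamp resolves to one row index,
	// so the second element at timestamp 100 is silently dropped.
	var oldHits []row
	for _, ts := range []int64{100} {
		for _, r := range rows {
			if r.timestamp == ts {
				oldHits = append(oldHits, r)
				break // first match wins
			}
		}
	}

	// New behavior: the filter holds element IDs, so every matching row
	// survives, duplicates included.
	filter := map[uint64]bool{1: true, 2: true}
	var newHits []row
	for _, r := range rows {
		if filter[r.elementID] {
			newHits = append(newHits, r)
		}
	}

	fmt.Println(len(oldHits), len(newHits)) // 1 2
}
```
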
4 changes: 3 additions & 1 deletion banyand/dquery/stream.go
@@ -76,7 +76,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
-entities, err := plan.(executor.StreamExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{
+se := plan.(executor.StreamExecutable)
+defer se.Close()
+entities, err := se.Execute(executor.WithDistributedExecutionContext(context.Background(), &distributedContext{
Broadcaster: p.broadcaster,
timeRange: queryCriteria.TimeRange,
}))
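
Binding the plan to a local variable is what makes the deferred Close possible: every exit path, including the error return below, now releases whatever cursors the executable holds. A minimal sketch of the pattern, assuming a trimmed-down interface rather than the real executor package:

```go
package main

import "fmt"

// StreamExecutable is an assumed, trimmed-down stand-in for the real
// interface: Execute produces results, Close releases pooled resources.
type StreamExecutable interface {
	Execute() ([]string, error)
	Close()
}

type fakePlan struct{}

func (fakePlan) Execute() ([]string, error) { return []string{"element-1"}, nil }
func (fakePlan) Close()                     { fmt.Println("cursors released") }

func run(plan any) error {
	se := plan.(StreamExecutable)
	// Deferring Close ties resource release to this call's lifetime,
	// so even an early error return frees the cursors.
	defer se.Close()
	entities, err := se.Execute()
	if err != nil {
		return err
	}
	fmt.Println(entities)
	return nil
}

func main() { _ = run(fakePlan{}) }
```
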
9 changes: 1 addition & 8 deletions banyand/internal/storage/index.go
@@ -76,16 +76,9 @@ func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flush
}

func (s *seriesIndex) Write(docs index.Documents) error {
-applied := make(chan struct{})
-err := s.store.Batch(index.Batch{
+return s.store.Batch(index.Batch{
Documents: docs,
-Applied: applied,
})
-if err != nil {
-return err
-}
-<-applied
-return nil
}

var rangeOpts = index.RangeOpts{}
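
The rewritten Write no longer creates an applied channel and blocks until the store signals durability; handing the batch to the store is now the whole operation. A sketch contrasting the two shapes, with a hypothetical store and batch type standing in for the real index package:

```go
package main

import "fmt"

// batch is a hypothetical stand-in; the real index.Batch carries documents
// plus an optional Applied channel that is closed once the write is applied.
type batch struct {
	docs    []string
	applied chan struct{}
}

type store struct{ ch chan batch }

// Batch only enqueues; a background goroutine applies the write later.
func (s *store) Batch(b batch) error {
	s.ch <- b
	return nil
}

// writeSync is the old shape: submit, then block until the batch is applied.
func writeSync(s *store, docs []string) error {
	applied := make(chan struct{})
	if err := s.Batch(batch{docs: docs, applied: applied}); err != nil {
		return err
	}
	<-applied
	return nil
}

// writeAsync is the new shape: successful submission is success.
func writeAsync(s *store, docs []string) error {
	return s.Batch(batch{docs: docs})
}

func main() {
	s := &store{ch: make(chan batch, 1)}
	go func() {
		for b := range s.ch {
			if b.applied != nil {
				close(b.applied) // signal the synchronous writer
			}
		}
	}()
	_ = writeSync(s, []string{"doc-1"})
	fmt.Println(writeAsync(s, []string{"doc-2"})) // <nil>
}
```
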
2 changes: 1 addition & 1 deletion banyand/internal/storage/version.go
@@ -28,7 +28,7 @@ import (

const (
metadataFilename = "metadata"
currentVersion = "1.0.0"
currentVersion = "1.1.0"
compatibleVersionsKey = "versions"
compatibleVersionsFilename = "versions.yml"
)
2 changes: 1 addition & 1 deletion banyand/internal/storage/versions.yml
@@ -14,4 +14,4 @@
# limitations under the License.

versions:
-  - 1.0.0
+  - 1.1.0
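
The bump to 1.1.0 works together with versions.yml: because 1.0.0 is dropped from the list, data written by the previous release is rejected rather than misread. The check itself is not part of this diff; a plausible reconstruction, assuming a simple membership test against the compatible-versions list:

```go
package main

import "fmt"

const currentVersion = "1.1.0"

// compatibleVersions mirrors versions.yml after this commit: only 1.1.0
// remains, so segments written by 1.0.0 fail the check below.
var compatibleVersions = []string{"1.1.0"}

// checkVersion is a hypothetical reconstruction of the open-time guard.
func checkVersion(onDisk string) error {
	for _, v := range compatibleVersions {
		if v == onDisk {
			return nil
		}
	}
	return fmt.Errorf("on-disk version %s is not compatible with %s", onDisk, currentVersion)
}

func main() {
	fmt.Println(checkVersion("1.1.0")) // <nil>
	fmt.Println(checkVersion("1.0.0")) // not compatible
}
```
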
10 changes: 10 additions & 0 deletions banyand/measure/block.go
@@ -18,6 +18,7 @@
package measure

import (
"slices"
"sort"
"sync"

@@ -478,6 +479,10 @@ func (bc *blockCursor) copyAllTo(r *pbv1.MeasureResult, entityValuesAll map[comm
r.SID = bc.bm.seriesID
r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
r.Versions = append(r.Versions, bc.versions[idx:offset]...)
+if desc {
+slices.Reverse(r.Timestamps)
+slices.Reverse(r.Versions)
+}
var entityValues map[string]*modelv1.TagValue
if entityValuesAll != nil {
entityValues = entityValuesAll[r.SID]
@@ -537,6 +542,8 @@ OUTER:
for i := 0; i < size; i++ {
t.Values[i] = pbv1.NullTagValue
}
+} else if desc {
+slices.Reverse(t.Values)
}
tf.Tags = append(tf.Tags, t)
}
@@ -549,6 +556,9 @@ OUTER:
for _, v := range c.values[idx:offset] {
f.Values = append(f.Values, mustDecodeFieldValue(c.valueType, v))
}
+if desc {
+slices.Reverse(f.Values)
+}
r.Fields = append(r.Fields, f)
}
}
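
The descending case reuses the ascending copy path and then flips the copied slices, avoiding a second element-by-element loop. A condensed illustration of the idea; note the commit reverses the whole destination slice, which is freshly built at that point, while this variant reverses only the copied suffix:

```go
package main

import (
	"fmt"
	"slices"
)

// copyRange bulk-copies src[from:to] into dst, then flips the copied
// suffix when descending order is requested.
func copyRange(dst, src []int64, from, to int, desc bool) []int64 {
	dst = append(dst, src[from:to]...)
	if desc {
		slices.Reverse(dst[len(dst)-(to-from):])
	}
	return dst
}

func main() {
	ts := []int64{100, 200, 300, 400}
	fmt.Println(copyRange(nil, ts, 1, 4, false)) // [200 300 400]
	fmt.Println(copyRange(nil, ts, 1, 4, true))  // [400 300 200]
}
```
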
2 changes: 2 additions & 0 deletions banyand/measure/query.go
@@ -424,6 +424,7 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
bc := qr.data[0]
bc.copyAllTo(r, qr.entityValues, qr.tagProjection, qr.orderByTimestampDesc())
qr.data = qr.data[:0]
+releaseBlockCursor(bc)
return r
}
return qr.merge(qr.entityValues, qr.tagProjection)
@@ -490,6 +491,7 @@ func (qr *queryResult) Pop() interface{} {
n := len(old)
x := old[n-1]
qr.data = old[0 : n-1]
+releaseBlockCursor(x)
return x
}

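
Pull and Pop now hand exhausted block cursors back for reuse instead of leaving them to the garbage collector. A minimal sketch of the release pattern with sync.Pool; the real releaseBlockCursor lives elsewhere in the measure package, so the pool and fields here are assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

type blockCursor struct {
	timestamps []int64
}

var blockCursorPool = sync.Pool{New: func() any { return &blockCursor{} }}

// releaseBlockCursor resets the cursor so pooled instances never leak
// rows between queries, then returns it to the pool.
func releaseBlockCursor(bc *blockCursor) {
	bc.timestamps = bc.timestamps[:0]
	blockCursorPool.Put(bc)
}

func main() {
	bc := blockCursorPool.Get().(*blockCursor)
	bc.timestamps = append(bc.timestamps, 100)
	// ... copy the cursor's rows into the result, then release it.
	releaseBlockCursor(bc)
	fmt.Println("cursor returned to pool")
}
```
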
4 changes: 3 additions & 1 deletion banyand/query/processor.go
@@ -115,7 +115,9 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
-entities, err := plan.(executor.StreamExecutable).Execute(executor.WithStreamExecutionContext(context.Background(), ec))
+se := plan.(executor.StreamExecutable)
+defer se.Close()
+entities, err := se.Execute(executor.WithStreamExecutionContext(context.Background(), ec))
if err != nil {
p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("execute the query plan for stream %s: %v", meta.GetName(), err))
5 changes: 3 additions & 2 deletions banyand/stream/benchmark_test.go
@@ -36,6 +36,7 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -119,7 +120,7 @@ func generateData(p parameter) ([]*elements, []index.Documents, mockIndex) {
es := &elements{
seriesIDs: []common.SeriesID{},
timestamps: []int64{},
-elementIDs: []string{},
+elementIDs: []uint64{},
tagFamilies: [][]tagValues{},
}
var docs index.Documents
@@ -129,7 +130,7 @@
for k := 1; k <= p.seriesCount; k++ {
elementID := strconv.Itoa(k) + strconv.Itoa(timestamp)
es.seriesIDs = append(es.seriesIDs, common.SeriesID(k))
-es.elementIDs = append(es.elementIDs, elementID)
+es.elementIDs = append(es.elementIDs, convert.HashStr(elementID))
es.timestamps = append(es.timestamps, unixTimestamp)
num := generateRandomNumber(int64(p.tagCardinality))
value := filterTagValuePrefix + strconv.Itoa(num)
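
Element IDs are uint64 values from here on, so the benchmark hashes its generated string IDs. The diff does not show how convert.HashStr is implemented; a stand-in using the standard library's FNV-64a illustrates the shape of such a helper:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashStr maps a string ID to a fixed-width uint64. FNV-64a is an
// assumption for illustration; convert.HashStr may use a different hash.
func hashStr(s string) uint64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(s))
	return h.Sum64()
}

func main() {
	// Fixed-width IDs compare and serialize cheaply, and they can be
	// stored in a posting list directly.
	fmt.Println(hashStr("11700000000"))
}
```
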
128 changes: 66 additions & 62 deletions banyand/stream/block.go
@@ -21,18 +21,23 @@ import (
"sort"
"sync"

"golang.org/x/exp/slices"

"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

type block struct {
timestamps []int64
-elementIDs []string
+elementIDs []uint64
tagFamilies []tagFamily
}

@@ -47,7 +52,7 @@ func (b *block) reset() {
b.tagFamilies = tff[:0]
}

-func (b *block) mustInitFromElements(timestamps []int64, elementIDs []string, tagFamilies [][]tagValues) {
+func (b *block) mustInitFromElements(timestamps []int64, elementIDs []uint64, tagFamilies [][]tagValues) {
b.reset()
size := len(timestamps)
if size == 0 {
@@ -320,14 +325,14 @@ func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, read
return dst
}

-func mustWriteElementIDsTo(em *elementIDsMetadata, elementIDs []string, elementIDsWriter *writer) {
+func mustWriteElementIDsTo(em *elementIDsMetadata, elementIDs []uint64, elementIDsWriter *writer) {
em.reset()

bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
elementIDsByteSlice := make([][]byte, len(elementIDs))
for i, elementID := range elementIDs {
-elementIDsByteSlice[i] = []byte(elementID)
+elementIDsByteSlice[i] = convert.Uint64ToBytes(elementID)
}
bb.Buf = encoding.EncodeBytesBlock(bb.Buf, elementIDsByteSlice)
if len(bb.Buf) > maxElementIDsBlockSize {
@@ -339,7 +344,7 @@
elementIDsWriter.MustWrite(bb.Buf)
}

-func mustReadElementIDsFrom(dst []string, em *elementIDsMetadata, count int, reader fs.Reader) []string {
+func mustReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader fs.Reader) []uint64 {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size))
@@ -351,7 +356,7 @@
logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err)
}
for _, elementID := range elementIDsByteSlice {
-dst = append(dst, string(elementID))
+dst = append(dst, convert.BytesToUint64(elementID))
}
return dst
}
@@ -372,7 +377,7 @@
return dst
}

-func mustSeqReadElementIDsFrom(dst []string, em *elementIDsMetadata, count int, reader *seqReader) []string {
+func mustSeqReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader *seqReader) []uint64 {
if em.offset != reader.bytesRead {
logger.Panicf("offset %d must be equal to bytesRead %d", em.offset, reader.bytesRead)
}
@@ -387,7 +392,7 @@
logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err)
}
for _, elementID := range elementIDsByteSlice {
-dst = append(dst, string(elementID))
+dst = append(dst, convert.BytesToUint64(elementID))
}
return dst
}
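
With element IDs as fixed-width integers, every entry in the on-disk bytes block is exactly eight bytes. A self-contained round trip; big-endian order is an assumption about convert.Uint64ToBytes and convert.BytesToUint64, not something this diff shows:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// uint64ToBytes and bytesToUint64 are stand-ins for the project's convert
// helpers; big-endian is assumed so byte order follows numeric order.
func uint64ToBytes(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

func bytesToUint64(b []byte) uint64 {
	return binary.BigEndian.Uint64(b)
}

func main() {
	ids := []uint64{42, 7, 42}
	encoded := make([][]byte, len(ids))
	for i, id := range ids {
		encoded[i] = uint64ToBytes(id) // written into the bytes block
	}
	for _, e := range encoded {
		fmt.Println(bytesToUint64(e)) // decoded when the block is read
	}
}
```
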
@@ -408,17 +413,17 @@ func releaseBlock(b *block) {
var blockPool sync.Pool

type blockCursor struct {
-p *part
-timestamps []int64
-filteredTimestamps []int64
-elementIDs []string
-tagFamilies []tagFamily
-tagValuesDecoder encoding.BytesBlockDecoder
-tagProjection []pbv1.TagProjection
-bm blockMetadata
-idx int
-minTimestamp int64
-maxTimestamp int64
+p *part
+timestamps []int64
+elementFilter posting.List
+elementIDs []uint64
+tagFamilies []tagFamily
+tagValuesDecoder encoding.BytesBlockDecoder
+tagProjection []pbv1.TagProjection
+bm blockMetadata
+idx int
+minTimestamp int64
+maxTimestamp int64
}

func (bc *blockCursor) reset() {
@@ -446,54 +451,55 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) {
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
bc.tagProjection = opts.TagProjection
-seriesID := bc.bm.seriesID
-if opts.filteredRefMap != nil {
-bc.filteredTimestamps = opts.filteredRefMap[seriesID]
-}
+bc.elementFilter = opts.elementFilter
}

func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
var idx, offset int
if desc {
idx = 0
offset = bc.idx + 1
} else {
idx = bc.idx
offset = len(bc.timestamps)
start, end := 0, bc.idx+1
if !desc {
start, end = bc.idx, len(bc.timestamps)
}
if offset <= idx {
if end <= start {
return
}
-r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
-r.ElementIDs = append(r.ElementIDs, bc.elementIDs[idx:offset]...)
-for i := idx; i < offset; i++ {
-r.SIDs = append(r.SIDs, bc.bm.seriesID)
+
+r.Timestamps = append(r.Timestamps, bc.timestamps[start:end]...)
+r.ElementIDs = append(r.ElementIDs, bc.elementIDs[start:end]...)
+requiredCapacity := end - start
+r.SIDs = append(r.SIDs, make([]common.SeriesID, requiredCapacity)...)
+for i := range r.SIDs[len(r.SIDs)-requiredCapacity:] {
+r.SIDs[len(r.SIDs)-requiredCapacity+i] = bc.bm.seriesID
}

+if desc {
+slices.Reverse(r.Timestamps)
+slices.Reverse(r.ElementIDs)
+}

if len(r.TagFamilies) != len(bc.tagProjection) {
-for _, tp := range bc.tagProjection {
-tf := pbv1.TagFamily{
-Name: tp.Family,
-}
-for _, n := range tp.Names {
-t := pbv1.Tag{
-Name: n,
-}
-tf.Tags = append(tf.Tags, t)
+r.TagFamilies = make([]pbv1.TagFamily, len(bc.tagProjection))
+for i, tp := range bc.tagProjection {
+r.TagFamilies[i] = pbv1.TagFamily{Name: tp.Family, Tags: make([]pbv1.Tag, len(tp.Names))}
+for j, n := range tp.Names {
+r.TagFamilies[i].Tags[j] = pbv1.Tag{Name: n}
}
-r.TagFamilies = append(r.TagFamilies, tf)
}
}

for i, cf := range bc.tagFamilies {
-for i2, c := range cf.tags {
-if c.values != nil {
-for _, v := range c.values[idx:offset] {
-r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, mustDecodeTagValue(c.valueType, v))
-}
-} else {
-for j := idx; j < offset; j++ {
-r.TagFamilies[i].Tags[i2].Values = append(r.TagFamilies[i].Tags[i2].Values, pbv1.NullTagValue)
+for j, c := range cf.tags {
+values := make([]*modelv1.TagValue, end-start)
+for k := start; k < end; k++ {
+if c.values != nil {
+values[k-start] = mustDecodeTagValue(c.valueType, c.values[k])
+} else {
+values[k-start] = pbv1.NullTagValue
}
}
+if desc {
+slices.Reverse(values)
+}
+r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, values...)
}
}
}
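
One detail in the rewritten copyAllTo: the run of series IDs is grown once with a zeroed slice and then filled, rather than appending the same value row by row. The grow-then-fill idiom in isolation:

```go
package main

import "fmt"

// appendRepeated appends n copies of v to dst, growing the slice in a
// single append, the same idiom copyAllTo now uses for r.SIDs.
func appendRepeated(dst []uint64, v uint64, n int) []uint64 {
	dst = append(dst, make([]uint64, n)...) // grow by n zeroed slots at once
	tail := dst[len(dst)-n:]
	for i := range tail {
		tail[i] = v
	}
	return dst
}

func main() {
	fmt.Println(appendRepeated([]uint64{9}, 7, 3)) // [9 7 7 7]
}
```
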
@@ -552,15 +558,13 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {

idxList := make([]int, 0)
var start, end int
-if bc.filteredTimestamps != nil {
-for _, ts := range bc.filteredTimestamps {
-idx := timestamp.Find(tmpBlock.timestamps, ts)
-if idx == -1 {
-continue
+if bc.elementFilter != nil {
+for i := range tmpBlock.elementIDs {
+if bc.elementFilter.Contains(tmpBlock.elementIDs[i]) {
+idxList = append(idxList, i)
+bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[i])
+bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[i])
}
-idxList = append(idxList, idx)
-bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[idx])
-bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[idx])
}
if len(bc.timestamps) == 0 {
return false
@@ -590,7 +594,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
logger.Panicf("unexpected number of values for tags %q: got %d; want %d",
tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
}
-if bc.filteredTimestamps != nil {
+if len(idxList) > 0 {
for _, idx := range idxList {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
}
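
Taken together, the blockCursor changes swap per-series filtered timestamps for a posting list of element IDs: loadData keeps every row whose ID the filter contains, so elements that share a timestamp all survive. A sketch with a map-backed stand-in for posting.List, which is far simpler than the real structure:

```go
package main

import "fmt"

// postingList is a map-backed stand-in for posting.List; Contains is the
// only operation the selection loop needs.
type postingList map[uint64]struct{}

func (p postingList) Contains(id uint64) bool { _, ok := p[id]; return ok }

// filterBlock mirrors the new loadData selection: walk the block's element
// IDs and keep rows whose ID is in the filter, preserving storage order
// and timestamp duplicates.
func filterBlock(ids []uint64, timestamps []int64, filter postingList) ([]uint64, []int64) {
	var outIDs []uint64
	var outTS []int64
	for i, id := range ids {
		if filter.Contains(id) {
			outIDs = append(outIDs, id)
			outTS = append(outTS, timestamps[i])
		}
	}
	return outIDs, outTS
}

func main() {
	ids := []uint64{11, 22, 33}
	ts := []int64{100, 100, 200} // elements 11 and 22 share a timestamp
	gotIDs, gotTS := filterBlock(ids, ts, postingList{11: {}, 22: {}})
	fmt.Println(gotIDs, gotTS) // [11 22] [100 100], both duplicates survive
}
```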