Fix duplicated data points in a writing batch (#536)
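
Data points sharing a timestamp could be written more than once when a batch was split across blocks: mustInitFromDataPoints reset its deduplication cursor (tsPrev) to zero whenever it cut a new block, so a duplicate of the boundary timestamp escaped the skip. The cursor now carries the first timestamp of the new block instead. uncompressedSizeBytes additionally accounts for the 8-byte version (measure) and elementID (stream) next to the 8-byte timestamp, and the test wait loops are now bounded by flags.EventuallyTimeout instead of spinning indefinitely.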
hanahmily committed Sep 13, 2024
1 parent 59c3968 commit a528a5d
Showing 11 changed files with 230 additions and 172 deletions.
2 changes: 1 addition & 1 deletion banyand/measure/block.go
@@ -258,7 +258,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
 func (b *block) uncompressedSizeBytes() uint64 {
 	dataPointsCount := uint64(b.Len())
 
-	n := dataPointsCount * 8
+	n := dataPointsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for version
 
 	tff := b.tagFamilies
 	for i := range tff {
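
A minimal sketch of the accounting above (estimateFixedCost is a hypothetical helper, not part of the codebase): each data point now carries a fixed 16-byte cost before the loop below the changed line adds tag-family and field sizes.

	// estimateFixedCost mirrors the fixed-size portion of
	// block.uncompressedSizeBytes: an int64 timestamp plus an int64
	// version, 8 bytes each, per data point.
	func estimateFixedCost(dataPointsCount uint64) uint64 {
		return dataPointsCount * (8 + 8)
	}

For example, estimateFixedCost(2) == 32: two points cost 32 bytes before any tag or field bytes are accumulated.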
32 changes: 16 additions & 16 deletions banyand/measure/block_reader_test.go
@@ -45,40 +45,40 @@ func Test_blockReader_nextBlock(t *testing.T) {
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts",
dpsList: []*dataPoints{dpsTS1, dpsTS2},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with a single part with same ts",
dpsList: []*dataPoints{duplicatedDps},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
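
Every expected uncompressedSizeBytes in these tables grows by exactly count × 8 bytes over the old value (1676 → 1684 at count 1; 110 → 126 at count 2 in the merger test below; 2120140 → 2130260 at count 1265), matching the newly counted 8-byte version per data point.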
2 changes: 1 addition & 1 deletion banyand/measure/merger_policy.go
@@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy {
 }
 
 func newDefaultMergePolicyForTesting() *mergePolicy {
-	return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
+	return newMergePolicy(3, 1, run.Bytes(math.MaxInt64))
 }
 
 // NewMergePolicy creates a MergePolicy with given parameters.
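
Assuming newMergePolicy's first two arguments are the maximum number of parts merged at once and the merge-ratio multiplier (its signature is not shown in this diff), the testing policy now merges more eagerly, which lets the Test_tstIter loops below wait for the snapshot to collapse to a single part before the new timeout fires.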
38 changes: 19 additions & 19 deletions banyand/measure/merger_test.go
@@ -260,43 +260,43 @@ func Test_mergeParts(t *testing.T) {
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts",
dpsList: []*dataPoints{dpsTS1, dpsTS2, dpsTS2},
want: []blockMetadata{
{seriesID: 1, count: 2, uncompressedSizeBytes: 3352},
{seriesID: 2, count: 2, uncompressedSizeBytes: 110},
{seriesID: 3, count: 2, uncompressedSizeBytes: 48},
{seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS11, dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with a large quantity of different ts",
dpsList: []*dataPoints{generateHugeDps(1, 5000, 1), generateHugeDps(5001, 10000, 2)},
want: []blockMetadata{
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720},
{seriesID: 1, count: 2410, uncompressedSizeBytes: 4039160},
{seriesID: 1, count: 1205, uncompressedSizeBytes: 2019580},
{seriesID: 2, count: 2, uncompressedSizeBytes: 110},
{seriesID: 3, count: 2, uncompressedSizeBytes: 48},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2410, uncompressedSizeBytes: 4058440},
{seriesID: 1, count: 1205, uncompressedSizeBytes: 2029220},
{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
}
4 changes: 1 addition & 3 deletions banyand/measure/part.go
@@ -163,16 +163,14 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
 				continue
 			}
 			tsPrev = dps.timestamps[i]
-		} else {
-			tsPrev = 0
 		}
 
 		if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
 			(i-indexPrev) > maxBlockLength || sid != sidPrev {
 			bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
 			sidPrev = sid
-			tsPrev = 0
 			indexPrev = i
+			tsPrev = dps.timestamps[indexPrev]
 			uncompressedBlockSizeBytes = 0
 		}
 		uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps)
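
A self-contained sketch of the invariant this hunk restores, assuming points arrive sorted by series ID and timestamp as the surrounding code requires (dedupeSorted is hypothetical, not the BanyanDB implementation):

	// dedupeSorted keeps one point per (seriesID, timestamp) pair. The bug
	// fixed above: resetting tsPrev to 0 when a block is cut made a
	// duplicate of the boundary timestamp look new; carrying the first
	// timestamp of the fresh block forward keeps the skip working.
	func dedupeSorted(seriesIDs []uint64, timestamps []int64) []int {
		kept := make([]int, 0, len(timestamps))
		var sidPrev uint64
		var tsPrev int64
		for i := range timestamps {
			if i > 0 && seriesIDs[i] == sidPrev && timestamps[i] == tsPrev {
				continue // same series, same timestamp: drop the duplicate
			}
			sidPrev, tsPrev = seriesIDs[i], timestamps[i]
			kept = append(kept, i)
		}
		return kept
	}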
143 changes: 94 additions & 49 deletions banyand/measure/tstable_test.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -194,9 +195,9 @@ func Test_tstIter(t *testing.T) {
 				minTimestamp: 1,
 				maxTimestamp: 1,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
 				},
 			},
 			{
@@ -206,22 +207,24 @@
 				minTimestamp: 1,
 				maxTimestamp: 2,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
 				},
 			},
 			{
 				name: "Test with a single part with same ts",
-				dpsList: []*dataPoints{duplicatedDps},
-				sids: []common.SeriesID{1},
+				dpsList: []*dataPoints{duplicatedDps1},
+				sids: []common.SeriesID{1, 2, 3},
 				minTimestamp: 1,
 				maxTimestamp: 1,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 16},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 16},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 16},
 				},
 			},
 			{
@@ -231,12 +234,12 @@
 				minTimestamp: 1,
 				maxTimestamp: 2,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
 				},
 			},
 		}
@@ -280,9 +283,9 @@ func Test_tstIter(t *testing.T) {
 				minTimestamp: 1,
 				maxTimestamp: 1,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
 				},
 			},
 			{
@@ -292,9 +295,9 @@
 				minTimestamp: 1,
 				maxTimestamp: 2,
 				want: []blockMetadata{
-					{seriesID: 1, count: 2, uncompressedSizeBytes: 3352},
-					{seriesID: 2, count: 2, uncompressedSizeBytes: 110},
-					{seriesID: 3, count: 2, uncompressedSizeBytes: 48},
+					{seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
+					{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
+					{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
 				},
 			},
 			{
@@ -304,9 +307,9 @@
 				minTimestamp: 1,
 				maxTimestamp: 2,
 				want: []blockMetadata{
-					{seriesID: 1, count: 1, uncompressedSizeBytes: 1676},
-					{seriesID: 2, count: 1, uncompressedSizeBytes: 55},
-					{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
+					{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
+					{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
+					{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
 				},
 			},
 		}
@@ -322,38 +325,52 @@ func Test_tstIter(t *testing.T) {
 			require.NoError(t, err)
 			for i, dps := range tt.dpsList {
 				tst.mustAddDataPoints(dps)
+				timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+			OUTER:
 				for {
-					snp := tst.currentSnapshot()
-					if snp == nil {
-						t.Logf("waiting for snapshot %d to be introduced", i)
-						time.Sleep(100 * time.Millisecond)
-						continue
-					}
-					if snp.creator != snapshotCreatorMemPart {
+					select {
+					case <-timeout:
+						t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
+					default:
+						snp := tst.currentSnapshot()
+						if snp == nil {
+							t.Logf("waiting for snapshot %d to be introduced", i)
+							time.Sleep(100 * time.Millisecond)
+							continue
+						}
+						if snp.creator != snapshotCreatorMemPart {
+							snp.decRef()
+							break OUTER
+						}
+						t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
+							i, snp.creator, snp.parts)
 						snp.decRef()
-						break
+						time.Sleep(100 * time.Millisecond)
 					}
-					t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
-						i, snp.creator, snp.parts)
-					snp.decRef()
-					time.Sleep(100 * time.Millisecond)
 				}
 			}
 			// wait until some parts are merged
 			if len(tt.dpsList) > 0 {
+				timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
+			OUTER1:
 				for {
-					snp := tst.currentSnapshot()
-					if snp == nil {
-						time.Sleep(100 * time.Millisecond)
-						continue
-					}
-					if len(snp.parts) == 1 {
+					select {
+					case <-timeout:
+						t.Fatalf("timeout waiting for snapshot to be merged")
+					default:
+						snp := tst.currentSnapshot()
+						if snp == nil {
+							time.Sleep(100 * time.Millisecond)
+							continue
+						}
+						if len(snp.parts) == 1 {
+							snp.decRef()
+							break OUTER1
+						}
+						t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
 						snp.decRef()
-						break
+						time.Sleep(100 * time.Millisecond)
 					}
-					t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
-					snp.decRef()
-					time.Sleep(100 * time.Millisecond)
 				}
 			}
 			verify(t, tt, tst)
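
Note the labeled breaks in the rewritten loops: inside a select, a bare break exits only the select statement, so terminating the polling loop requires break OUTER / break OUTER1. The shape, reduced to its essentials (a sketch; waitFor and the 100 ms poll interval are illustrative and assume import "time"):

	// waitFor polls cond until it holds or limit elapses.
	func waitFor(cond func() bool, limit time.Duration) bool {
		timeout := time.After(limit)
		ok := false
	LOOP:
		for {
			select {
			case <-timeout:
				break LOOP // a bare break here would only leave the select
			default:
				if cond() {
					ok = true
					break LOOP
				}
				time.Sleep(100 * time.Millisecond)
			}
		}
		return ok
	}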
@@ -630,6 +647,34 @@ var duplicatedDps = &dataPoints{
 	},
 }
 
+var duplicatedDps1 = &dataPoints{
+	seriesIDs: []common.SeriesID{2, 2, 2, 1, 1, 1, 3, 3, 3},
+	timestamps: []int64{1, 1, 1, 1, 1, 1, 1, 1, 1},
+	versions: []int64{1, 2, 3, 3, 2, 1, 2, 1, 3},
+	tagFamilies: [][]nameValues{
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+	},
+	fields: []nameValues{
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+		{},
+	},
+}
 
 func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints {
 	hugeDps := &dataPoints{
 		seriesIDs: []common.SeriesID{},
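
Because every entry in duplicatedDps1 above has empty tag families and fields, each surviving point costs exactly 8 + 8 = 16 bytes, which is where the uncompressedSizeBytes: 16 expectations in the "single part with same ts" case come from: nine writes across three series at one timestamp collapse to one point per series (count: 1).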
2 changes: 1 addition & 1 deletion banyand/stream/block.go
@@ -233,7 +233,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec
 func (b *block) uncompressedSizeBytes() uint64 {
 	elementsCount := uint64(b.Len())
 
-	n := elementsCount * 8
+	n := elementsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for elementID
 
 	tff := b.tagFamilies
 	for i := range tff {
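
The stream counterpart mirrors the measure-side change above; the second 8 bytes here accounts for each element's elementID rather than a version.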
