From 86a1331a058b7a113e2b3bd27343a408a466504e Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Fri, 13 Sep 2024 14:15:10 +0800 Subject: [PATCH] Fix duplicated data points in a writing batch (#536) --- banyand/measure/block.go | 2 +- banyand/measure/block_reader_test.go | 32 +++--- banyand/measure/merger_policy.go | 2 +- banyand/measure/merger_test.go | 38 +++---- banyand/measure/part.go | 4 +- banyand/measure/tstable_test.go | 143 ++++++++++++++++++--------- banyand/stream/block.go | 2 +- banyand/stream/block_reader_test.go | 38 +++---- banyand/stream/merger_policy.go | 2 +- banyand/stream/merger_test.go | 32 +++--- banyand/stream/tstable_test.go | 107 +++++++++++--------- 11 files changed, 230 insertions(+), 172 deletions(-) diff --git a/banyand/measure/block.go b/banyand/measure/block.go index b95329a18..d6808bd3b 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -258,7 +258,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec func (b *block) uncompressedSizeBytes() uint64 { dataPointsCount := uint64(b.Len()) - n := dataPointsCount * 8 + n := dataPointsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for version tff := b.tagFamilies for i := range tff { diff --git a/banyand/measure/block_reader_test.go b/banyand/measure/block_reader_test.go index cd576b78d..0cd0cf9d5 100644 --- a/banyand/measure/block_reader_test.go +++ b/banyand/measure/block_reader_test.go @@ -45,40 +45,40 @@ func Test_blockReader_nextBlock(t *testing.T) { name: "Test with single part", dpsList: []*dataPoints{dpsTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with multiple parts with different ts", dpsList: []*dataPoints{dpsTS1, dpsTS2}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with a single part with same ts", dpsList: []*dataPoints{duplicatedDps}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS1, dpsTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, } diff --git a/banyand/measure/merger_policy.go b/banyand/measure/merger_policy.go index 750122ef2..d9d07678c 100644 --- a/banyand/measure/merger_policy.go +++ b/banyand/measure/merger_policy.go @@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy { } func newDefaultMergePolicyForTesting() *mergePolicy { - return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64)) + return newMergePolicy(3, 1, run.Bytes(math.MaxInt64)) } // NewMergePolicy creates a MergePolicy with given parameters. diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go index b3983407d..b8a8ff50c 100644 --- a/banyand/measure/merger_test.go +++ b/banyand/measure/merger_test.go @@ -260,43 +260,43 @@ func Test_mergeParts(t *testing.T) { name: "Test with single part", dpsList: []*dataPoints{dpsTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with multiple parts with different ts", dpsList: []*dataPoints{dpsTS1, dpsTS2, dpsTS2}, want: []blockMetadata{ - {seriesID: 1, count: 2, uncompressedSizeBytes: 3352}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 110}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 48}, + {seriesID: 1, count: 2, uncompressedSizeBytes: 3368}, + {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, + {seriesID: 3, count: 2, uncompressedSizeBytes: 64}, }, }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS11, dpsTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with multiple parts with a large quantity of different ts", dpsList: []*dataPoints{generateHugeDps(1, 5000, 1), generateHugeDps(5001, 10000, 2)}, want: []blockMetadata{ - {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140}, - {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140}, - {seriesID: 1, count: 1265, uncompressedSizeBytes: 2120140}, - {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720}, - {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720}, - {seriesID: 1, count: 2470, uncompressedSizeBytes: 4139720}, - {seriesID: 1, count: 2410, uncompressedSizeBytes: 4039160}, - {seriesID: 1, count: 1205, uncompressedSizeBytes: 2019580}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 110}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 48}, + {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260}, + {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260}, + {seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260}, + {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480}, + {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480}, + {seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480}, + {seriesID: 1, count: 2410, uncompressedSizeBytes: 4058440}, + {seriesID: 1, count: 1205, uncompressedSizeBytes: 2029220}, + {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, + {seriesID: 3, count: 2, uncompressedSizeBytes: 64}, }, }, } diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 36a72eb84..1e89b1e9c 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -163,16 +163,14 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { continue } tsPrev = dps.timestamps[i] - } else { - tsPrev = 0 } if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev { bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i]) sidPrev = sid - tsPrev = 0 indexPrev = i + tsPrev = dps.timestamps[indexPrev] uncompressedBlockSizeBytes = 0 } uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps) diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index eff1cc283..2f24d5a6c 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -35,6 +35,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -194,9 +195,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 1, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { @@ -206,22 +207,24 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { name: "Test with a single part with same ts", - dpsList: []*dataPoints{duplicatedDps}, - sids: []common.SeriesID{1}, + dpsList: []*dataPoints{duplicatedDps1}, + sids: []common.SeriesID{1, 2, 3}, minTimestamp: 1, maxTimestamp: 1, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { @@ -231,12 +234,12 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, } @@ -280,9 +283,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 1, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, { @@ -292,9 +295,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 2, uncompressedSizeBytes: 3352}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 110}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 48}, + {seriesID: 1, count: 2, uncompressedSizeBytes: 3368}, + {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, + {seriesID: 3, count: 2, uncompressedSizeBytes: 64}, }, }, { @@ -304,9 +307,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1676}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, }, }, } @@ -322,38 +325,52 @@ func Test_tstIter(t *testing.T) { require.NoError(t, err) for i, dps := range tt.dpsList { tst.mustAddDataPoints(dps) + timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration + OUTER: for { - snp := tst.currentSnapshot() - if snp == nil { - t.Logf("waiting for snapshot %d to be introduced", i) - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart { + select { + case <-timeout: + t.Fatalf("timeout waiting for snapshot %d to be introduced", i) + default: + snp := tst.currentSnapshot() + if snp == nil { + t.Logf("waiting for snapshot %d to be introduced", i) + time.Sleep(100 * time.Millisecond) + continue + } + if snp.creator != snapshotCreatorMemPart { + snp.decRef() + break OUTER + } + t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", + i, snp.creator, snp.parts) snp.decRef() - break + time.Sleep(100 * time.Millisecond) } - t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", - i, snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) } } // wait until some parts are merged if len(tt.dpsList) > 0 { + timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration + OUTER1: for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == 1 { + select { + case <-timeout: + t.Fatalf("timeout waiting for snapshot to be merged") + default: + snp := tst.currentSnapshot() + if snp == nil { + time.Sleep(100 * time.Millisecond) + continue + } + if len(snp.parts) == 1 { + snp.decRef() + break OUTER1 + } + t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) snp.decRef() - break + time.Sleep(100 * time.Millisecond) } - t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) } } verify(t, tt, tst) @@ -630,6 +647,34 @@ var duplicatedDps = &dataPoints{ }, } +var duplicatedDps1 = &dataPoints{ + seriesIDs: []common.SeriesID{2, 2, 2, 1, 1, 1, 3, 3, 3}, + timestamps: []int64{1, 1, 1, 1, 1, 1, 1, 1, 1}, + versions: []int64{1, 2, 3, 3, 2, 1, 2, 1, 3}, + tagFamilies: [][]nameValues{ + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + }, + fields: []nameValues{ + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + }, +} + func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints { hugeDps := &dataPoints{ seriesIDs: []common.SeriesID{}, diff --git a/banyand/stream/block.go b/banyand/stream/block.go index e6e7b964a..3b65e88e5 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -233,7 +233,7 @@ func (b *block) unmarshalTagFamilyFromSeqReaders(decoder *encoding.BytesBlockDec func (b *block) uncompressedSizeBytes() uint64 { elementsCount := uint64(b.Len()) - n := elementsCount * 8 + n := elementsCount * (8 + 8) // 8 bytes for timestamp and 8 bytes for elementID tff := b.tagFamilies for i := range tff { diff --git a/banyand/stream/block_reader_test.go b/banyand/stream/block_reader_test.go index 154fdd21f..ae9a3fc11 100644 --- a/banyand/stream/block_reader_test.go +++ b/banyand/stream/block_reader_test.go @@ -45,39 +45,39 @@ func Test_blockReader_nextBlock(t *testing.T) { name: "Test with single part", esList: []*elements{esTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { name: "Test with multiple parts with different ts", esList: []*elements{esTS1, esTS2}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { name: "Test with multiple parts with same ts", esList: []*elements{esTS1, esTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - verify := func(pp []*part) { + verify := func(t *testing.T, pp []*part) { var pii []*partMergeIter for _, p := range pp { pmi := &partMergeIter{} @@ -116,7 +116,7 @@ func Test_blockReader_nextBlock(t *testing.T) { } } - t.Run("memory parts", func(_ *testing.T) { + t.Run("memory parts", func(t *testing.T) { var mpp []*memPart defer func() { for _, mp := range mpp { @@ -130,7 +130,7 @@ func Test_blockReader_nextBlock(t *testing.T) { mp.mustInitFromElements(es) pp = append(pp, openMemPart(mp)) } - verify(pp) + verify(t, pp) }) t.Run("file parts", func(t *testing.T) { @@ -158,7 +158,7 @@ func Test_blockReader_nextBlock(t *testing.T) { fpp = append(fpp, filePW) pp = append(pp, filePW.p) } - verify(pp) + verify(t, pp) }) }) } diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go index e694f8047..ad881e581 100644 --- a/banyand/stream/merger_policy.go +++ b/banyand/stream/merger_policy.go @@ -38,7 +38,7 @@ func newDefaultMergePolicy() *mergePolicy { } func newDefaultMergePolicyForTesting() *mergePolicy { - return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64)) + return newMergePolicy(3, 1, run.Bytes(math.MaxInt64)) } func newDisabledMergePolicyForTesting() *mergePolicy { diff --git a/banyand/stream/merger_test.go b/banyand/stream/merger_test.go index 310d164b5..3128decee 100644 --- a/banyand/stream/merger_test.go +++ b/banyand/stream/merger_test.go @@ -248,40 +248,40 @@ func Test_mergeParts(t *testing.T) { name: "Test with single part", esList: []*elements{esTS1}, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { name: "Test with multiple parts with different ts", esList: []*elements{esTS1, esTS2, esTS2}, want: []blockMetadata{ - {seriesID: 1, count: 3, uncompressedSizeBytes: 2643}, - {seriesID: 2, count: 3, uncompressedSizeBytes: 165}, - {seriesID: 3, count: 3, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 3, uncompressedSizeBytes: 2667}, + {seriesID: 2, count: 3, uncompressedSizeBytes: 189}, + {seriesID: 3, count: 3, uncompressedSizeBytes: 48}, }, }, { name: "Test with multiple parts with same ts", esList: []*elements{esTS1, esTS1, esTS1}, want: []blockMetadata{ - {seriesID: 1, count: 3, uncompressedSizeBytes: 2643}, - {seriesID: 2, count: 3, uncompressedSizeBytes: 165}, - {seriesID: 3, count: 3, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 3, uncompressedSizeBytes: 2667}, + {seriesID: 2, count: 3, uncompressedSizeBytes: 189}, + {seriesID: 3, count: 3, uncompressedSizeBytes: 48}, }, }, { name: "Test with multiple parts with a large quantity of different ts", esList: []*elements{generateHugeEs(1, 5000, 1), generateHugeEs(5001, 10000, 2)}, want: []blockMetadata{ - {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688}, - {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688}, - {seriesID: 1, count: 2552, uncompressedSizeBytes: 2248312}, - {seriesID: 1, count: 2448, uncompressedSizeBytes: 2156688}, - {seriesID: 1, count: 104, uncompressedSizeBytes: 91624}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 110}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 16}, + {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272}, + {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272}, + {seriesID: 1, count: 2552, uncompressedSizeBytes: 2268728}, + {seriesID: 1, count: 2448, uncompressedSizeBytes: 2176272}, + {seriesID: 1, count: 104, uncompressedSizeBytes: 92456}, + {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, + {seriesID: 3, count: 2, uncompressedSizeBytes: 32}, }, }, } diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index c8759b9e1..9224e5523 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -36,6 +36,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -190,9 +191,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 1, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { @@ -202,12 +203,12 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { @@ -217,12 +218,12 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, } @@ -268,9 +269,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 1, want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 881}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 55}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 8}, + {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, + {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, + {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, }, }, { @@ -280,9 +281,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 3, uncompressedSizeBytes: 2643}, - {seriesID: 2, count: 3, uncompressedSizeBytes: 165}, - {seriesID: 3, count: 3, uncompressedSizeBytes: 24}, + {seriesID: 1, count: 3, uncompressedSizeBytes: 2667}, + {seriesID: 2, count: 3, uncompressedSizeBytes: 189}, + {seriesID: 3, count: 3, uncompressedSizeBytes: 48}, }, }, { @@ -292,9 +293,9 @@ func Test_tstIter(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []blockMetadata{ - {seriesID: 1, count: 2, uncompressedSizeBytes: 1762}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 110}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 16}, + {seriesID: 1, count: 2, uncompressedSizeBytes: 1778}, + {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, + {seriesID: 3, count: 2, uncompressedSizeBytes: 32}, }, }, } @@ -310,38 +311,52 @@ func Test_tstIter(t *testing.T) { require.NoError(t, err) for i, es := range tt.esList { tst.mustAddElements(es) + timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration + OUTER: for { - snp := tst.currentSnapshot() - if snp == nil { - t.Logf("waiting for snapshot %d to be introduced", i) - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart { + select { + case <-timeout: + t.Fatalf("timeout waiting for snapshot %d to be introduced", i) + default: + snp := tst.currentSnapshot() + if snp == nil { + t.Logf("waiting for snapshot %d to be introduced", i) + time.Sleep(100 * time.Millisecond) + continue + } + if snp.creator != snapshotCreatorMemPart { + snp.decRef() + break OUTER + } + t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", + i, snp.creator, snp.parts) snp.decRef() - break + time.Sleep(100 * time.Millisecond) } - t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", - i, snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) } } // wait until some parts are merged if len(tt.esList) > 0 { + timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration + OUTER1: for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) { + select { + case <-timeout: + t.Fatalf("timeout waiting for snapshot to be merged") + default: + snp := tst.currentSnapshot() + if snp == nil { + time.Sleep(100 * time.Millisecond) + continue + } + if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) { + snp.decRef() + break OUTER1 + } + t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) snp.decRef() - break + time.Sleep(100 * time.Millisecond) } - t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) } } verify(t, tt, tst)