From f354d0a9e98b435947feda713eda0b1d9a8b2772 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Tue, 6 Aug 2024 15:31:29 +0800 Subject: [PATCH] Add Tracker To Object Pool (#508) * Fix duplicated measure data in a single part * Add the tracked pool to fix leak issues Signed-off-by: Gao Hongtao --------- Signed-off-by: Gao Hongtao --- CHANGES.md | 7 + banyand/internal/storage/index_test.go | 10 +- banyand/liaison/grpc/node.go | 10 + banyand/liaison/grpc/server.go | 1 - banyand/liaison/http/server.go | 2 - banyand/measure/block.go | 14 +- banyand/measure/block_metadata.go | 11 +- banyand/measure/block_reader.go | 10 +- banyand/measure/block_reader_test.go | 15 +- banyand/measure/block_writer.go | 6 +- banyand/measure/column.go | 2 +- banyand/measure/column_metadata.go | 6 +- banyand/measure/datapoints.go | 18 +- banyand/measure/introducer.go | 15 +- banyand/measure/part.go | 27 +- banyand/measure/part_iter.go | 10 +- banyand/measure/query.go | 5 +- banyand/measure/tstable.go | 16 + banyand/measure/tstable_test.go | 62 +++ banyand/observability/meter_prom.go | 12 +- banyand/observability/metrics_system.go | 16 +- banyand/observability/service.go | 16 +- banyand/observability/system.go | 162 -------- banyand/queue/sub/server.go | 1 - banyand/stream/block.go | 14 +- banyand/stream/block_metadata.go | 11 +- banyand/stream/block_reader.go | 10 +- banyand/stream/block_writer.go | 6 +- banyand/stream/introducer.go | 15 +- banyand/stream/part.go | 6 +- banyand/stream/part_iter.go | 10 +- banyand/stream/query.go | 7 +- banyand/stream/tag.go | 2 +- banyand/stream/tag_metadata.go | 6 +- banyand/stream/tstable.go | 16 + pkg/bytes/buffer.go | 13 +- pkg/encoding/bytes.go | 2 +- pkg/encoding/encoder.go | 292 ------------- pkg/encoding/encoder_test.go | 392 ------------------ pkg/encoding/int.go | 11 +- pkg/fs/local_file_system.go | 10 +- pkg/index/inverted/inverted.go | 6 +- pkg/meter/native/collection.go | 2 + pkg/node/interface.go | 13 + pkg/node/maglev.go | 13 + pkg/node/round_robin.go | 69 ++- pkg/node/round_robin_test.go | 22 + pkg/pb/v1/series.go | 21 - pkg/pool/pool.go | 81 ++++ pkg/test/gmatcher/gmatcher.go | 59 +++ .../measure/testdata/groups/exception.json | 18 + .../measure/testdata/measures/duplicated.json | 42 ++ test/cases/init.go | 1 + .../measure/data/input/duplicated_part.yaml | 25 ++ .../measure/data/testdata/duplicated.json | 182 ++++++++ .../measure/data/want/duplicated_part.yaml | 38 ++ test/cases/measure/measure.go | 1 + .../distributed/query/query_suite_test.go | 3 + test/integration/etcd/client_test.go | 3 + test/integration/load/load_suite_test.go | 3 + .../standalone/cold_query/query_suite_test.go | 3 + .../standalone/other/measure_test.go | 3 + .../standalone/other/property_test.go | 3 + .../standalone/query/query_suite_test.go | 3 + .../query_ondisk/query_ondisk_suite_test.go | 3 + 65 files changed, 879 insertions(+), 1015 deletions(-) delete mode 100644 banyand/observability/system.go delete mode 100644 pkg/encoding/encoder.go delete mode 100644 pkg/encoding/encoder_test.go create mode 100644 pkg/pool/pool.go create mode 100644 pkg/test/gmatcher/gmatcher.go create mode 100644 pkg/test/measure/testdata/groups/exception.json create mode 100644 pkg/test/measure/testdata/measures/duplicated.json create mode 100644 test/cases/measure/data/input/duplicated_part.yaml create mode 100644 test/cases/measure/data/testdata/duplicated.json create mode 100644 test/cases/measure/data/want/duplicated_part.yaml diff --git a/CHANGES.md b/CHANGES.md index ba07da6fa..7389aaae3 100644 --- 
a/CHANGES.md +++ b/CHANGES.md @@ -35,12 +35,19 @@ Release Notes. - Fix a bug that the Stream module didn't support duplicated in index-based filtering and sorting - Fix the bug that segment's reference count is increased twice when the controller try to create an existing segment. - Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset". +- Fix duplicated measure data in a single part. +- Fix several "sync.Pool" leak issues by adding a tracker to the pool. ### Documentation + - Introduce new doc menu structure. - Add installation on Docker and Kubernetes. - Add quick-start guide. +### Chores + +- Bump up the version of the infra e2e framework. + ## 0.6.1 ### Features diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 0e61b6239..d73886a35 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -33,8 +33,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/flags" ) -var testSeriesPool pbv1.SeriesPool - func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() path, fn := setUp(require.New(t)) @@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) { }() var docs index.Documents for i := 0; i < 100; i++ { - series := testSeriesPool.Generate() + var series pbv1.Series series.Subject = "service_instance_latency" series.EntityValues = []*modelv1.TagValue{ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}}, @@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) { } copy(doc.EntityValues, series.Buffer) docs = append(docs, doc) - testSeriesPool.Release(series) } require.NoError(t, si.Write(docs)) // Restart the index @@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var seriesQueries []*pbv1.Series for i := range tt.entityValues { - seriesQuery := testSeriesPool.Generate() - defer testSeriesPool.Release(seriesQuery) + var seriesQuery pbv1.Series seriesQuery.Subject = tt.subject seriesQuery.EntityValues = tt.entityValues[i] - seriesQueries = append(seriesQueries, seriesQuery) + seriesQueries = append(seriesQueries, &seriesQuery) } sl, _, err := si.searchPrimary(ctx, seriesQueries, nil) require.NoError(t, err) diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go index b43a3bdb5..9f307fc20 100644 --- a/banyand/liaison/grpc/node.go +++ b/banyand/liaison/grpc/node.go @@ -18,6 +18,7 @@ package grpc import ( + "fmt" "sync" "github.com/pkg/errors" @@ -37,6 +38,7 @@ var ( // together with the shardID calculated from the incoming data. type NodeRegistry interface { Locate(group, name string, shardID uint32) (string, error) + fmt.Stringer } type clusterNodeService struct { @@ -94,8 +96,16 @@ func (n *clusterNodeService) OnDelete(metadata schema.Metadata) { } } +func (n *clusterNodeService) String() string { + return n.sel.String() +} + type localNodeService struct{} +func (l localNodeService) String() string { + return "local" +} + // NewLocalNodeRegistry creates a local(fake) node registry.
func NewLocalNodeRegistry() NodeRegistry { return localNodeService{} diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index a4d48bb51..2cc0dd735 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -191,7 +191,6 @@ func (s *server) Validate() error { if s.enableIngestionAccessLog && s.accessLogRootPath == "" { return errAccessLogRootPath } - observability.UpdateAddress("grpc", s.addr) if !s.tls { return nil } diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index f1b10053d..f7b797b27 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -40,7 +40,6 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/ui" @@ -102,7 +101,6 @@ func (p *server) Validate() error { if p.listenAddr == ":" { return errNoAddr } - observability.UpdateAddress("http", p.listenAddr) if p.grpcCert != "" { creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert, "") if errTLS != nil { diff --git a/banyand/measure/block.go b/banyand/measure/block.go index 001940cce..ae5006e69 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -20,7 +20,6 @@ package measure import ( "slices" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -29,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -403,7 +403,7 @@ func generateBlock() *block { if v == nil { return &block{} } - return v.(*block) + return v } func releaseBlock(b *block) { @@ -411,7 +411,7 @@ func releaseBlock(b *block) { blockPool.Put(b) } -var blockPool sync.Pool +var blockPool = pool.Register[*block]("measure-block") type blockCursor struct { p *part @@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { return true } -var blockCursorPool sync.Pool +var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor") func generateBlockCursor() *blockCursor { v := blockCursorPool.Get() if v == nil { return &blockCursor{} } - return v.(*blockCursor) + return v } func releaseBlockCursor(bc *blockCursor) { @@ -832,7 +832,7 @@ func generateBlockPointer() *blockPointer { if v == nil { return &blockPointer{} } - return v.(*blockPointer) + return v } func releaseBlockPointer(bi *blockPointer) { @@ -840,4 +840,4 @@ func releaseBlockPointer(bi *blockPointer) { blockPointerPool.Put(bi) } -var blockPointerPool sync.Pool +var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer") diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 410932829..ff23c4af3 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -21,11 +21,11 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" 
"github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) } - // TODO: cache dataBlock tf := &dataBlock{} src, err = tf.unmarshal(src) if err != nil { @@ -198,7 +197,7 @@ func generateBlockMetadata() *blockMetadata { if v == nil { return &blockMetadata{} } - return v.(*blockMetadata) + return v } func releaseBlockMetadata(bm *blockMetadata) { @@ -206,7 +205,7 @@ func releaseBlockMetadata(bm *blockMetadata) { blockMetadataPool.Put(bm) } -var blockMetadataPool sync.Pool +var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata") type blockMetadataArray struct { arr []blockMetadata @@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() { bma.arr = bma.arr[:0] } -var blockMetadataArrayPool sync.Pool +var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("measure-blockMetadataArray") func generateBlockMetadataArray() *blockMetadataArray { v := blockMetadataArrayPool.Get() if v == nil { return &blockMetadataArray{} } - return v.(*blockMetadataArray) + return v } func releaseBlockMetadataArray(bma *blockMetadataArray) { diff --git a/banyand/measure/block_reader.go b/banyand/measure/block_reader.go index eb221eefb..238b68331 100644 --- a/banyand/measure/block_reader.go +++ b/banyand/measure/block_reader.go @@ -22,11 +22,11 @@ import ( "errors" "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type seqReader struct { @@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) { func generateSeqReader() *seqReader { if v := seqReaderPool.Get(); v != nil { - return v.(*seqReader) + return v } return &seqReader{} } @@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) { seqReaderPool.Put(sr) } -var seqReaderPool sync.Pool +var seqReaderPool = pool.Register[*seqReader]("measure-seqReader") type seqReaders struct { tagFamilyMetadata map[string]*seqReader @@ -219,11 +219,11 @@ func (br *blockReader) error() error { return br.err } -var blockReaderPool sync.Pool +var blockReaderPool = pool.Register[*blockReader]("measure-blockReader") func generateBlockReader() *blockReader { if v := blockReaderPool.Get(); v != nil { - return v.(*blockReader) + return v } return &blockReader{} } diff --git a/banyand/measure/block_reader_test.go b/banyand/measure/block_reader_test.go index 63f3184ed..cd576b78d 100644 --- a/banyand/measure/block_reader_test.go +++ b/banyand/measure/block_reader_test.go @@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) { {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, }, }, + { + name: "Test with a single part with same ts", + dpsList: []*dataPoints{duplicatedDps}, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + }, + }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS1, dpsTS1}, @@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - verify := func(pp []*part) { + verify := func(t *testing.T, pp []*part) { var pii []*partMergeIter for _, p := range pp { pmi := &partMergeIter{} @@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) { } } - 
t.Run("memory parts", func(_ *testing.T) { + t.Run("memory parts", func(t *testing.T) { var mpp []*memPart defer func() { for _, mp := range mpp { @@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) { mp.mustInitFromDataPoints(dps) pp = append(pp, openMemPart(mp)) } - verify(pp) + verify(t, pp) }) t.Run("file parts", func(t *testing.T) { @@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) { fpp = append(fpp, filePW) pp = append(pp, filePW.p) } - verify(pp) + verify(t, pp) }) }) } diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go index fd065f1ab..4a07291c5 100644 --- a/banyand/measure/block_writer.go +++ b/banyand/measure/block_writer.go @@ -19,12 +19,12 @@ package measure import ( "path/filepath" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type writer struct { @@ -285,7 +285,7 @@ func generateBlockWriter() *blockWriter { }, } } - return v.(*blockWriter) + return v } func releaseBlockWriter(bsw *blockWriter) { @@ -293,4 +293,4 @@ func releaseBlockWriter(bsw *blockWriter) { blockWriterPool.Put(bsw) } -var blockWriterPool sync.Pool +var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter") diff --git a/banyand/measure/column.go b/banyand/measure/column.go index eafa0134a..35f34ab8a 100644 --- a/banyand/measure/column.go +++ b/banyand/measure/column.go @@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader * } } -var bigValuePool bytes.BufferPool +var bigValuePool = bytes.NewBufferPool("measure-big-value") type columnFamily struct { name string diff --git a/banyand/measure/column_metadata.go b/banyand/measure/column_metadata.go index 3f1eb9571..3f7102af0 100644 --- a/banyand/measure/column_metadata.go +++ b/banyand/measure/column_metadata.go @@ -36,11 +36,11 @@ package measure import ( "fmt" - "sync" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type columnMetadata struct { @@ -148,7 +148,7 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata { if v == nil { return &columnFamilyMetadata{} } - return v.(*columnFamilyMetadata) + return v } func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) { @@ -156,4 +156,4 @@ func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) { columnFamilyMetadataPool.Put(cfm) } -var columnFamilyMetadataPool sync.Pool +var columnFamilyMetadataPool = pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata") diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go index 03b070933..156d4b450 100644 --- a/banyand/measure/datapoints.go +++ b/banyand/measure/datapoints.go @@ -123,6 +123,19 @@ type dataPoints struct { fields []nameValues } +func (d *dataPoints) skip(i int) { + if len(d.timestamps) <= i { + return + } + d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...) + d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...) + d.versions = append(d.versions[:i], d.versions[i+1:]...) + d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...) + if len(d.fields) > 0 { + d.fields = append(d.fields[:i], d.fields[i+1:]...) 
+ } +} + func (d *dataPoints) Len() int { return len(d.seriesIDs) } @@ -131,7 +144,10 @@ func (d *dataPoints) Less(i, j int) bool { if d.seriesIDs[i] != d.seriesIDs[j] { return d.seriesIDs[i] < d.seriesIDs[j] } - return d.timestamps[i] < d.timestamps[j] + if d.timestamps[i] != d.timestamps[j] { + return d.timestamps[i] < d.timestamps[j] + } + return d.versions[i] > d.versions[j] } func (d *dataPoints) Swap(i, j int) { diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index 7fce11d1d..0bfb2e239 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -18,8 +18,7 @@ package measure import ( - "sync" - + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -33,14 +32,14 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = sync.Pool{} +var introductionPool = pool.Register[*introduction]("measure-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() if v == nil { return &introduction{} } - i := v.(*introduction) + i := v i.reset() return i } @@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = sync.Pool{} +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("measure-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - i := v.(*flusherIntroduction) + i := v i.reset() return i } @@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = sync.Pool{} +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("measure-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() if v == nil { return &mergerIntroduction{} } - i := v.(*mergerIntroduction) + i := v i.reset() return i } diff --git a/banyand/measure/part.go b/banyand/measure/part.go index c63838143..41baf29bd 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const ( @@ -150,16 +150,29 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { var sidPrev common.SeriesID uncompressedBlockSizeBytes := uint64(0) var indexPrev int - for i := range dps.timestamps { + var tsPrev int64 + for i := 0; i < len(dps.timestamps); i++ { sid := dps.seriesIDs[i] if sidPrev == 0 { sidPrev = sid } + if sid == sidPrev { + if tsPrev == dps.timestamps[i] { + dps.skip(i) + i-- + continue + } + tsPrev = dps.timestamps[i] + } else { + tsPrev = 0 + } + if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || (i-indexPrev) > maxBlockLength || sid != sidPrev { bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i]) sidPrev = sid + tsPrev = 0 indexPrev = i uncompressedBlockSizeBytes = 0 } @@ -209,7 +222,7 @@ func generateMemPart() *memPart { if v == nil { return &memPart{} } - return v.(*memPart) + return v } func releaseMemPart(mp *memPart) { @@ -217,7 +230,7 @@ func releaseMemPart(mp *memPart) { memPartPool.Put(mp) } -var memPartPool 
sync.Pool +var memPartPool = pool.Register[*memPart]("measure-memPart") type partWrapper struct { mp *memPart @@ -239,6 +252,12 @@ func (pw *partWrapper) decRef() { if n > 0 { return } + if pw.mp != nil { + releaseMemPart(pw.mp) + pw.mp = nil + pw.p = nil + return + } pw.p.close() if pw.removable.Load() && pw.p.fileSystem != nil { go func(pw *partWrapper) { diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go index 0c916334b..cf67d3207 100644 --- a/banyand/measure/part_iter.go +++ b/banyand/measure/part_iter.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -327,7 +327,7 @@ func generatePartMergeIter() *partMergeIter { if v == nil { return &partMergeIter{} } - return v.(*partMergeIter) + return v } func releasePartMergeIter(pmi *partMergeIter) { @@ -335,7 +335,7 @@ func releasePartMergeIter(pmi *partMergeIter) { pmiPool.Put(pmi) } -var pmiPool sync.Pool +var pmiPool = pool.Register[*partMergeIter]("measure-partMergeIter") type partMergeIterHeap []*partMergeIter @@ -369,7 +369,7 @@ func generateColumnValuesDecoder() *encoding.BytesBlockDecoder { if v == nil { return &encoding.BytesBlockDecoder{} } - return v.(*encoding.BytesBlockDecoder) + return v } func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { @@ -377,4 +377,4 @@ func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { columnValuesDecoderPool.Put(d) } -var columnValuesDecoderPool sync.Pool +var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("measure-columnValuesDecoder") diff --git a/banyand/measure/query.go b/banyand/measure/query.go index fa94091e4..c9ac2117c 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -242,9 +242,8 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(sids), parts, result) defer defFn() - // TODO: cache tstIter - var tstIter tstIter - defer tstIter.reset() + tstIter := generateTstIter() + defer releaseTstIter(tstIter) originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index d2a20b4ae..033979a8b 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -376,6 +377,21 @@ func (ti *tstIter) Error() error { return ti.err } +func generateTstIter() *tstIter { + v := tstIterPool.Get() + if v == nil { + return &tstIter{} + } + return v +} + +func releaseTstIter(ti *tstIter) { + ti.reset() + tstIterPool.Put(ti) +} + +var tstIterPool = pool.Register[*tstIter]("measure-tstIter") + type partIterHeap []*partIter func (pih *partIterHeap) Len() int { diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go 
index 57ea15a49..386f57b1d 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -214,6 +214,16 @@ func Test_tstIter(t *testing.T) { {seriesID: 3, count: 1, uncompressedSizeBytes: 24}, }, }, + { + name: "Test with a single part with same ts", + dpsList: []*dataPoints{duplicatedDps}, + sids: []common.SeriesID{1}, + minTimestamp: 1, + maxTimestamp: 1, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSizeBytes: 24}, + }, + }, { name: "Test with multiple parts with same ts", dpsList: []*dataPoints{dpsTS1, dpsTS1}, @@ -568,6 +578,58 @@ var dpsTS2 = &dataPoints{ }, } +var duplicatedDps = &dataPoints{ + seriesIDs: []common.SeriesID{1, 1, 1}, + timestamps: []int64{1, 1, 1}, + versions: []int64{1, 2, 3}, + tagFamilies: [][]nameValues{ + { + { + name: "arrTag", values: []*nameValue{ + {name: "strArrTag", valueType: pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"), []byte("value2")}}, + {name: "intArrTag", valueType: pbv1.ValueTypeInt64Arr, value: nil, valueArr: [][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}}, + }, + }, + { + name: "binaryTag", values: []*nameValue{ + {name: "binaryTag", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + { + name: "singleTag", values: []*nameValue{ + {name: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil}, + {name: "intTag", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(10), valueArr: nil}, + }, + }, + }, + { + { + name: "singleTag", values: []*nameValue{ + {name: "strTag1", valueType: pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil}, + {name: "strTag2", valueType: pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil}, + }, + }, + }, + {}, // no tag families for the third data point (version 3) + }, + fields: []nameValues{ + { + name: "skipped", values: []*nameValue{ + {name: "strField", valueType: pbv1.ValueTypeStr, value: []byte("field1"), valueArr: nil}, + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil}, + {name: "floatField", valueType: pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(1221233.343), valueArr: nil}, + {name: "binaryField", valueType: pbv1.ValueTypeBinaryData, value: longText, valueArr: nil}, + }, + }, + {}, // no fields for the second data point (version 2) + { + name: "onlyFields", values: []*nameValue{ + {name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil}, + }, + }, + }, +} + func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints { hugeDps := &dataPoints{ seriesIDs: []common.SeriesID{}, diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go index 260b442fc..ea5d2d8a7 100644 --- a/banyand/observability/meter_prom.go +++ b/banyand/observability/meter_prom.go @@ -18,6 +18,7 @@ package observability import ( + "net/http" "sync" grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" @@ -42,10 +43,6 @@ var ( func init() { reg.MustRegister(collectors.NewGoCollector()) reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - metricsMux.Handle("/metrics", promhttp.HandlerFor( - reg, - promhttp.HandlerOpts{}, - )) } // NewMeterProvider returns a meter.Provider based on the given scope.
@@ -53,6 +50,13 @@ func newPromMeterProvider() meter.Provider { return prom.NewProvider(SystemScope, reg) } +func registerMetricsEndpoint(metricsMux *http.ServeMux) { + metricsMux.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{}, + )) +} + // MetricsServerInterceptor returns a server interceptor for metrics. func promMetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { once.Do(func() { diff --git a/banyand/observability/metrics_system.go b/banyand/observability/metrics_system.go index ae391b734..7717db5dc 100644 --- a/banyand/observability/metrics_system.go +++ b/banyand/observability/metrics_system.go @@ -55,8 +55,22 @@ var ( upTimeGauge meter.Gauge diskStateGauge meter.Gauge initMetricsOnce sync.Once + diskMap = sync.Map{} ) +// UpdatePath adds a path to monitor its disk usage. +func UpdatePath(path string) { + diskMap.Store(path, nil) +} + +func getPath() (paths []string) { + diskMap.Range(func(key, _ any) bool { + paths = append(paths, key.(string)) + return true + }) + return paths +} + func init() { MetricsCollector.Register("cpu", collectCPU) MetricsCollector.Register("memory", collectMemory) @@ -169,7 +183,7 @@ func collectUpTime() { } func collectDisk() { - for path := range getPath() { + for _, path := range getPath() { usage, err := disk.Usage(path) if err != nil { if _, err = os.Stat(path); err != nil { diff --git a/banyand/observability/service.go b/banyand/observability/service.go index 8e41c5ed0..4170b09f5 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -42,9 +42,8 @@ const ( ) var ( - _ run.Service = (*metricService)(nil) - _ run.Config = (*metricService)(nil) - metricsMux = http.NewServeMux() + _ run.Service = (*metricService)(nil) + _ run.Config = (*metricService)(nil) // MetricsServerInterceptor is the function to obtain grpc metrics interceptor. MetricsServerInterceptor func() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) = emptyMetricsServerInterceptor ) @@ -146,6 +145,11 @@ func (p *metricService) Serve() run.StopNotify { if err != nil { p.l.Fatal().Err(err).Msg("Failed to register metrics collector") } + metricsMux := http.NewServeMux() + metricsMux.HandleFunc("/_route", p.routeTableHandler) + if containsMode(p.modes, flagPromethusMode) { + registerMetricsEndpoint(metricsMux) + } if containsMode(p.modes, flagNativeMode) { err = p.scheduler.Register("native-metric-collection", cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool { NativeMetricCollection.FlushMetrics() @@ -180,6 +184,12 @@ func (p *metricService) GracefulStop() { p.closer.CloseThenWait() } +func (p *metricService) routeTableHandler(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(p.nodeSelector.String())) +} + func containsMode(modes []string, mode string) bool { for _, item := range modes { if item == mode { diff --git a/banyand/observability/system.go b/banyand/observability/system.go deleted file mode 100644 index 20aa68b6f..000000000 --- a/banyand/observability/system.go +++ /dev/null @@ -1,162 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership.
Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package observability - -import ( - "encoding/json" - "net/http" - "os" - "sync" - - "github.com/shirou/gopsutil/v3/cpu" - "github.com/shirou/gopsutil/v3/disk" - "github.com/shirou/gopsutil/v3/host" - "github.com/shirou/gopsutil/v3/mem" - - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -var systemInfoInstance *SystemInfo - -func init() { - systemInfoInstance = &SystemInfo{ - Addresses: make(map[string]string), - DiskUsages: make(map[string]*DiskUsage), - } -} - -// UpdateAddress updates the address of the given name. -func UpdateAddress(name, address string) { - systemInfoInstance.Lock() - defer systemInfoInstance.Unlock() - systemInfoInstance.Addresses[name] = address -} - -func getAddresses() map[string]string { - systemInfoInstance.RLock() - defer systemInfoInstance.RUnlock() - return systemInfoInstance.Addresses -} - -// UpdatePath updates a path to monitoring its disk usage. -func UpdatePath(path string) { - systemInfoInstance.Lock() - defer systemInfoInstance.Unlock() - systemInfoInstance.DiskUsages[path] = nil -} - -func getPath() map[string]*DiskUsage { - systemInfoInstance.RLock() - defer systemInfoInstance.RUnlock() - return systemInfoInstance.DiskUsages -} - -// SystemInfo represents the system information of a node. -type SystemInfo struct { - Addresses map[string]string `json:"addresses"` - DiskUsages map[string]*DiskUsage `json:"disk_usages"` - NodeID string `json:"node_id"` - Hostname string `json:"hostname"` - Roles []string `json:"roles"` - Uptime uint64 `json:"uptime"` - CPUUsage float64 `json:"cpu_usage"` - MemoryUsage float64 `json:"memory_usage"` - sync.RWMutex -} - -// DiskUsage represents the disk usage for a given path. -type DiskUsage struct { - Capacity uint64 `json:"capacity"` - Used uint64 `json:"used"` -} - -// ErrorResponse represents the error response. 
-type ErrorResponse struct { - Message string `json:"message"` - OriginalError string `json:"original_error,omitempty"` -} - -func init() { - metricsMux.HandleFunc("/system", systemInfoHandler) -} - -func systemInfoHandler(w http.ResponseWriter, _ *http.Request) { - hostname, _ := os.Hostname() - uptime, _ := getUptime() - cpuUsage, _ := getCPUUsage() - memoryUsage, _ := getMemoryUsage() - - systemInfo := &SystemInfo{ - NodeID: "1", - Roles: []string{"meta", "ingest", "query", "data"}, - Hostname: hostname, - Uptime: uptime, - CPUUsage: cpuUsage, - MemoryUsage: memoryUsage, - Addresses: getAddresses(), - DiskUsages: make(map[string]*DiskUsage), - } - for k := range getPath() { - usage, _ := getDiskUsage(k) - systemInfo.DiskUsages[k] = &usage - } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode([]*SystemInfo{systemInfo}); err != nil { - w.WriteHeader(http.StatusInternalServerError) - errorResponse := &ErrorResponse{ - Message: "Error encoding JSON response", - OriginalError: err.Error(), - } - if err := json.NewEncoder(w).Encode(errorResponse); err != nil { - logger.GetLogger().Error().Err(err).Msg("Error encoding JSON response") - } - } -} - -func getUptime() (uint64, error) { - uptime, err := host.Uptime() - if err != nil { - return 0, err - } - return uptime, nil -} - -func getCPUUsage() (float64, error) { - percentages, err := cpu.Percent(0, false) - if err != nil { - return 0, err - } - return percentages[0], nil -} - -func getMemoryUsage() (float64, error) { - vm, err := mem.VirtualMemory() - if err != nil { - return 0, err - } - return vm.UsedPercent, nil -} - -func getDiskUsage(path string) (DiskUsage, error) { - usage, err := disk.Usage(path) - if err != nil { - return DiskUsage{}, err - } - return DiskUsage{Capacity: usage.Total, Used: usage.Used}, nil -} diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go index 2b880f9f0..829e460a1 100644 --- a/banyand/queue/sub/server.go +++ b/banyand/queue/sub/server.go @@ -112,7 +112,6 @@ func (s *server) Validate() error { if s.addr == ":" { return errNoAddr } - observability.UpdateAddress("grpc", s.addr) if !s.tls { return nil } diff --git a/banyand/stream/block.go b/banyand/stream/block.go index bd0db065b..e6e7b964a 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -19,7 +19,6 @@ package stream import ( "sort" - "sync" "golang.org/x/exp/slices" @@ -31,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -351,7 +351,7 @@ func generateBlock() *block { if v == nil { return &block{} } - return v.(*block) + return v } func releaseBlock(b *block) { @@ -359,7 +359,7 @@ func releaseBlock(b *block) { blockPool.Put(b) } -var blockPool sync.Pool +var blockPool = pool.Register[*block]("stream-block") type blockCursor struct { p *part @@ -559,14 +559,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { return true } -var blockCursorPool sync.Pool +var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor") func generateBlockCursor() *blockCursor { v := blockCursorPool.Get() if v == nil { return &blockCursor{} } - return v.(*blockCursor) + return v } func releaseBlockCursor(bc *blockCursor) { @@ -668,7 +668,7 @@ func 
generateBlockPointer() *blockPointer { if v == nil { return &blockPointer{} } - return v.(*blockPointer) + return v } func releaseBlockPointer(bi *blockPointer) { @@ -676,4 +676,4 @@ func releaseBlockPointer(bi *blockPointer) { blockPointerPool.Put(bi) } -var blockPointerPool sync.Pool +var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer") diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go index 9f0091ee5..4f410cd59 100644 --- a/banyand/stream/block_metadata.go +++ b/banyand/stream/block_metadata.go @@ -21,11 +21,11 @@ import ( "errors" "fmt" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -175,7 +175,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) } - // TODO: cache dataBlock tf := &dataBlock{} src, err = tf.unmarshal(src) if err != nil { @@ -202,7 +201,7 @@ func generateBlockMetadata() *blockMetadata { if v == nil { return &blockMetadata{} } - return v.(*blockMetadata) + return v } func releaseBlockMetadata(bm *blockMetadata) { @@ -210,7 +209,7 @@ func releaseBlockMetadata(bm *blockMetadata) { blockMetadataPool.Put(bm) } -var blockMetadataPool sync.Pool +var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata") type blockMetadataArray struct { arr []blockMetadata @@ -223,14 +222,14 @@ func (bma *blockMetadataArray) reset() { bma.arr = bma.arr[:0] } -var blockMetadataArrayPool sync.Pool +var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("stream-blockMetadataArray") func generateBlockMetadataArray() *blockMetadataArray { v := blockMetadataArrayPool.Get() if v == nil { return &blockMetadataArray{} } - return v.(*blockMetadataArray) + return v } func releaseBlockMetadataArray(bma *blockMetadataArray) { diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go index 60701515b..1c8a987cd 100644 --- a/banyand/stream/block_reader.go +++ b/banyand/stream/block_reader.go @@ -22,11 +22,11 @@ import ( "errors" "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type seqReader struct { @@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) { func generateSeqReader() *seqReader { if v := seqReaderPool.Get(); v != nil { - return v.(*seqReader) + return v } return &seqReader{} } @@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) { seqReaderPool.Put(sr) } -var seqReaderPool sync.Pool +var seqReaderPool = pool.Register[*seqReader]("stream-seqReader") type seqReaders struct { tagFamilyMetadata map[string]*seqReader @@ -216,11 +216,11 @@ func (br *blockReader) error() error { return br.err } -var blockReaderPool sync.Pool +var blockReaderPool = pool.Register[*blockReader]("stream-blockReader") func generateBlockReader() *blockReader { if v := blockReaderPool.Get(); v != nil { - return v.(*blockReader) + return v } return &blockReader{} } diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go index 5125bd85a..f55f9ebfb 100644 --- a/banyand/stream/block_writer.go +++ b/banyand/stream/block_writer.go @@ -19,12 +19,12 @@ package stream import ( "path/filepath" - "sync" 
"github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type writer struct { @@ -279,7 +279,7 @@ func generateBlockWriter() *blockWriter { }, } } - return v.(*blockWriter) + return v } func releaseBlockWriter(bsw *blockWriter) { @@ -287,4 +287,4 @@ func releaseBlockWriter(bsw *blockWriter) { blockWriterPool.Put(bsw) } -var blockWriterPool sync.Pool +var blockWriterPool = pool.Register[*blockWriter]("stream-blockWriter") diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go index 76c6e659d..072e8b39c 100644 --- a/banyand/stream/introducer.go +++ b/banyand/stream/introducer.go @@ -18,8 +18,7 @@ package stream import ( - "sync" - + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -33,14 +32,14 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = sync.Pool{} +var introductionPool = pool.Register[*introduction]("stream-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() if v == nil { return &introduction{} } - intro := v.(*introduction) + intro := v intro.reset() return intro } @@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = sync.Pool{} +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("stream-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction { flushed: make(map[uint64]*partWrapper), } } - fi := v.(*flusherIntroduction) + fi := v fi.reset() return fi } @@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = sync.Pool{} +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("stream-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() if v == nil { return &mergerIntroduction{} } - mi := v.(*mergerIntroduction) + mi := v mi.reset() return mi } diff --git a/banyand/stream/part.go b/banyand/stream/part.go index b6ad05c71..228e607c1 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "sort" - "sync" "sync/atomic" "time" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const ( @@ -198,7 +198,7 @@ func generateMemPart() *memPart { if v == nil { return &memPart{} } - return v.(*memPart) + return v } func releaseMemPart(mp *memPart) { @@ -206,7 +206,7 @@ func releaseMemPart(mp *memPart) { memPartPool.Put(mp) } -var memPartPool sync.Pool +var memPartPool = pool.Register[*memPart]("stream-memPart") type partWrapper struct { mp *memPart diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go index e9c24b985..f42770327 100644 --- a/banyand/stream/part_iter.go +++ b/banyand/stream/part_iter.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "sort" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" @@ -30,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" 
"github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type partIter struct { @@ -324,7 +324,7 @@ func generatePartMergeIter() *partMergeIter { if v == nil { return &partMergeIter{} } - return v.(*partMergeIter) + return v } func releasePartMergeIter(pmi *partMergeIter) { @@ -332,7 +332,7 @@ func releasePartMergeIter(pmi *partMergeIter) { pmiPool.Put(pmi) } -var pmiPool sync.Pool +var pmiPool = pool.Register[*partMergeIter]("stream-partMergeIter") type partMergeIterHeap []*partMergeIter @@ -366,7 +366,7 @@ func generateColumnValuesDecoder() *encoding.BytesBlockDecoder { if v == nil { return &encoding.BytesBlockDecoder{} } - return v.(*encoding.BytesBlockDecoder) + return v } func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { @@ -374,4 +374,4 @@ func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) { columnValuesDecoderPool.Put(d) } -var columnValuesDecoderPool sync.Pool +var columnValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("stream-columnValuesDecoder") diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 0a5ebdc35..7c7661360 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -190,9 +190,8 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr) defer defFn() - // TODO: cache tstIter - var ti tstIter - defer ti.reset() + ti := generateTstIter() + defer releaseTstIter(ti) sids := qo.sortedSids ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) if ti.Error() != nil { @@ -288,6 +287,7 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR return blankCursorList[i] > blankCursorList[j] }) for _, index := range blankCursorList { + releaseBlockCursor(qr.data[index]) qr.data = append(qr.data[:index], qr.data[index+1:]...) 
} qr.loaded = true @@ -610,6 +610,7 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu case pbv1.ValueTypeStrArr: var values []string bb := bigValuePool.Generate() + defer bigValuePool.Release(bb) var err error for len(value) > 0 { bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value) diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go index 7f5459c0b..b7954810e 100644 --- a/banyand/stream/tag.go +++ b/banyand/stream/tag.go @@ -112,7 +112,7 @@ func (t *tag) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *seq } } -var bigValuePool bytes.BufferPool +var bigValuePool = bytes.NewBufferPool("stream-big-value") type tagFamily struct { name string diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go index d044f0ec7..d470b6f90 100644 --- a/banyand/stream/tag_metadata.go +++ b/banyand/stream/tag_metadata.go @@ -19,11 +19,11 @@ package stream import ( "fmt" - "sync" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" ) type tagMetadata struct { @@ -131,7 +131,7 @@ func generateTagFamilyMetadata() *tagFamilyMetadata { if v == nil { return &tagFamilyMetadata{} } - return v.(*tagFamilyMetadata) + return v } func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) { @@ -139,4 +139,4 @@ func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) { tagFamilyMetadataPool.Put(tfm) } -var tagFamilyMetadataPool sync.Pool +var tagFamilyMetadataPool = pool.Register[*tagFamilyMetadata]("stream-tagFamilyMetadata") diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index dff93b399..ea329ff1d 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -388,6 +389,21 @@ func (ti *tstIter) Error() error { return ti.err } +func generateTstIter() *tstIter { + v := tstIterPool.Get() + if v == nil { + return &tstIter{} + } + return v +} + +func releaseTstIter(ti *tstIter) { + ti.reset() + tstIterPool.Put(ti) +} + +var tstIterPool = pool.Register[*tstIter]("stream-tstIter") + type partIterHeap []*partIter func (pih *partIterHeap) Len() int { diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go index 595ce54cb..c657cde97 100644 --- a/pkg/bytes/buffer.go +++ b/pkg/bytes/buffer.go @@ -21,9 +21,9 @@ package bytes import ( "fmt" "io" - "sync" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/pool" ) var ( @@ -107,9 +107,16 @@ func (r *reader) Close() error { return nil } +// NewBufferPool creates a new BufferPool. +func NewBufferPool(name string) *BufferPool { + return &BufferPool{ + p: pool.Register[*Buffer](name), + } +} + // BufferPool is a pool of Buffer. type BufferPool struct { - p sync.Pool + p *pool.Synced[*Buffer] } // Generate generates a Buffer. @@ -118,7 +125,7 @@ func (bp *BufferPool) Generate() *Buffer { if bbv == nil { return &Buffer{} } - return bbv.(*Buffer) + return bbv } // Release releases a Buffer. 
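The new pkg/pool package is created wholesale by this patch, so its body never appears as diff context here. Its surface can still be inferred from the call sites above: Register[T] returns a *pool.Synced[T] keyed by a unique name, Get returns a typed value (nil for pointer types when the pool is empty, hence the "if v == nil" checks), and Put hands the value back; the name is presumably what the new pkg/test/gmatcher helper keys on when it checks registered pools for leaked objects after a test suite. The following is a minimal sketch consistent with those call sites; the registry map, the duplicate-name panic, and the in-flight counter are illustrative assumptions, not the actual implementation:

package pool

import (
	"sync"
	"sync/atomic"
)

var (
	registryMu sync.Mutex
	registry   = map[string]any{}
)

// Synced pairs a sync.Pool with a name and an in-flight counter so that a
// test hook can assert every Get was matched by a Put before a suite ends.
type Synced[T any] struct {
	pool     sync.Pool
	name     string
	inFlight atomic.Int64
}

// Register creates a named pool and records it so leaks can be attributed.
func Register[T any](name string) *Synced[T] {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, ok := registry[name]; ok {
		panic("pool: duplicate pool name " + name)
	}
	s := &Synced[T]{name: name}
	registry[name] = s
	return s
}

// Get hands out a pooled value, or the zero value (nil for the pointer
// types used in this patch) when the pool is empty.
func (s *Synced[T]) Get() T {
	s.inFlight.Add(1)
	v, _ := s.pool.Get().(T)
	return v
}

// Put returns a value to the pool after the caller has reset it.
func (s *Synced[T]) Put(v T) {
	s.inFlight.Add(-1)
	s.pool.Put(v)
}

With a typed Get, the mechanical edit repeated throughout this diff (dropping the v.(*block)-style assertion after each pool Get) falls out for free.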
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go index 6aab3f89d..25ece6fa3 100644 --- a/pkg/encoding/bytes.go +++ b/pkg/encoding/bytes.go @@ -297,4 +297,4 @@ func decompressBlock(dst, src []byte) ([]byte, []byte, error) { } } -var bbPool bytes.BufferPool +var bbPool = bytes.NewBufferPool("encoding.bytesBlock") diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go deleted file mode 100644 index 78add92cd..000000000 --- a/pkg/encoding/encoder.go +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package encoding - -import ( - "bytes" - "encoding/binary" - "io" - "sync" - "time" - - "github.com/pkg/errors" - - "github.com/apache/skywalking-banyandb/pkg/convert" -) - -var ( - encoderPool = sync.Pool{ - New: newEncoder, - } - decoderPool = sync.Pool{ - New: func() interface{} { - return &decoder{} - }, - } - - errInvalidValue = errors.New("invalid encoded value") - errNoData = errors.New("there is no data") -) - -type encoderPoolDelegator struct { - pool *sync.Pool - fn ParseInterval - name string - size int -} - -// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor encoders. -func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool { - return &encoderPoolDelegator{ - name: name, - pool: &encoderPool, - size: size, - fn: fn, - } -} - -func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter) SeriesEncoder { - encoder := b.pool.Get().(*encoder) - encoder.name = b.name - encoder.size = b.size - encoder.fn = b.fn - encoder.Reset(metadata, buffer) - return encoder -} - -func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) { - _, ok := seriesEncoder.(*encoder) - if ok { - b.pool.Put(seriesEncoder) - } -} - -type decoderPoolDelegator struct { - pool *sync.Pool - fn ParseInterval - name string - size int -} - -// NewDecoderPool returns a SeriesDecoderPool which provides int-based xor decoders. -func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool { - return &decoderPoolDelegator{ - name: name, - pool: &decoderPool, - size: size, - fn: fn, - } -} - -func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder { - decoder := b.pool.Get().(*decoder) - decoder.name = b.name - decoder.size = b.size - decoder.fn = b.fn - return decoder -} - -func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) { - _, ok := seriesDecoder.(*decoder) - if ok { - b.pool.Put(seriesDecoder) - } -} - -var _ SeriesEncoder = (*encoder)(nil) - -// ParseInterval parses the interval rule from the key in a kv pair. 
-type ParseInterval = func(key []byte) time.Duration - -type encoder struct { - buff BufferWriter - bw *Writer - values *XOREncoder - fn ParseInterval - name string - interval time.Duration - startTime uint64 - prevTime uint64 - num int - size int -} - -func newEncoder() interface{} { - bw := NewWriter() - return &encoder{ - bw: bw, - values: NewXOREncoder(bw), - } -} - -func (ie *encoder) Append(ts uint64, value []byte) { - if len(value) > 8 { - return - } - if ie.startTime == 0 { - ie.startTime = ts - ie.prevTime = ts - } else if ie.startTime > ts { - ie.startTime = ts - } - gap := int(ie.prevTime) - int(ts) - if gap < 0 { - return - } - zeroNum := gap/int(ie.interval) - 1 - for i := 0; i < zeroNum; i++ { - ie.bw.WriteBool(false) - ie.num++ - } - ie.prevTime = ts - l := len(value) - ie.bw.WriteBool(l > 0) - ie.values.Write(convert.BytesToUint64(value)) - ie.num++ -} - -func (ie *encoder) IsFull() bool { - return ie.num >= ie.size -} - -func (ie *encoder) Reset(key []byte, buffer BufferWriter) { - ie.buff = buffer - ie.bw.Reset(buffer) - ie.interval = ie.fn(key) - ie.startTime = 0 - ie.prevTime = 0 - ie.num = 0 - ie.values = NewXOREncoder(ie.bw) -} - -func (ie *encoder) Encode() error { - ie.bw.Flush() - buffWriter := NewPacker(ie.buff) - buffWriter.PutUint64(ie.startTime) - buffWriter.PutUint16(uint16(ie.num)) - return nil -} - -func (ie *encoder) StartTime() uint64 { - return ie.startTime -} - -var _ SeriesDecoder = (*decoder)(nil) - -type decoder struct { - fn ParseInterval - name string - area []byte - size int - interval time.Duration - startTime uint64 - num int -} - -func (i *decoder) Decode(key, data []byte) error { - if len(data) < 10 { - return errInvalidValue - } - i.interval = i.fn(key) - i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 : len(data)-2]) - i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:])) - i.area = data[:len(data)-10] - return nil -} - -func (i decoder) Len() int { - return i.num -} - -func (i decoder) IsFull() bool { - return i.num >= i.size -} - -func (i decoder) Get(ts uint64) ([]byte, error) { - for iter := i.Iterator(); iter.Next(); { - if iter.Time() == ts { - return iter.Val(), nil - } - } - return nil, errors.WithMessagef(errNoData, "ts:%d", ts) -} - -func (i decoder) Range() (start, end uint64) { - return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval) -} - -func (i decoder) Iterator() SeriesIterator { - br := NewReader(bytes.NewReader(i.area)) - return &intIterator{ - endTime: i.startTime + uint64(i.num*int(i.interval)), - interval: int(i.interval), - br: br, - values: NewXORDecoder(br), - size: i.num, - } -} - -var _ SeriesIterator = (*intIterator)(nil) - -type intIterator struct { - err error - br *Reader - values *XORDecoder - endTime uint64 - interval int - size int - currVal uint64 - currTime uint64 - index int -} - -func (i *intIterator) Next() bool { - if i.index >= i.size { - return false - } - var b bool - var err error - for !b { - b, err = i.br.ReadBool() - if errors.Is(err, io.EOF) { - return false - } - if err != nil { - i.err = err - return false - } - i.index++ - i.currTime = i.endTime - uint64(i.interval*i.index) - } - if i.values.Next() { - i.currVal = i.values.Value() - } - return true -} - -func (i *intIterator) Val() []byte { - return convert.Uint64ToBytes(i.currVal) -} - -func (i *intIterator) Time() uint64 { - return i.currTime -} - -func (i *intIterator) Error() error { - return i.err -} diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go deleted file mode 100644 index 
840a883f4..000000000 --- a/pkg/encoding/encoder_test.go +++ /dev/null @@ -1,392 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package encoding - -import ( - "bytes" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/apache/skywalking-banyandb/pkg/convert" -) - -func TestNewEncoderAndDecoder(t *testing.T) { - type tsData struct { - ts []uint64 - data []any - start uint64 - end uint64 - } - tests := []struct { - name string - args tsData - want tsData - }{ - { - name: "int golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9, 6}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - }, - want: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "int empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{7, 9}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.7, 0.8, 0.7, 0.9, 0.6}, - }, - 
want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.7, 0.8, 0.7, 0.9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float64 less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.8, 1.7}, - }, - want: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.8, 1.7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "float64 empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{0.700033, 0.988822}, - }, - want: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(1 * time.Minute)}, - data: []any{0.700033, 0.988822}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - } - key := []byte("foo") - fn := func(k []byte) time.Duration { - assert.Equal(t, key, k) - return 1 * time.Minute - } - encoderPool := NewEncoderPool("minute", 4, fn) - decoderPool := NewDecoderPool("minute", 4, fn) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - at := assert.New(t) - var buffer bytes.Buffer - encoder := encoderPool.Get(key, &buffer) - defer encoderPool.Put(encoder) - decoder := decoderPool.Get(key) - defer decoderPool.Put(decoder) - isFull := false - for i, v := range tt.args.ts { - encoder.Append(v, ToBytes(tt.args.data[i])) - if encoder.IsFull() { - isFull = true - break - } - } - err := encoder.Encode() - at.NoError(err) - - at.Equal(tt.want.start, encoder.StartTime()) - at.NoError(decoder.Decode(key, buffer.Bytes())) - start, end := decoder.Range() - at.Equal(tt.want.start, start) - at.Equal(tt.want.end, end) - if isFull { - at.True(decoder.IsFull()) - } - i := 0 - for iter := decoder.Iterator(); iter.Next(); i++ { - at.NoError(iter.Error()) - at.Equal(tt.want.ts[i], iter.Time()) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], iter.Val())) - v, err := decoder.Get(iter.Time()) - at.NoError(err) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v)) - } - at.Equal(len(tt.want.ts), i) - }) - } -} - -func ToBytes(v any) []byte { - switch d := v.(type) { - case int: - return convert.Int64ToBytes(int64(d)) - case float64: - return convert.Float64ToBytes(d) - } - return nil -} - -func BytesTo(t any, b []byte) any { - switch t.(type) { - case int: - return int(convert.BytesToInt64(b)) - case float64: - return convert.BytesToFloat64(b) - } - return nil -} - -func TestNewDecoderGet(t *testing.T) { - type tsData struct { - ts []uint64 - data []any - } - type wantData struct { - ts []uint64 - data []any - wantErr []bool - start uint64 - end uint64 - } - tests := []struct { - name string - args tsData - want wantData - }{ - { - name: "int golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, 8, 7, 9, 6}, - }, - want: wantData{ - ts: 
[]uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0}, - data: []any{7, 8, 7, 9, nil}, - wantErr: []bool{false, false, false, false, true}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "int less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - }, - want: wantData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{7, 8, 7}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "int empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{7, 9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7, nil, nil, 9}, - wantErr: []bool{false, true, true, false}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float golden path", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{7.0, 8.0, 7.0, 9.0}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float more than the size", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), uint64(1 * time.Minute)}, - data: []any{1.7, 1.8, 1.7, 1.9, 1.6}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0}, - data: []any{1.7, 1.8, 1.7, 1.9, nil}, - wantErr: []bool{false, false, false, false, true}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - { - name: "float less than the size", - args: tsData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{0.71, 0.833, 0.709}, - }, - want: wantData{ - ts: []uint64{uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(time.Minute)}, - data: []any{0.71, 0.833, 0.709}, - start: uint64(time.Minute), - end: uint64(3 * time.Minute), - }, - }, - { - name: "float empty slot in the middle", - args: tsData{ - ts: []uint64{uint64(4 * time.Minute), uint64(time.Minute)}, - data: []any{1.7, 1.9}, - }, - want: wantData{ - ts: []uint64{uint64(4 * time.Minute), uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)}, - data: []any{1.7, nil, nil, 1.9}, - wantErr: []bool{false, true, true, false}, - start: uint64(time.Minute), - end: uint64(4 * time.Minute), - }, - }, - } - key := []byte("foo") - fn := func(k []byte) time.Duration { - assert.Equal(t, key, k) - return 1 * time.Minute - } - encoderPool := NewEncoderPool("minute", 4, fn) - decoderPool := NewDecoderPool("minute", 4, fn) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - at := assert.New(t) - var buffer bytes.Buffer - encoder := encoderPool.Get(key, &buffer) - defer encoderPool.Put(encoder) - decoder := decoderPool.Get(key) - defer decoderPool.Put(decoder) - isFull := false - for i, v := range tt.args.ts { - encoder.Append(v, ToBytes(tt.args.data[i])) - if encoder.IsFull() { - isFull = true - break - } - } - err := encoder.Encode() 
- at.NoError(err) - - at.Equal(tt.want.start, encoder.StartTime()) - at.NoError(decoder.Decode(key, buffer.Bytes())) - start, end := decoder.Range() - at.Equal(tt.want.start, start) - at.Equal(tt.want.end, end) - if isFull { - at.True(decoder.IsFull()) - } - for i, t := range tt.want.ts { - wantErr := false - if tt.want.wantErr != nil { - wantErr = tt.want.wantErr[i] - } - v, err := decoder.Get(t) - if wantErr { - at.ErrorIs(err, errNoData) - } else { - at.NoError(err) - at.Equal(tt.want.data[i], BytesTo(tt.want.data[i], v)) - } - } - }) - } -} diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go index 21b774826..05a227ab2 100644 --- a/pkg/encoding/int.go +++ b/pkg/encoding/int.go @@ -20,7 +20,8 @@ package encoding import ( "encoding/binary" "fmt" - "sync" + + "github.com/apache/skywalking-banyandb/pkg/pool" ) // Uint16ToBytes appends the bytes of the given uint16 to the given byte slice. @@ -224,7 +225,7 @@ func GenerateInt64List(size int) *Int64List { L: make([]int64, size), } } - is := v.(*Int64List) + is := v if n := size - cap(is.L); n > 0 { is.L = append(is.L[:cap(is.L)], make([]int64, n)...) } @@ -243,7 +244,7 @@ type Int64List struct { L []int64 } -var int64ListPool sync.Pool +var int64ListPool = pool.Register[*Int64List]("encoding-int64List") // GenerateUint64List generates a list of uint64 with the given size. // The returned list may be from a pool and should be released after use. @@ -254,7 +255,7 @@ func GenerateUint64List(size int) *Uint64List { L: make([]uint64, size), } } - is := v.(*Uint64List) + is := v if n := size - cap(is.L); n > 0 { is.L = append(is.L[:cap(is.L)], make([]uint64, n)...) } @@ -273,4 +274,4 @@ type Uint64List struct { L []uint64 } -var uint64ListPool sync.Pool +var uint64ListPool = pool.Register[*Uint64List]("encoding-uint64List") diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go index ff7dac5ed..39fcb22a5 100644 --- a/pkg/fs/local_file_system.go +++ b/pkg/fs/local_file_system.go @@ -25,12 +25,12 @@ import ( "io" "os" "path/filepath" - "sync" "time" "github.com/shirou/gopsutil/v3/disk" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" ) const defaultIOSize = 256 * 1024 @@ -465,7 +465,7 @@ func generateReader(f *os.File) *bufio.Reader { if v == nil { return bufio.NewReaderSize(f, defaultIOSize) } - br := v.(*bufio.Reader) + br := v br.Reset(f) return br } @@ -475,14 +475,14 @@ func releaseReader(br *bufio.Reader) { bufReaderPool.Put(br) } -var bufReaderPool sync.Pool +var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader") func generateWriter(f *os.File) *bufio.Writer { v := bufWriterPool.Get() if v == nil { return bufio.NewWriterSize(f, defaultIOSize) } - bw := v.(*bufio.Writer) + bw := v bw.Reset(f) return bw } @@ -492,4 +492,4 @@ func releaseWriter(bw *bufio.Writer) { bufWriterPool.Put(bw) } -var bufWriterPool sync.Pool +var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter") diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index a481b7f3b..81af6e208 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -25,7 +25,6 @@ import ( "io" "log" "math" - "sync" "time" "github.com/blugelabs/bluge" @@ -43,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -88,14 +88,14 @@
type store struct { l *logger.Logger } -var batchPool sync.Pool +var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch") func generateBatch() *blugeIndex.Batch { b := batchPool.Get() if b == nil { return bluge.NewBatch() } - return b.(*blugeIndex.Batch) + return b } func releaseBatch(b *blugeIndex.Batch) { diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go index 9904e11a6..209211fd3 100644 --- a/pkg/meter/native/collection.go +++ b/pkg/meter/native/collection.go @@ -19,6 +19,7 @@ package native import ( + "fmt" "time" "google.golang.org/protobuf/types/known/timestamppb" @@ -34,6 +35,7 @@ import ( // NodeSelector has a Locate method to select a nodeId. type NodeSelector interface { Locate(group, name string, shardID uint32) (string, error) + fmt.Stringer } type collector interface { diff --git a/pkg/node/interface.go b/pkg/node/interface.go index 0f41aab58..c4341b7dd 100644 --- a/pkg/node/interface.go +++ b/pkg/node/interface.go @@ -20,6 +20,7 @@ package node import ( "context" + "fmt" "sync" "github.com/pkg/errors" @@ -42,6 +43,7 @@ type Selector interface { RemoveNode(node *databasev1.Node) Pick(group, name string, shardID uint32) (string, error) run.PreRunner + fmt.Stringer } // NewPickFirstSelector returns a simple selector that always returns the first node if one exists. @@ -58,6 +60,17 @@ type pickFirstSelector struct { mu sync.RWMutex } +// String implements Selector. +func (p *pickFirstSelector) String() string { + n, err := p.Pick("", "", 0) + if err != nil { + return fmt.Sprintf("%v", err) + } + p.mu.RLock() + defer p.mu.RUnlock() + return fmt.Sprintf("pick [%s] from %s", n, p.nodeIDs) +} + func (p *pickFirstSelector) PreRun(context.Context) error { return nil } diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go index 34a855f6c..3efcaab6e 100644 --- a/pkg/node/maglev.go +++ b/pkg/node/maglev.go @@ -19,6 +19,7 @@ package node import ( "context" + "fmt" "sort" "strconv" "sync" @@ -38,6 +39,18 @@ type maglevSelector struct { mutex sync.RWMutex } +// String implements Selector.
+func (m *maglevSelector) String() string { + var groups []string + m.routers.Range(func(key, _ any) bool { + groups = append(groups, key.(string)) + return true + }) + m.mutex.RLock() + defer m.mutex.RUnlock() + return fmt.Sprintf("nodes:%s groups:%s", m.nodes, groups) +} + func (m *maglevSelector) Name() string { return "maglev-selector" } diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go index 2815b19cd..d0bb37e1c 100644 --- a/pkg/node/round_robin.go +++ b/pkg/node/round_robin.go @@ -19,6 +19,7 @@ package node import ( "context" + "encoding/json" "fmt" "slices" "sort" @@ -32,22 +33,47 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/convert" ) type roundRobinSelector struct { schemaRegistry metadata.Repo closeCh chan struct{} - lookupTable sync.Map + lookupTable []key nodes []string mu sync.RWMutex } +func (r *roundRobinSelector) String() string { + r.mu.RLock() + defer r.mu.RUnlock() + result := make(map[string]string) + for _, entry := range r.lookupTable { + n, err := r.Pick(entry.group, "", entry.shardID) + mapKey := fmt.Sprintf("%s-%d", entry.group, entry.shardID) + if err != nil { + result[mapKey] = fmt.Sprintf("%v", err) + continue + } + result[mapKey] = n + } + if len(result) < 1 { + return "" + } + jsonBytes, err := json.Marshal(result) + if err != nil { + return fmt.Sprintf("%v", err) + } + return convert.BytesToString(jsonBytes) +} + // NewRoundRobinSelector creates a new round-robin selector. func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector { rrs := &roundRobinSelector{ nodes: make([]string, 0), closeCh: make(chan struct{}), schemaRegistry: schemaRegistry, + lookupTable: make([]key, 0), } return rrs } @@ -69,9 +95,11 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata schema.Metadata) { if !ok || !validateGroup(group) { return } + r.mu.Lock() + defer r.mu.Unlock() for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ { k := key{group: group.Metadata.Name, shardID: i} - r.lookupTable.Store(k, 0) + r.lookupTable = append(r.lookupTable, k) } r.sortEntries() } @@ -80,12 +108,18 @@ func (r *roundRobinSelector) OnDelete(schemaMetadata schema.Metadata) { if schemaMetadata.Kind != schema.KindGroup { return } + r.mu.Lock() + defer r.mu.Unlock() group := schemaMetadata.Spec.(*commonv1.Group) for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ { k := key{group: group.Metadata.Name, shardID: i} - r.lookupTable.Delete(k) + for j := range r.lookupTable { + if r.lookupTable[j] == k { + r.lookupTable = append(r.lookupTable[:j], r.lookupTable[j+1:]...)
+ break + } + } } - r.sortEntries() } func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { @@ -101,8 +135,10 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) { if err != nil { panic(fmt.Sprintf("failed to list group: %v", err)) } + r.mu.Lock() + defer r.mu.Unlock() var revision int64 - r.lookupTable = sync.Map{} + r.lookupTable = r.lookupTable[:0] for _, g := range gg { if !validateGroup(g) { continue } @@ -112,7 +148,7 @@ } for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ { k := key{group: g.Metadata.Name, shardID: i} - r.lookupTable.Store(k, 0) + r.lookupTable = append(r.lookupTable, k) } } r.sortEntries() @@ -144,29 +180,26 @@ func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, erro if len(r.nodes) == 0 { return "", errors.New("no nodes available") } - entry, ok := r.lookupTable.Load(k) - if ok { - return r.selectNode(entry), nil + i := sort.Search(len(r.lookupTable), func(i int) bool { + if r.lookupTable[i].group == group { + return r.lookupTable[i].shardID >= shardID + } + return r.lookupTable[i].group > group + }) + if i < len(r.lookupTable) && r.lookupTable[i] == k { + return r.selectNode(i), nil } return "", fmt.Errorf("%s-%d is an unknown shard", group, shardID) } func (r *roundRobinSelector) sortEntries() { - var keys []key - r.lookupTable.Range(func(k, _ any) bool { - keys = append(keys, k.(key)) - return true - }) - slices.SortFunc(keys, func(a, b key) int { + slices.SortFunc(r.lookupTable, func(a, b key) int { n := strings.Compare(a.group, b.group) if n != 0 { return n } return int(a.shardID) - int(b.shardID) }) - for i := range keys { - r.lookupTable.Store(keys[i], i) - } } func (r *roundRobinSelector) Close() { diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go index 9e8037c50..223e1908f 100644 --- a/pkg/node/round_robin_test.go +++ b/pkg/node/round_robin_test.go @@ -111,6 +111,28 @@ func TestCleanupGroup(t *testing.T) { assert.Error(t, err) } +func TestSortNodeEntries(t *testing.T) { + selector := &roundRobinSelector{ + nodes: make([]string, 0), + } + setupGroup(selector) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}}) + assert.EqualValues(t, []string{"node1", "node2", "node3"}, selector.nodes) +} + +func TestStringer(t *testing.T) { + selector := NewRoundRobinSelector(nil) + assert.Empty(t, selector.String()) + setupGroup(selector) + assert.NotEmpty(t, selector.String()) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}}) + selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}}) + assert.NotEmpty(t, selector.String()) +} + var groupSchema = schema.Metadata{ TypeMeta: schema.TypeMeta{ Kind: schema.KindGroup, diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go index 754d155e2..a0e768276 100644 --- a/pkg/pb/v1/series.go +++ b/pkg/pb/v1/series.go @@ -19,7 +19,6 @@ package v1 import ( "sort" - "sync" "github.com/pkg/errors" @@ -97,26 +96,6 @@ func (s *Series) reset() { s.Buffer = s.Buffer[:0] } -// SeriesPool is a pool of Series. -type SeriesPool struct { - pool sync.Pool -} - -// Generate creates a new Series or gets one from the pool.
-func (sp *SeriesPool) Generate() *Series { - sv := sp.pool.Get() - if sv == nil { - return &Series{} - } - return sv.(*Series) -} - -// Release puts a Series back to the pool. -func (sp *SeriesPool) Release(s *Series) { - s.reset() - sp.pool.Put(s) -} - // SeriesList is a collection of Series. type SeriesList []*Series diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go new file mode 100644 index 000000000..323cb81ca --- /dev/null +++ b/pkg/pool/pool.go @@ -0,0 +1,81 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package pool provides a pool for reusing objects. +package pool + +import ( + "fmt" + "sync" + "sync/atomic" +) + +var poolMap = sync.Map{} + +// Register registers a new pool with the given name. +func Register[T any](name string) *Synced[T] { + p := new(Synced[T]) + if _, ok := poolMap.LoadOrStore(name, p); ok { + panic(fmt.Sprintf("duplicated pool: %s", name)) + } + return p +} + +// AllRefsCount returns the reference count of all pools. +func AllRefsCount() map[string]int { + result := make(map[string]int) + poolMap.Range(func(key, value any) bool { + result[key.(string)] = value.(Trackable).RefsCount() + return true + }) + return result +} + +// Trackable is the interface that wraps the RefsCount method. +type Trackable interface { + // RefsCount returns the reference count of the pool. + RefsCount() int } + +// Synced is a pool that is safe for concurrent use. +type Synced[T any] struct { + sync.Pool + refs atomic.Int32 +} + +// Get returns an object from the pool. +// If the pool is empty, the zero value of T is returned. +func (p *Synced[T]) Get() T { + v := p.Pool.Get() + p.refs.Add(1) + if v == nil { + var t T + return t + } + return v.(T) +} + +// Put puts an object back to the pool. +func (p *Synced[T]) Put(v T) { + p.Pool.Put(v) + p.refs.Add(-1) +} + +// RefsCount returns the reference count of the pool. +func (p *Synced[T]) RefsCount() int { + return int(p.refs.Load()) +} diff --git a/pkg/test/gmatcher/gmatcher.go b/pkg/test/gmatcher/gmatcher.go new file mode 100644 index 000000000..93b6fb1af --- /dev/null +++ b/pkg/test/gmatcher/gmatcher.go @@ -0,0 +1,59 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package gmatcher provides custom Gomega matchers. +package gmatcher + +import ( + "fmt" + + "github.com/onsi/gomega" +) + +// HaveZeroRef returns a matcher that checks if all pools have 0 references. +func HaveZeroRef() gomega.OmegaMatcher { + return &ZeroRefMatcher{} +} + +var _ gomega.OmegaMatcher = &ZeroRefMatcher{} + +// ZeroRefMatcher is a matcher that checks if all pools have 0 references. +type ZeroRefMatcher struct{} + +// FailureMessage implements types.GomegaMatcher. +func (p *ZeroRefMatcher) FailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected all pools to have 0 references, got %v", actual) +} + +// Match implements types.GomegaMatcher. +func (p *ZeroRefMatcher) Match(actual interface{}) (success bool, err error) { + data, ok := actual.(map[string]int) + if !ok { + return false, fmt.Errorf("expected map[string]int, got %T", actual) + } + for poolName, refs := range data { + if refs > 0 { + return false, fmt.Errorf("pool %s has %d references", poolName, refs) + } + } + return true, nil +} + +// NegatedFailureMessage implements types.GomegaMatcher. +func (p *ZeroRefMatcher) NegatedFailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected at least one pool to have references, got %v", actual) +} diff --git a/pkg/test/measure/testdata/groups/exception.json b/pkg/test/measure/testdata/groups/exception.json new file mode 100644 index 000000000..fadbd5a5b --- /dev/null +++ b/pkg/test/measure/testdata/groups/exception.json @@ -0,0 +1,18 @@ +{ + "metadata": { + "name": "exception" + }, + "catalog": "CATALOG_MEASURE", + "resource_opts": { + "shard_num": 2, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/measure/testdata/measures/duplicated.json b/pkg/test/measure/testdata/measures/duplicated.json new file mode 100644 index 000000000..e25f0a372 --- /dev/null +++ b/pkg/test/measure/testdata/measures/duplicated.json @@ -0,0 +1,42 @@ +{ + "metadata": { + "name": "duplicated", + "group": "exception" + }, + "tag_families": [ + { + "name": "default", + "tags": [ + { + "name": "id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "entity_id", + "type": "TAG_TYPE_STRING" + } + ] + } + ], + "fields": [ + { + "name": "total", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + }, + { + "name": "value", + "field_type": "FIELD_TYPE_INT", + "encoding_method": "ENCODING_METHOD_GORILLA", + "compression_method": "COMPRESSION_METHOD_ZSTD" + } + ], + "entity": { + "tag_names": [ + "entity_id" + ] + }, + "interval": "1m", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/test/cases/init.go b/test/cases/init.go index a39e09f64..7341a96d3 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -54,4 +54,5 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric",
"service_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data.json", now, interval) casesmeasuredata.Write(conn, "service_instance_latency_minute", "sw_metric", "service_instance_latency_minute_data1.json", now.Add(1*time.Minute), interval) + casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0) } diff --git a/test/cases/measure/data/input/duplicated_part.yaml b/test/cases/measure/data/input/duplicated_part.yaml new file mode 100644 index 000000000..919008203 --- /dev/null +++ b/test/cases/measure/data/input/duplicated_part.yaml @@ -0,0 +1,25 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "duplicated" +groups: ["exception"] +tagProjection: + tagFamilies: + - name: "default" + tags: ["id", "entity_id"] +fieldProjection: + names: ["total", "value"] diff --git a/test/cases/measure/data/testdata/duplicated.json b/test/cases/measure/data/testdata/duplicated.json new file mode 100644 index 000000000..d67feaaf1 --- /dev/null +++ b/test/cases/measure/data/testdata/duplicated.json @@ -0,0 +1,182 @@ +[ + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 1 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 2 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc1" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 3 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 100 + } + }, + { + "int": { + "value": 5 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc2" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 50 + } + }, + { + "int": { + "value": 4 + } + } + ] + }, + { + "tag_families": [ + { + "tags": [ + { + "str": { + "value": "svc3" + } + }, + { + "str": { + "value": "entity_1" + } + } + ] + } + ], + "fields": [ + { + "int": { + "value": 300 + } + }, + { + "int": { + "value": 6 + } + } + ] + } +] diff --git a/test/cases/measure/data/want/duplicated_part.yaml b/test/cases/measure/data/want/duplicated_part.yaml new file mode 100644 index 000000000..10a57e59a --- /dev/null +++ 
b/test/cases/measure/data/want/duplicated_part.yaml @@ -0,0 +1,38 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +dataPoints: +- fields: + - name: total + value: + int: + value: "300" + - name: value + value: + int: + value: "6" + tagFamilies: + - name: default + tags: + - key: id + value: + str: + value: svc3 + - key: entity_id + value: + str: + value: entity_1 diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index 960401ca7..06b24498a 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -70,4 +70,5 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), + g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), ) diff --git a/test/integration/distributed/query/query_suite_test.go b/test/integration/distributed/query/query_suite_test.go index 0c78a14f2..3d8eaad31 100644 --- a/test/integration/distributed/query/query_suite_test.go +++ b/test/integration/distributed/query/query_suite_test.go @@ -35,8 +35,10 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" "github.com/apache/skywalking-banyandb/pkg/test/setup" @@ -132,4 +134,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/etcd/client_test.go b/test/integration/etcd/client_test.go index a78d8345d..ddfb47973 100644 --- a/test/integration/etcd/client_test.go +++ b/test/integration/etcd/client_test.go @@ -36,8 +36,10 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" 
"github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -76,6 +78,7 @@ var _ = Describe("Client Test", func() { AfterEach(func() { dirSpaceDef() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) It("should be using user/password connect etcd server successfully", func() { diff --git a/test/integration/load/load_suite_test.go b/test/integration/load/load_suite_test.go index 1858e3806..84dccc386 100644 --- a/test/integration/load/load_suite_test.go +++ b/test/integration/load/load_suite_test.go @@ -36,8 +36,10 @@ import ( streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data" @@ -157,6 +159,7 @@ var _ = Describe("Load Test Suit", func() { } deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) }) diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go index 18e832ade..a28c145ed 100644 --- a/test/integration/standalone/cold_query/query_suite_test.go +++ b/test/integration/standalone/cold_query/query_suite_test.go @@ -29,7 +29,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -89,4 +91,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/standalone/other/measure_test.go b/test/integration/standalone/other/measure_test.go index a2ab6e816..a0ced9ac0 100644 --- a/test/integration/standalone/other/measure_test.go +++ b/test/integration/standalone/other/measure_test.go @@ -27,7 +27,9 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -57,6 +59,7 @@ var _ = g.Describe("Query service_cpm_minute", func() { gm.Expect(conn.Close()).To(gm.Succeed()) deferFn() 
gm.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + gm.Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) g.It("queries service_cpm_minute by id after updating", func() { casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval) diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go index ab217059c..164b3ce32 100644 --- a/test/integration/standalone/other/property_test.go +++ b/test/integration/standalone/other/property_test.go @@ -32,7 +32,9 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -57,6 +59,7 @@ var _ = Describe("Property application", func() { Expect(conn.Close()).To(Succeed()) deferFn() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) It("applies properties", func() { md := &propertyv1.Metadata{ diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index a1e684b2d..765cd893b 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -29,7 +29,9 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -90,4 +92,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) }) diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go index f6fcb14cf..346b89183 100644 --- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go +++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go @@ -29,8 +29,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/gmatcher" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -103,4 +105,5 @@ var _ = SynchronizedAfterSuite(func() { }, func() { deferFunc() Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) })
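A minimal usage sketch of the tracked pool this patch introduces, assembled from the APIs added above (pool.Register, Synced.Get/Put, pool.AllRefsCount, and gmatcher.HaveZeroRef). The exampleBuffer type, the bufPool variable, and the "example-buffer" pool name are illustrative assumptions, not identifiers from the patch:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/pkg/pool"
)

// exampleBuffer is a hypothetical reusable object; the patch pools
// concrete types such as *bufio.Reader, *Int64List, and *blugeIndex.Batch.
type exampleBuffer struct {
	data []byte
}

// Register the pool once at package level; Register panics if the same
// name is registered twice, so pool names must be globally unique.
var bufPool = pool.Register[*exampleBuffer]("example-buffer")

func main() {
	// Get increments the pool's reference counter and returns the zero
	// value (nil for a pointer type) when the pool is empty, so the
	// caller allocates lazily, mirroring generateReader in
	// pkg/fs/local_file_system.go.
	buf := bufPool.Get()
	if buf == nil {
		buf = &exampleBuffer{data: make([]byte, 0, 1024)}
	}
	buf.data = buf.data[:0] // reset before reuse

	// Put returns the object and decrements the counter; a missing Put
	// leaves a positive count that identifies the leaking pool by name.
	bufPool.Put(buf)

	// Every counter should read zero once each Get is paired with a Put.
	fmt.Println(pool.AllRefsCount()) // e.g. map[example-buffer:0]
}

Because every Get bumps a pool's counter and every Put decrements it, a leaked object shows up as a positive entry in pool.AllRefsCount() under its pool name, which is exactly what the new Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef()) assertions in the test suites check for.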