Skip to content

Commit

Permalink
Add Tracker To Object Pool (#508)
Browse files Browse the repository at this point in the history
* Fix duplicated measure data in a single part

* Add the tracked pool to fix leak issues

Signed-off-by: Gao Hongtao <[email protected]>

---------

Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily authored Aug 6, 2024
1 parent 768f664 commit f354d0a
Show file tree
Hide file tree
Showing 65 changed files with 879 additions and 1,015 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@ Release Notes.
- Fix a bug that the Stream module didn't support duplicated in index-based filtering and sorting
- Fix the bug that segment's reference count is increased twice when the controller try to create an existing segment.
- Fix a bug where a distributed query would return an empty result if the "limit" was set much lower than the "offset".
- Fix duplicated measure data in a single part.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.

### Documentation

- Introduce new doc menu structure.
- Add installation on Docker and Kubernetes.
- Add quick-start guide.

### Chores

Bump up the version of infra e2e framework.

## 0.6.1

### Features
Expand Down
10 changes: 3 additions & 7 deletions banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)

var testSeriesPool pbv1.SeriesPool

func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
Expand All @@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
}()
var docs index.Documents
for i := 0; i < 100; i++ {
series := testSeriesPool.Generate()
var series pbv1.Series
series.Subject = "service_instance_latency"
series.EntityValues = []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc_%d", i)}}},
Expand All @@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) {
}
copy(doc.EntityValues, series.Buffer)
docs = append(docs, doc)
testSeriesPool.Release(series)
}
require.NoError(t, si.Write(docs))
// Restart the index
Expand Down Expand Up @@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var seriesQueries []*pbv1.Series
for i := range tt.entityValues {
seriesQuery := testSeriesPool.Generate()
defer testSeriesPool.Release(seriesQuery)
var seriesQuery pbv1.Series
seriesQuery.Subject = tt.subject
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries, seriesQuery)
seriesQueries = append(seriesQueries, &seriesQuery)
}
sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
require.NoError(t, err)
Expand Down
10 changes: 10 additions & 0 deletions banyand/liaison/grpc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package grpc

import (
"fmt"
"sync"

"github.com/pkg/errors"
Expand All @@ -37,6 +38,7 @@ var (
// together with the shardID calculated from the incoming data.
type NodeRegistry interface {
Locate(group, name string, shardID uint32) (string, error)
fmt.Stringer
}

type clusterNodeService struct {
Expand Down Expand Up @@ -94,8 +96,16 @@ func (n *clusterNodeService) OnDelete(metadata schema.Metadata) {
}
}

func (n *clusterNodeService) String() string {
return n.sel.String()
}

type localNodeService struct{}

func (l localNodeService) String() string {
return "local"
}

// NewLocalNodeRegistry creates a local(fake) node registry.
func NewLocalNodeRegistry() NodeRegistry {
return localNodeService{}
Expand Down
1 change: 0 additions & 1 deletion banyand/liaison/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func (s *server) Validate() error {
if s.enableIngestionAccessLog && s.accessLogRootPath == "" {
return errAccessLogRootPath
}
observability.UpdateAddress("grpc", s.addr)
if !s.tls {
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions banyand/liaison/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/ui"
Expand Down Expand Up @@ -102,7 +101,6 @@ func (p *server) Validate() error {
if p.listenAddr == ":" {
return errNoAddr
}
observability.UpdateAddress("http", p.listenAddr)
if p.grpcCert != "" {
creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert, "")
if errTLS != nil {
Expand Down
14 changes: 7 additions & 7 deletions banyand/measure/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package measure
import (
"slices"
"sort"
"sync"

"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
Expand Down Expand Up @@ -403,15 +403,15 @@ func generateBlock() *block {
if v == nil {
return &block{}
}
return v.(*block)
return v
}

func releaseBlock(b *block) {
b.reset()
blockPool.Put(b)
}

var blockPool sync.Pool
var blockPool = pool.Register[*block]("measure-block")

type blockCursor struct {
p *part
Expand Down Expand Up @@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
return true
}

var blockCursorPool sync.Pool
var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor")

func generateBlockCursor() *blockCursor {
v := blockCursorPool.Get()
if v == nil {
return &blockCursor{}
}
return v.(*blockCursor)
return v
}

func releaseBlockCursor(bc *blockCursor) {
Expand Down Expand Up @@ -832,12 +832,12 @@ func generateBlockPointer() *blockPointer {
if v == nil {
return &blockPointer{}
}
return v.(*blockPointer)
return v
}

func releaseBlockPointer(bi *blockPointer) {
bi.reset()
blockPointerPool.Put(bi)
}

var blockPointerPool sync.Pool
var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer")
11 changes: 5 additions & 6 deletions banyand/measure/block_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"errors"
"fmt"
"sort"
"sync"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)

Expand Down Expand Up @@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
}
// TODO: cache dataBlock
tf := &dataBlock{}
src, err = tf.unmarshal(src)
if err != nil {
Expand Down Expand Up @@ -198,15 +197,15 @@ func generateBlockMetadata() *blockMetadata {
if v == nil {
return &blockMetadata{}
}
return v.(*blockMetadata)
return v
}

func releaseBlockMetadata(bm *blockMetadata) {
bm.reset()
blockMetadataPool.Put(bm)
}

var blockMetadataPool sync.Pool
var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata")

type blockMetadataArray struct {
arr []blockMetadata
Expand All @@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() {
bma.arr = bma.arr[:0]
}

var blockMetadataArrayPool sync.Pool
var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("measure-blockMetadataArray")

func generateBlockMetadataArray() *blockMetadataArray {
v := blockMetadataArrayPool.Get()
if v == nil {
return &blockMetadataArray{}
}
return v.(*blockMetadataArray)
return v
}

func releaseBlockMetadataArray(bma *blockMetadataArray) {
Expand Down
10 changes: 5 additions & 5 deletions banyand/measure/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"errors"
"fmt"
"io"
"sync"

"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

type seqReader struct {
Expand Down Expand Up @@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {

func generateSeqReader() *seqReader {
if v := seqReaderPool.Get(); v != nil {
return v.(*seqReader)
return v
}
return &seqReader{}
}
Expand All @@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
seqReaderPool.Put(sr)
}

var seqReaderPool sync.Pool
var seqReaderPool = pool.Register[*seqReader]("measure-seqReader")

type seqReaders struct {
tagFamilyMetadata map[string]*seqReader
Expand Down Expand Up @@ -219,11 +219,11 @@ func (br *blockReader) error() error {
return br.err
}

var blockReaderPool sync.Pool
var blockReaderPool = pool.Register[*blockReader]("measure-blockReader")

func generateBlockReader() *blockReader {
if v := blockReaderPool.Get(); v != nil {
return v.(*blockReader)
return v
}
return &blockReader{}
}
Expand Down
15 changes: 11 additions & 4 deletions banyand/measure/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) {
{seriesID: 3, count: 1, uncompressedSizeBytes: 24},
},
},
{
name: "Test with a single part with same ts",
dpsList: []*dataPoints{duplicatedDps},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 24},
},
},
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
Expand All @@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
verify := func(pp []*part) {
verify := func(t *testing.T, pp []*part) {
var pii []*partMergeIter
for _, p := range pp {
pmi := &partMergeIter{}
Expand Down Expand Up @@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
}
}

t.Run("memory parts", func(_ *testing.T) {
t.Run("memory parts", func(t *testing.T) {
var mpp []*memPart
defer func() {
for _, mp := range mpp {
Expand All @@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
mp.mustInitFromDataPoints(dps)
pp = append(pp, openMemPart(mp))
}
verify(pp)
verify(t, pp)
})

t.Run("file parts", func(t *testing.T) {
Expand Down Expand Up @@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
fpp = append(fpp, filePW)
pp = append(pp, filePW.p)
}
verify(pp)
verify(t, pp)
})
})
}
Expand Down
6 changes: 3 additions & 3 deletions banyand/measure/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package measure

import (
"path/filepath"
"sync"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

type writer struct {
Expand Down Expand Up @@ -285,12 +285,12 @@ func generateBlockWriter() *blockWriter {
},
}
}
return v.(*blockWriter)
return v
}

func releaseBlockWriter(bsw *blockWriter) {
bsw.reset()
blockWriterPool.Put(bsw)
}

var blockWriterPool sync.Pool
var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter")
2 changes: 1 addition & 1 deletion banyand/measure/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder *encoding.BytesBlockDecoder, reader *
}
}

var bigValuePool bytes.BufferPool
var bigValuePool = bytes.NewBufferPool("measure-big-value")

type columnFamily struct {
name string
Expand Down
6 changes: 3 additions & 3 deletions banyand/measure/column_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ package measure

import (
"fmt"
"sync"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

type columnMetadata struct {
Expand Down Expand Up @@ -148,12 +148,12 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata {
if v == nil {
return &columnFamilyMetadata{}
}
return v.(*columnFamilyMetadata)
return v
}

func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) {
cfm.reset()
columnFamilyMetadataPool.Put(cfm)
}

var columnFamilyMetadataPool sync.Pool
var columnFamilyMetadataPool = pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata")
Loading

0 comments on commit f354d0a

Please sign in to comment.