Merge branch 'master' into version-bump-v1.11.4
StephenButtolph authored Apr 9, 2024
2 parents 5644908 + 1040ceb commit 23c4e44
Showing 19 changed files with 653 additions and 319 deletions.
4 changes: 3 additions & 1 deletion cache/lru_cache.go
@@ -92,7 +92,9 @@ func (c *LRU[K, _]) evict(key K) {
 }
 
 func (c *LRU[K, V]) flush() {
-    c.elements = linked.NewHashmap[K, V]()
+    if c.elements != nil {
+        c.elements.Clear()
+    }
 }
 
 func (c *LRU[_, _]) len() int {
2 changes: 1 addition & 1 deletion cache/lru_sized_cache.go
@@ -113,7 +113,7 @@ func (c *sizedLRU[K, _]) evict(key K) {
 }
 
 func (c *sizedLRU[K, V]) flush() {
-    c.elements = linked.NewHashmap[K, V]()
+    c.elements.Clear()
     c.currentSize = 0
 }
 
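A note on the two cache diffs above: flush now clears the existing linked hashmap in place instead of allocating a fresh one, so the backing storage is reused across flushes (with a nil guard in the unsized LRU). A minimal sketch of the same pattern using a plain map and the clear builtin (Go 1.21+); this is purely illustrative and not the linked.Hashmap type the real code uses:

package main

import "fmt"

// tinyCache stands in for the LRU types above; it only demonstrates the
// "clear in place" flush pattern.
type tinyCache struct {
    elements map[string]int // the real code uses linked.NewHashmap[K, V]()
}

func (c *tinyCache) flush() {
    // Clearing reuses the existing backing storage instead of re-allocating
    // it on every flush, as `c.elements = make(map[string]int)` would.
    if c.elements != nil {
        clear(c.elements)
    }
}

func main() {
    c := &tinyCache{elements: map[string]int{"a": 1}}
    c.flush()
    fmt.Println(len(c.elements)) // prints 0
}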
130 changes: 52 additions & 78 deletions database/prefixdb/db.go
@@ -9,13 +9,10 @@ import (
     "sync"
 
     "github.com/ava-labs/avalanchego/database"
+    "github.com/ava-labs/avalanchego/utils"
     "github.com/ava-labs/avalanchego/utils/hashing"
 )
 
-const (
-    defaultBufCap = 256
-)
-
 var (
     _ database.Database = (*Database)(nil)
     _ database.Batch    = (*batch)(nil)
@@ -26,9 +23,8 @@ var (
 // a unique value.
 type Database struct {
     // All keys in this db begin with this byte slice
-    dbPrefix []byte
-    // Holds unused []byte
-    bufferPool sync.Pool
+    dbPrefix   []byte
+    bufferPool *utils.BytesPool
 
     // lock needs to be held during Close to guarantee db will not be set to nil
     // concurrently with another operation. All other operations can hold RLock.
@@ -40,13 +36,9 @@
 
 func newDB(prefix []byte, db database.Database) *Database {
     return &Database{
-        dbPrefix: prefix,
-        db:       db,
-        bufferPool: sync.Pool{
-            New: func() interface{} {
-                return make([]byte, 0, defaultBufCap)
-            },
-        },
+        dbPrefix:   prefix,
+        db:         db,
+        bufferPool: utils.NewBytesPool(),
     }
 }
 
@@ -91,9 +83,6 @@ func PrefixKey(prefix, key []byte) []byte {
     return prefixedKey
 }
 
-// Assumes that it is OK for the argument to db.db.Has
-// to be modified after db.db.Has returns
-// [key] may be modified after this method returns.
 func (db *Database) Has(key []byte) (bool, error) {
     db.lock.RLock()
     defer db.lock.RUnlock()
@@ -102,14 +91,11 @@ func (db *Database) Has(key []byte) (bool, error) {
         return false, database.ErrClosed
     }
     prefixedKey := db.prefix(key)
-    has, err := db.db.Has(prefixedKey)
-    db.bufferPool.Put(prefixedKey)
-    return has, err
+    defer db.bufferPool.Put(prefixedKey)
+
+    return db.db.Has(*prefixedKey)
 }
 
-// Assumes that it is OK for the argument to db.db.Get
-// to be modified after db.db.Get returns.
-// [key] may be modified after this method returns.
 func (db *Database) Get(key []byte) ([]byte, error) {
     db.lock.RLock()
     defer db.lock.RUnlock()
@@ -118,15 +104,11 @@ func (db *Database) Get(key []byte) ([]byte, error) {
         return nil, database.ErrClosed
     }
     prefixedKey := db.prefix(key)
-    val, err := db.db.Get(prefixedKey)
-    db.bufferPool.Put(prefixedKey)
-    return val, err
+    defer db.bufferPool.Put(prefixedKey)
+
+    return db.db.Get(*prefixedKey)
 }
 
-// Assumes that it is OK for the argument to db.db.Put
-// to be modified after db.db.Put returns.
-// [key] can be modified after this method returns.
-// [value] should not be modified.
 func (db *Database) Put(key, value []byte) error {
     db.lock.RLock()
     defer db.lock.RUnlock()
@@ -135,14 +117,11 @@ func (db *Database) Put(key, value []byte) error {
         return database.ErrClosed
     }
     prefixedKey := db.prefix(key)
-    err := db.db.Put(prefixedKey, value)
-    db.bufferPool.Put(prefixedKey)
-    return err
+    defer db.bufferPool.Put(prefixedKey)
+
+    return db.db.Put(*prefixedKey, value)
 }
 
-// Assumes that it is OK for the argument to db.db.Delete
-// to be modified after db.db.Delete returns.
-// [key] may be modified after this method returns.
 func (db *Database) Delete(key []byte) error {
     db.lock.RLock()
     defer db.lock.RUnlock()
@@ -151,9 +130,9 @@ func (db *Database) Delete(key []byte) error {
         return database.ErrClosed
     }
     prefixedKey := db.prefix(key)
-    err := db.db.Delete(prefixedKey)
-    db.bufferPool.Put(prefixedKey)
-    return err
+    defer db.bufferPool.Put(prefixedKey)
+
+    return db.db.Delete(*prefixedKey)
 }
 
 func (db *Database) NewBatch() database.Batch {
@@ -186,15 +165,17 @@ func (db *Database) NewIteratorWithStartAndPrefix(start, prefix []byte) database
             Err: database.ErrClosed,
         }
     }
+
     prefixedStart := db.prefix(start)
+    defer db.bufferPool.Put(prefixedStart)
+
     prefixedPrefix := db.prefix(prefix)
-    it := &iterator{
-        Iterator: db.db.NewIteratorWithStartAndPrefix(prefixedStart, prefixedPrefix),
+    defer db.bufferPool.Put(prefixedPrefix)
+
+    return &iterator{
+        Iterator: db.db.NewIteratorWithStartAndPrefix(*prefixedStart, *prefixedPrefix),
         db:       db,
     }
-    db.bufferPool.Put(prefixedStart)
-    db.bufferPool.Put(prefixedPrefix)
-    return it
 }
 
 func (db *Database) Compact(start, limit []byte) error {
@@ -204,7 +185,14 @@ func (db *Database) Compact(start, limit []byte) error {
     if db.closed {
         return database.ErrClosed
     }
-    return db.db.Compact(db.prefix(start), db.prefix(limit))
+
+    prefixedStart := db.prefix(start)
+    defer db.bufferPool.Put(prefixedStart)
+
+    prefixedLimit := db.prefix(limit)
+    defer db.bufferPool.Put(prefixedLimit)
+
+    return db.db.Compact(*prefixedStart, *prefixedLimit)
 }
 
 func (db *Database) Close() error {
@@ -236,23 +224,12 @@ func (db *Database) HealthCheck(ctx context.Context) (interface{}, error) {
 }
 
 // Return a copy of [key], prepended with this db's prefix.
-// The returned slice should be put back in the pool
-// when it's done being used.
-func (db *Database) prefix(key []byte) []byte {
-    // Get a []byte from the pool
-    prefixedKey := db.bufferPool.Get().([]byte)
+// The returned slice should be put back in the pool when it's done being used.
+func (db *Database) prefix(key []byte) *[]byte {
     keyLen := len(db.dbPrefix) + len(key)
-    if cap(prefixedKey) >= keyLen {
-        // The [] byte we got from the pool is big enough to hold the prefixed key
-        prefixedKey = prefixedKey[:keyLen]
-    } else {
-        // The []byte from the pool wasn't big enough.
-        // Put it back and allocate a new, bigger one
-        db.bufferPool.Put(prefixedKey)
-        prefixedKey = make([]byte, keyLen)
-    }
-    copy(prefixedKey, db.dbPrefix)
-    copy(prefixedKey[len(db.dbPrefix):], key)
+    prefixedKey := db.bufferPool.Get(keyLen)
+    copy(*prefixedKey, db.dbPrefix)
+    copy((*prefixedKey)[len(db.dbPrefix):], key)
     return prefixedKey
 }
 
@@ -264,33 +241,32 @@ type batch struct {
     // Each key is prepended with the database's prefix.
     // Each byte slice underlying a key should be returned to the pool
     // when this batch is reset.
-    ops []database.BatchOp
+    ops []batchOp
 }
 
+type batchOp struct {
+    Key    *[]byte
+    Value  []byte
+    Delete bool
+}
+
-// Assumes that it is OK for the argument to b.Batch.Put
-// to be modified after b.Batch.Put returns
-// [key] may be modified after this method returns.
-// [value] may be modified after this method returns.
 func (b *batch) Put(key, value []byte) error {
     prefixedKey := b.db.prefix(key)
     copiedValue := slices.Clone(value)
-    b.ops = append(b.ops, database.BatchOp{
+    b.ops = append(b.ops, batchOp{
         Key:   prefixedKey,
         Value: copiedValue,
     })
-    return b.Batch.Put(prefixedKey, copiedValue)
+    return b.Batch.Put(*prefixedKey, copiedValue)
 }
 
-// Assumes that it is OK for the argument to b.Batch.Delete
-// to be modified after b.Batch.Delete returns
-// [key] may be modified after this method returns.
 func (b *batch) Delete(key []byte) error {
     prefixedKey := b.db.prefix(key)
-    b.ops = append(b.ops, database.BatchOp{
+    b.ops = append(b.ops, batchOp{
         Key:    prefixedKey,
         Delete: true,
     })
-    return b.Batch.Delete(prefixedKey)
+    return b.Batch.Delete(*prefixedKey)
 }
 
 // Write flushes any accumulated data to the memory database.
@@ -316,19 +292,17 @@ func (b *batch) Reset() {
 
     // Clear b.writes
     if cap(b.ops) > len(b.ops)*database.MaxExcessCapacityFactor {
-        b.ops = make([]database.BatchOp, 0, cap(b.ops)/database.CapacityReductionFactor)
+        b.ops = make([]batchOp, 0, cap(b.ops)/database.CapacityReductionFactor)
     } else {
        b.ops = b.ops[:0]
     }
     b.Batch.Reset()
 }
 
-// Replay replays the batch contents.
-// Assumes it's safe to modify the key argument to w.Delete and w.Put
-// after those methods return.
+// Replay the batch contents.
 func (b *batch) Replay(w database.KeyValueWriterDeleter) error {
     for _, op := range b.ops {
-        keyWithoutPrefix := op.Key[len(b.db.dbPrefix):]
+        keyWithoutPrefix := (*op.Key)[len(b.db.dbPrefix):]
         if op.Delete {
             if err := w.Delete(keyWithoutPrefix); err != nil {
                 return err
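For readers skimming the prefixdb diff above: prefix now returns a *[]byte borrowed from the db's byte-slice pool, sized to the prefixed key, and every caller gives it back with a deferred Put once the wrapped database call has consumed it. A rough, self-contained sketch of that borrow/copy/return flow; the pool below is a hypothetical stand-in built on sync.Pool, not the actual utils.BytesPool implementation:

package main

import (
    "fmt"
    "sync"
)

// bytesPool is a simplified stand-in for a length-aware byte-slice pool.
type bytesPool struct{ pool sync.Pool }

func newBytesPool() *bytesPool {
    return &bytesPool{pool: sync.Pool{New: func() any {
        b := make([]byte, 0, 256)
        return &b
    }}}
}

// Get returns a pooled slice resized to length n, allocating a fresh one if
// the pooled buffer is too small.
func (bp *bytesPool) Get(n int) *[]byte {
    b := bp.pool.Get().(*[]byte)
    if cap(*b) < n {
        bp.pool.Put(b)
        nb := make([]byte, n)
        return &nb
    }
    *b = (*b)[:n]
    return b
}

func (bp *bytesPool) Put(b *[]byte) { bp.pool.Put(b) }

// prefixKey mirrors the borrow/copy pattern from the diff: copy the db's
// prefix, then the key, into a pooled buffer and return the pointer.
func prefixKey(pool *bytesPool, dbPrefix, key []byte) *[]byte {
    keyLen := len(dbPrefix) + len(key)
    prefixedKey := pool.Get(keyLen)
    copy(*prefixedKey, dbPrefix)
    copy((*prefixedKey)[len(dbPrefix):], key)
    return prefixedKey
}

func main() {
    pool := newBytesPool()

    prefixedKey := prefixKey(pool, []byte("acct/"), []byte("alice"))
    defer pool.Put(prefixedKey) // callers return the buffer after the db call

    fmt.Printf("%s\n", *prefixedKey) // acct/alice
}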
42 changes: 39 additions & 3 deletions network/p2p/client.go
@@ -8,7 +8,10 @@ import (
     "errors"
     "fmt"
 
+    "go.uber.org/zap"
+
     "github.com/ava-labs/avalanchego/ids"
+    "github.com/ava-labs/avalanchego/message"
     "github.com/ava-labs/avalanchego/snow/engine/common"
     "github.com/ava-labs/avalanchego/utils/set"
 )
@@ -72,6 +75,14 @@ func (c *Client) AppRequest(
     appRequestBytes []byte,
     onResponse AppResponseCallback,
 ) error {
+    // Cancellation is removed from this context to avoid erroring unexpectedly.
+    // SendAppRequest should be non-blocking and any error other than context
+    // cancellation is unexpected.
+    //
+    // This guarantees that the router should never receive an unexpected
+    // AppResponse.
+    ctxWithoutCancel := context.WithoutCancel(ctx)
+
     c.router.lock.Lock()
     defer c.router.lock.Unlock()
 
@@ -87,11 +98,17 @@ func (c *Client) AppRequest(
     }
 
     if err := c.sender.SendAppRequest(
-        ctx,
+        ctxWithoutCancel,
         set.Of(nodeID),
         requestID,
         appRequestBytes,
     ); err != nil {
+        c.router.log.Error("unexpected error when sending message",
+            zap.Stringer("op", message.AppRequestOp),
+            zap.Stringer("nodeID", nodeID),
+            zap.Uint32("requestID", requestID),
+            zap.Error(err),
+        )
         return err
     }
 
@@ -111,8 +128,13 @@ func (c *Client) AppGossip(
     config common.SendConfig,
     appGossipBytes []byte,
 ) error {
+    // Cancellation is removed from this context to avoid erroring unexpectedly.
+    // SendAppGossip should be non-blocking and any error other than context
+    // cancellation is unexpected.
+    ctxWithoutCancel := context.WithoutCancel(ctx)
+
     return c.sender.SendAppGossip(
-        ctx,
+        ctxWithoutCancel,
         config,
         PrefixMessage(c.handlerPrefix, appGossipBytes),
     )
@@ -126,6 +148,14 @@ func (c *Client) CrossChainAppRequest(
     appRequestBytes []byte,
     onResponse CrossChainAppResponseCallback,
 ) error {
+    // Cancellation is removed from this context to avoid erroring unexpectedly.
+    // SendCrossChainAppRequest should be non-blocking and any error other than
+    // context cancellation is unexpected.
+    //
+    // This guarantees that the router should never receive an unexpected
+    // CrossChainAppResponse.
+    ctxWithoutCancel := context.WithoutCancel(ctx)
+
     c.router.lock.Lock()
     defer c.router.lock.Unlock()
 
@@ -139,11 +169,17 @@ func (c *Client) CrossChainAppRequest(
     }
 
     if err := c.sender.SendCrossChainAppRequest(
-        ctx,
+        ctxWithoutCancel,
         chainID,
         requestID,
         PrefixMessage(c.handlerPrefix, appRequestBytes),
     ); err != nil {
+        c.router.log.Error("unexpected error when sending message",
+            zap.Stringer("op", message.CrossChainAppRequestOp),
+            zap.Stringer("chainID", chainID),
+            zap.Uint32("requestID", requestID),
+            zap.Error(err),
+        )
         return err
     }
 
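The client.go changes above hinge on context.WithoutCancel (added in Go 1.21): the send path keeps the parent context's values but is detached from its cancellation, so a caller that times out or cancels mid-call can no longer make the non-blocking Send* calls fail with an unexpected "context canceled" error. A small standalone illustration of that behavior, not using the avalanchego sender interfaces:

package main

import (
    "context"
    "fmt"
)

func main() {
    parent, cancel := context.WithCancel(context.Background())
    detached := context.WithoutCancel(parent)

    cancel() // simulate the caller giving up mid-request

    // The parent is now canceled, but the detached context is not, so a
    // send performed with it will not observe context.Canceled.
    fmt.Println(parent.Err())   // context canceled
    fmt.Println(detached.Err()) // <nil>
}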
