Skip to content

Commit

Permalink
network: tiny speedup, less allocation (algorand#6246)
Browse files Browse the repository at this point in the history
  • Loading branch information
jannotti authored Feb 19, 2025
1 parent be09340 commit 5f12e1a
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 80 deletions.
4 changes: 2 additions & 2 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,8 @@ var MaxBytesKeyValueLen int
// of the consensus protocols. used for decoding purposes.
var MaxExtraAppProgramLen int

// MaxAvailableAppProgramLen is the largest supported app program size including the extra
// pages supported by any of the consensus protocols. used for decoding purposes.
var MaxAvailableAppProgramLen int

// MaxProposedExpiredOnlineAccounts is the maximum number of online accounts
Expand Down
4 changes: 2 additions & 2 deletions data/transactions/verify/verifiedTxnCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ var errTooManyPinnedEntries = &VerifiedTxnCacheError{errors.New("Too many pinned
// errMissingPinnedEntry is being generated when we're trying to pin a transaction that does not appear in the cache
var errMissingPinnedEntry = &VerifiedTxnCacheError{errors.New("Missing pinned entry")}

// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is designed to have two separate "levels". On the
// bottom tier, the cache would be using a cyclic buffer, where old transactions would end up overridden by new ones. In order to support transactions
// that goes into the transaction pool, we have a higher tier of pinned cache. Pinned transactions would not be cycled-away by new incoming transactions,
// and would only get eliminated by updates to the transaction pool, which would inform the cache of updates to the pinned items.
type VerifiedTransactionCache interface {
// Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear
// in the cache, the new entry overrides the old one.
Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext)
// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
Expand Down
43 changes: 21 additions & 22 deletions data/txDupCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

// digestCache is a rotating cache of size N accepting crypto.Digest as a key
// and keeping up to 2*maxSize elements in memory
type digestCache struct {
cur map[crypto.Digest]struct{}
prev map[crypto.Digest]struct{}
Expand All @@ -49,11 +49,11 @@ func makeDigestCache(size int) *digestCache {
}

// check if digest d is in a cache.
// locking semantic: write lock must be taken
func (c *digestCache) check(d *crypto.Digest) bool {
_, found := c.cur[*d]
// locking semantic: read lock must be taken
func (c *digestCache) check(d crypto.Digest) bool {
_, found := c.cur[d]
if !found {
_, found = c.prev[*d]
_, found = c.prev[d]
}
return found
}
Expand All @@ -67,15 +67,15 @@ func (c *digestCache) swap() {

// put adds digest d into a cache.
// locking semantic: write lock must be taken
func (c *digestCache) put(d *crypto.Digest) {
func (c *digestCache) put(d crypto.Digest) {
if len(c.cur) >= c.maxSize {
c.swap()
}
c.cur[*d] = struct{}{}
c.cur[d] = struct{}{}
}

// CheckAndPut adds digest d into a cache if not found
func (c *digestCache) CheckAndPut(d *crypto.Digest) bool {
func (c *digestCache) CheckAndPut(d crypto.Digest) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.check(d) {
Expand All @@ -94,11 +94,11 @@ func (c *digestCache) Len() int {
}

// Delete from the cache
func (c *digestCache) Delete(d *crypto.Digest) {
func (c *digestCache) Delete(d crypto.Digest) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cur, *d)
delete(c.prev, *d)
delete(c.cur, d)
delete(c.prev, d)
}

// txSaltedCache is a digest cache with a rotating salt
Expand Down Expand Up @@ -179,8 +179,8 @@ func (c *txSaltedCache) innerSwap(scheduled bool) {
}

// innerCheck returns true if exists, and the current salted hash if does not.
// locking semantic: write lock must be held
func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) {
// locking semantic: READ lock must be held, cache is not mutated
func (c *txSaltedCache) innerCheck(msg []byte) (crypto.Digest, bool) {
ptr := saltedPool.Get()
defer saltedPool.Put(ptr)

Expand All @@ -193,22 +193,22 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) {

_, found := c.cur[d]
if found {
return nil, true
return crypto.Digest{}, true
}

toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...)
toBeHashed = toBeHashed[:len(msg)+len(c.prevSalt)]
pd := crypto.Digest(blake2b.Sum256(toBeHashed))
_, found = c.prev[pd]
if found {
return nil, true
return crypto.Digest{}, true
}
return &d, false
return d, false
}

// CheckAndPut adds msg into a cache if not found
// returns a hashing key used for insertion if the message not found.
func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
func (c *txSaltedCache) CheckAndPut(msg []byte) (crypto.Digest, bool) {
c.mu.RLock()
d, found := c.innerCheck(msg)
salt := c.curSalt
Expand All @@ -231,7 +231,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
} else {
// Do another check to see if another copy of the transaction won the race to write it to the cache
// Only check current to save a lookup since swaps are rare and no need to re-hash
if _, found := c.cur[*d]; found {
if _, found := c.cur[d]; found {
return d, found
}
}
Expand All @@ -246,16 +246,15 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
toBeHashed = append(toBeHashed, c.curSalt[:]...)
toBeHashed = toBeHashed[:len(msg)+len(c.curSalt)]

dn := crypto.Digest(blake2b.Sum256(toBeHashed))
d = &dn
d = crypto.Digest(blake2b.Sum256(toBeHashed))
}

c.cur[*d] = struct{}{}
c.cur[d] = struct{}{}
return d, false
}

// DeleteByKey from the cache by using a key used for insertion
func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) {
func (c *txSaltedCache) DeleteByKey(d crypto.Digest) {
c.digestCache.Delete(d)
}

Expand Down
31 changes: 16 additions & 15 deletions data/txDupCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ func TestTxHandlerDigestCache(t *testing.T) {
var ds [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes([]byte(ds[i][:]))
exist := cache.CheckAndPut(&ds[i])
exist := cache.CheckAndPut(ds[i])
require.False(t, exist)

exist = cache.check(&ds[i])
exist = cache.check(ds[i])
require.True(t, exist)
}

require.Equal(t, size, cache.Len())

// try to re-add, ensure not added
for i := 0; i < size; i++ {
exist := cache.CheckAndPut(&ds[i])
exist := cache.CheckAndPut(ds[i])
require.True(t, exist)
}

Expand All @@ -66,45 +66,45 @@ func TestTxHandlerDigestCache(t *testing.T) {
var ds2 [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes(ds2[i][:])
exist := cache.CheckAndPut(&ds2[i])
exist := cache.CheckAndPut(ds2[i])
require.False(t, exist)

exist = cache.check(&ds2[i])
exist = cache.check(ds2[i])
require.True(t, exist)
}

require.Equal(t, 2*size, cache.Len())

var d crypto.Digest
crypto.RandBytes(d[:])
exist := cache.CheckAndPut(&d)
exist := cache.CheckAndPut(d)
require.False(t, exist)
exist = cache.check(&d)
exist = cache.check(d)
require.True(t, exist)

require.Equal(t, size+1, cache.Len())

// ensure hashes from the prev batch are still there
for i := 0; i < size; i++ {
exist := cache.check(&ds2[i])
exist := cache.check(ds2[i])
require.True(t, exist)
}

// ensure hashes from the first batch are gone
for i := 0; i < size; i++ {
exist := cache.check(&ds[i])
exist := cache.check(ds[i])
require.False(t, exist)
}

// check deletion works
for i := 0; i < size; i++ {
cache.Delete(&ds[i])
cache.Delete(&ds2[i])
cache.Delete(ds[i])
cache.Delete(ds2[i])
}

require.Equal(t, 1, cache.Len())

cache.Delete(&d)
cache.Delete(d)
require.Equal(t, 0, cache.Len())
}

Expand All @@ -125,7 +125,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) {

// add some unique random
var ds [size][8]byte
var ks [size]*crypto.Digest
var ks [size]crypto.Digest
var exist bool
for i := 0; i < size; i++ {
crypto.RandBytes([]byte(ds[i][:]))
Expand All @@ -150,7 +150,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) {

// add some more and ensure capacity switch
var ds2 [size][8]byte
var ks2 [size]*crypto.Digest
var ks2 [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes(ds2[i][:])
ks2[i], exist = cache.CheckAndPut(ds2[i][:])
Expand Down Expand Up @@ -309,7 +309,7 @@ func (p *digestCachePusher) push() {
var d [crypto.DigestSize]byte
crypto.RandBytes(d[:])
h := crypto.Digest(blake2b.Sum256(d[:])) // digestCache does not hashes so calculate hash here
p.c.CheckAndPut(&h)
p.c.CheckAndPut(h)
}

func (p *saltedCachePusher) push() {
Expand Down Expand Up @@ -342,6 +342,7 @@ func BenchmarkDigestCaches(b *testing.B) {
}
for _, bench := range benchmarks {
b.Run(fmt.Sprintf("%T/threads=%d", bench.maker, bench.numThreads), func(b *testing.B) {
b.ReportAllocs()
benchmarkDigestCache(b, bench.maker, bench.numThreads)
})
}
Expand Down
Loading

0 comments on commit 5f12e1a

Please sign in to comment.