Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP txHandler: do not rebroadcast to peers sent duplicate messages #5424

Draft
wants to merge 25 commits into
base: master
Choose a base branch
from

Conversation

algorandskiy
Copy link
Contributor

@algorandskiy algorandskiy commented May 26, 2023

Summary

When the transaction deduplication was added it had a side effect of re-sending transactions to peers sent a message the handler has seen before. This PR fixes this.

Before dedup if A sent T, all but A get T, then, if B sent us T, noone gets T again
With dedup, if A sent T, all but A get T, then, if B sent us T, noone gets T again.
With this PR: if A and B sent T almost simultaneously all but A and B get T

Implementation overview:

  1. txSaltedCache how stores map of seen peers for a particular message
  2. CheckAndPut returns this map to to get rid additional lookups.
  3. The map is later used in RelayArray to let networking know whom to skip.

Implementation Details

CheckAndPut is not more complex because it needs to update the map value even if there is a match.
The original idea with fast check under a read lock is preserved but innerCheck also returns a current value and a "page" (cur/prev map) where it was found in order to update. Note innerCheck can return prev page and this also need to be considered when running an update with a write lock taken.
The found && senderFound denotes a new fast path without modification underlying maps.

Note, a reference data struct (*sync.Map in this case) is crucial to have the implementation to work because of the following scenario:

  1. Received txn A from N1, wrote to the cache, the the cache value (peers list) is attached to a work item txBacklogMsg (wi ) of this transaction.
  2. Received txn A from N2, updated the peers list. The reference in wi is updated automatically.
  3. It is time to relay, give the network the same peers list
  4. Received txn A from N3, updated to the cache value (peers list)
  5. The network has updated peers list with N3
  6. After broadcasting received txn A from N4, updated to the cache value (peers list), but b/c it is a duplicate it would not make its path to the network.

Test Plan

Added a new unit test

Benchmarks

master

BenchmarkDigestCaches/data.digestCacheMaker/threads=1-8         	 1000000	      1575 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=1-8         	  769404	      1886 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=4-8         	 1347638	       992.8 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=4-8         	  777824	      2042 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=16-8        	 1248975	       984.3 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=16-8        	  709402	      2234 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=128-8       	 1233063	       996.1 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=128-8       	  619723	      2289 ns/op

feature

BenchmarkDigestCaches/data.digestCacheMaker/threads=1-8         	  978464	      1460 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=1-8         	  575578	      2303 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=1-8      	  550164	      2238 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=4-8         	 1395024	       877.9 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=4-8         	  499202	      2773 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=4-8      	  651159	      1883 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=16-8        	 1308243	       948.8 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=16-8        	  430381	      3130 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=16-8     	  954093	      1338 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=128-8       	 1305469	       952.8 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=128-8       	  443257	      3026 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=128-8    	 1756513	       686.5 ns/op

@codecov
Copy link

codecov bot commented May 26, 2023

Codecov Report

Attention: Patch coverage is 69.49153% with 18 lines in your changes missing coverage. Please review.

Project coverage is 50.43%. Comparing base (5ff0c22) to head (2fa4046).
Report is 511 commits behind head on master.

Files with missing lines Patch % Lines
data/txDupCache.go 79.48% 7 Missing and 1 partial ⚠️
network/wsNetwork.go 38.46% 6 Missing and 2 partials ⚠️
data/txHandler.go 71.42% 1 Missing and 1 partial ⚠️

❗ There is a different number of reports uploaded between BASE (5ff0c22) and HEAD (2fa4046). Click for more details.

HEAD has 42 uploads less than BASE
Flag BASE (5ff0c22) HEAD (2fa4046)
64 22
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5424      +/-   ##
==========================================
- Coverage   55.60%   50.43%   -5.18%     
==========================================
  Files         447      447              
  Lines       63395    63422      +27     
==========================================
- Hits        35253    31986    -3267     
- Misses      25760    28931    +3171     
- Partials     2382     2505     +123     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

data/txDupCache.go Outdated Show resolved Hide resolved
data/txDupCache.go Outdated Show resolved Hide resolved
data/txDupCache.go Show resolved Hide resolved
data/txDupCache_test.go Outdated Show resolved Hide resolved
AlgoAxel
AlgoAxel previously approved these changes Jun 2, 2023
Copy link
Contributor

@AlgoAxel AlgoAxel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One opportunity to use Eventually, but up to you if you take it.

data/txHandler_test.go Outdated Show resolved Hide resolved
data/txHandler_test.go Outdated Show resolved Hide resolved
data/txDupCache.go Outdated Show resolved Hide resolved
@algorandskiy algorandskiy dismissed stale reviews from AlgoAxel and iansuvak via a9da829 June 6, 2023 15:02
@algorandskiy
Copy link
Contributor Author

Fixed outstanding review comments and merged master in

vals, found = c.cur[*d]
if found {
if _, senderFound = vals.Load(sender); senderFound {
return d, vals, true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing a test for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is hard to test since the value need to appear in a current page between read write locks

algonautshant
algonautshant previously approved these changes Jun 6, 2023
Copy link
Contributor

@algonautshant algonautshant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

Many more locks, map lookups, code complexity and memory footprint added for possibly very limited gain.
Will be nice to evaluate this tradeoff thoroughly and be prepare to reverse this if it is not sufficiently helpful.

iansuvak
iansuvak previously approved these changes Jun 6, 2023
@algorandskiy
Copy link
Contributor Author

A current delay between accepting and and relaying is:

  1. Hashing + cache access
  2. Decoding
  3. Signature verification within 2ms batch
  4. txpool Check + Remember delay

I.e. there is some delay but might not be enough to collect enough duplicates to make re-broadcasting filtering effective.
TODO, as a follow up PR: benchmark and and implement ~50ms (?) delay before re-broadcasting in order to collect as much duplicates as possible without introducing too much tx latency.

@algorandskiy algorandskiy marked this pull request as draft June 9, 2023 02:15
@algorandskiy
Copy link
Contributor Author

Do not merge - the benchmark showed 6.9k tps vs 7.7k tps on master, so networking code appears to be much slower now.

@algorandskiy algorandskiy changed the title txHandler: do not rebroadcast to peers sent duplicate messages WIP txHandler: do not rebroadcast to peers sent duplicate messages Jun 9, 2023
@cce
Copy link
Contributor

cce commented Jun 9, 2023

Given the performance impact would it be possible to have this behavior be optional? Like keep the original txSaltedCache with the old struct{} value type, and then a new txPeerTrackingCache type that could be optionally enabled?

data/txDupCache.go Outdated Show resolved Hide resolved
@@ -315,7 +315,7 @@ func (p *digestCachePusher) push() {
func (p *saltedCachePusher) push() {
var d [crypto.DigestSize]byte
crypto.RandBytes(d[:])
p.c.CheckAndPut(d[:]) // saltedCache hashes inside
p.c.CheckAndPut(d[:], struct{}{}) // saltedCache hashes inside
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize this before, but your benchmark is only checking the raw performance of totally unique txns, with no duplicates, which is a synthetic scenario good for performance comparison but the real workload will probably have at least a few duplicates (and peers) per digest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, updated the benchmark and posted results

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe give sync.Map a pointer to an object rather than struct{}{}? I can't imagine how it will convert that to a map key

if found {
return d, found
if _, senderFound = vals.Load(sender); senderFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to have all these checks for whether you've seen the sender already? Or can we just optimistically call Store again for that case? it seems like this would simplify the code a bit, e.g.

func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) {
    c.mu.RLock()
    d, vals, _, found = c.innerCheck(msg)
    c.mu.RUnlock()
    salt := c.curSalt
    // fast read-only path: assuming most messages are duplicates, hash msg and check cache
    if found {
        vals.Store(sender, struct{}{}) // record the sender for this txn
        return d, vals, true
    }

    // not found: acquire write lock to add this msg hash to cache
    c.mu.Lock()
    defer c.mu.Unlock()
    // salt may have changed between RUnlock() and Lock(), rehash if needed
    if salt != c.curSalt {
        d, vals, _, found = c.innerCheck(msg)
        if found {
            // already added to cache between RUnlock() and Lock(), return
            vals.Store(sender, struct{}{}) // record the sender for this txn
            return d, vals, true
        }
    } else { // not found or found in cur page
        // Do another check to see if another copy of the transaction won the race to write it to the cache
        // Only check current to save a lookup since swap is handled in the first branch
        vals, found = c.cur[*d]
        if found {
            vals.Store(sender, struct{}{}) // record the sender for this txn
            return d, vals, true
        }
    }

    if len(c.cur) >= c.maxSize {
        c.innerSwap()
        ptr := saltedPool.Get()
        defer saltedPool.Put(ptr)

        buf := ptr.([]byte)
        toBeHashed := append(buf[:0], msg...)
        toBeHashed = append(toBeHashed, c.curSalt[:]...)
        toBeHashed = toBeHashed[:len(msg)+len(c.curSalt)]

        dn := crypto.Digest(blake2b.Sum256(toBeHashed))
        d = &dn
    }

    vals = &sync.Map{}
    vals.Store(sender, struct{}{}) // record the sender for this txn
    c.cur[*d] = vals
    return d, vals, false
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll benchmark this version vs Load + Store I have

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BenchmarkDigestCaches/data.digestCacheMaker/threads=1-8         	  944719	      1435 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=1-8         	  585506	      2342 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=1-8      	  568954	      2254 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=4-8         	 1327412	       893.9 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=4-8         	  495157	      2733 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=4-8      	  627554	      2041 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=16-8        	 1287717	      1023 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=16-8        	  382932	      3128 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=16-8     	  981458	      1280 ns/op
BenchmarkDigestCaches/data.digestCacheMaker/threads=128-8       	 1307422	       988.9 ns/op
BenchmarkDigestCaches/data.saltedCacheMaker/threads=128-8       	  406440	      3055 ns/op
BenchmarkDigestCaches/data.saltedCacheDupMaker/threads=128-8    	 1475484	       787.5 ns/op

idk, kind of the same.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.

It appears to be a sharded map hashmap, and according to (1) it is better to read rather then rewrite

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was, the only reason to do the Load() before Store() is to optimize for the case where the same peer gives you same the transaction multiple times, which seems unlikely

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's definitely not the case that "multiple goroutines read, write, and overwrite entries for disjoint sets of keys" in your benchmarks.

However in practice, assuming every peer sends the same txn once, and there are 20 handlers that are randomly assigned to the txns from different peers, this is more likely to be true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync.Map.Store actually calls Load at the very beginning so in makes sense to call Store directly.
But anyway, allocation new sync.Map per new txn appears to be a main contributor to the TPS slowdown.

@algorandskiy algorandskiy dismissed stale reviews from iansuvak and algonautshant via bf3bad5 June 9, 2023 16:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants