Skip to content

Commit

Permalink
Integrate stake weighted gossip selection (#511)
Browse files Browse the repository at this point in the history
* Integrate stake weighted gossip selection

* nit

* merged

* add validity check
  • Loading branch information
StephenButtolph authored and ceyonur committed Mar 18, 2024
1 parent 6fefaaf commit 0e0870f
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 14 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ go 1.21

require (
github.com/VictoriaMetrics/fastcache v1.10.0
<<<<<<< HEAD
github.com/ava-labs/avalanchego v1.11.2
=======
github.com/ava-labs/avalanchego v1.11.3-stake-weighted-gossip.2
>>>>>>> 16cf2556ea (Integrate stake weighted gossip selection (#511))
github.com/cespare/cp v0.1.0
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set/v2 v2.1.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
<<<<<<< HEAD
github.com/ava-labs/avalanchego v1.11.2 h1:8iodZ+RjqpRwHdiXPPtvaNt72qravge7voGzw3yPRzg=
github.com/ava-labs/avalanchego v1.11.2/go.mod h1:oTVnF9idL57J4LM/6RByTmKhI4QvV6OCnF99ysyBljE=
github.com/ava-labs/coreth v0.13.2-0.20240304213436-8afbf2d68461 h1:SIwGF3eVEwmexLm7is/MvG7W5sbmpGXaUT6RfUPP3jw=
github.com/ava-labs/coreth v0.13.2-0.20240304213436-8afbf2d68461/go.mod h1:v24MTMbxFSvyM7YeQFyWiXjIzVo2+UVs7tgH7xrByew=
=======
github.com/ava-labs/avalanchego v1.11.3-stake-weighted-gossip.2 h1:27eTGL+BfWj/QyeWA56rZazRFgO3Ydhq2sOuUjV3geY=
github.com/ava-labs/avalanchego v1.11.3-stake-weighted-gossip.2/go.mod h1:LZlP3XIzML5IPwxs0jJdMDhTvTkNCakLVsPRHQYm0hI=
>>>>>>> 16cf2556ea (Integrate stake weighted gossip selection (#511))
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
10 changes: 3 additions & 7 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ type testAppSender struct {
sendCrossChainAppResponseFn func(ids.ID, uint32, []byte) error
sendAppRequestFn func(context.Context, set.Set[ids.NodeID], uint32, []byte) error
sendAppResponseFn func(ids.NodeID, uint32, []byte) error
sendAppGossipFn func([]byte, int, int, int) error
sendAppGossipFn func(common.SendConfig, []byte) error
}

func (t testAppSender) SendCrossChainAppRequest(_ context.Context, chainID ids.ID, requestID uint32, appRequestBytes []byte) error {
Expand All @@ -882,10 +882,6 @@ func (t testAppSender) SendCrossChainAppResponse(_ context.Context, chainID ids.
return t.sendCrossChainAppResponseFn(chainID, requestID, appResponseBytes)
}

func (t testAppSender) SendAppGossipSpecific(context.Context, set.Set[ids.NodeID], []byte) error {
panic("not implemented")
}

func (t testAppSender) SendAppRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, message []byte) error {
return t.sendAppRequestFn(ctx, nodeIDs, requestID, message)
}
Expand All @@ -894,8 +890,8 @@ func (t testAppSender) SendAppResponse(_ context.Context, nodeID ids.NodeID, req
return t.sendAppResponseFn(nodeID, requestID, message)
}

func (t testAppSender) SendAppGossip(_ context.Context, message []byte, numValidators int, numNonValidators int, numPeers int) error {
return t.sendAppGossipFn(message, numValidators, numNonValidators, numPeers)
func (t testAppSender) SendAppGossip(_ context.Context, config common.SendConfig, message []byte) error {
return t.sendAppGossipFn(config, message)
}

func (t testAppSender) SendAppError(ctx context.Context, nodeID ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error {
Expand Down
21 changes: 21 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
defaultMaxBlocksPerRequest = 0 // Default to no maximum on the number of blocks per getLogs request
defaultContinuousProfilerFrequency = 15 * time.Minute
defaultContinuousProfilerMaxFiles = 5
defaultPushGossipPercentStake = .9
defaultPushGossipNumValidators = 100
defaultPushGossipNumPeers = 0
defaultPushRegossipNumValidators = 10
Expand Down Expand Up @@ -154,6 +155,7 @@ type Config struct {
KeystoreInsecureUnlockAllowed bool `json:"keystore-insecure-unlock-allowed"`

// Gossip Settings
<<<<<<< HEAD
PushGossipNumValidators int `json:"push-gossip-num-validators"`
PushGossipNumPeers int `json:"push-gossip-num-peers"`
PushRegossipNumValidators int `json:"push-regossip-num-validators"`
Expand All @@ -162,6 +164,17 @@ type Config struct {
PullGossipFrequency Duration `json:"pull-gossip-frequency"`
RegossipFrequency Duration `json:"regossip-frequency"`
PriorityRegossipAddresses []common.Address `json:"priority-regossip-addresses"`
=======
PushGossipPercentStake float64 `json:"push-gossip-percent-stake"`
PushGossipNumValidators int `json:"push-gossip-num-validators"`
PushGossipNumPeers int `json:"push-gossip-num-peers"`
PushRegossipNumValidators int `json:"push-regossip-num-validators"`
PushRegossipNumPeers int `json:"push-regossip-num-peers"`
PushGossipFrequency Duration `json:"push-gossip-frequency"`
PullGossipFrequency Duration `json:"pull-gossip-frequency"`
RegossipFrequency Duration `json:"regossip-frequency"`
TxRegossipFrequency Duration `json:"tx-regossip-frequency"` // Deprecated: use RegossipFrequency instead
>>>>>>> 16cf2556ea (Integrate stake weighted gossip selection (#511))

// Log
LogLevel string `json:"log-level"`
Expand Down Expand Up @@ -260,6 +273,11 @@ func (c *Config) SetDefaults() {
c.AcceptorQueueLimit = defaultAcceptorQueueLimit
c.CommitInterval = defaultCommitInterval
c.SnapshotWait = defaultSnapshotWait
<<<<<<< HEAD
=======
c.RegossipFrequency.Duration = defaultTxRegossipFrequency
c.PushGossipPercentStake = defaultPushGossipPercentStake
>>>>>>> 16cf2556ea (Integrate stake weighted gossip selection (#511))
c.PushGossipNumValidators = defaultPushGossipNumValidators
c.PushGossipNumPeers = defaultPushGossipNumPeers
c.PushRegossipNumValidators = defaultPushRegossipNumValidators
Expand Down Expand Up @@ -317,5 +335,8 @@ func (c *Config) Validate() error {
return fmt.Errorf("cannot use commit interval of 0 with pruning enabled")
}

if c.PushGossipPercentStake < 0 || c.PushGossipPercentStake > 1 {
return fmt.Errorf("push-gossip-percent-stake is %f but must be in the range [0, 1]", c.PushGossipPercentStake)
}
return nil
}
197 changes: 197 additions & 0 deletions plugin/evm/gossiper_atomic_gossiping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// (c) 2019-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"context"
"os"
"sync"
"testing"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/stretchr/testify/assert"

commonEng "github.com/ava-labs/avalanchego/snow/engine/common"

"github.com/ava-labs/coreth/plugin/evm/message"
)

// show that a txID discovered from gossip is requested to the same node only if
// the txID is unknown
func TestMempoolAtmTxsAppGossipHandling(t *testing.T) {
assert := assert.New(t)

_, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "")
defer func() {
assert.NoError(vm.Shutdown(context.Background()))
}()

nodeID := ids.GenerateTestNodeID()

var (
txGossiped int
txGossipedLock sync.Mutex
txRequested bool
)
sender.CantSendAppGossip = false
sender.SendAppGossipF = func(context.Context, commonEng.SendConfig, []byte) error {
txGossipedLock.Lock()
defer txGossipedLock.Unlock()

txGossiped++
return nil
}
sender.SendAppRequestF = func(context.Context, set.Set[ids.NodeID], uint32, []byte) error {
txRequested = true
return nil
}

// Create conflicting transactions
importTxs := createImportTxOptions(t, vm, sharedMemory)
tx, conflictingTx := importTxs[0], importTxs[1]

// gossip tx and check it is accepted and gossiped
msg := message.AtomicTxGossip{
Tx: tx.SignedBytes(),
}
msgBytes, err := message.BuildGossipMessage(vm.networkCodec, msg)
assert.NoError(err)

vm.ctx.Lock.Unlock()

// show that no txID is requested
assert.NoError(vm.AppGossip(context.Background(), nodeID, msgBytes))
time.Sleep(500 * time.Millisecond)

vm.ctx.Lock.Lock()

assert.False(txRequested, "tx should not have been requested")
txGossipedLock.Lock()
assert.Equal(0, txGossiped, "tx should not have been gossiped")
txGossipedLock.Unlock()
assert.True(vm.mempool.has(tx.ID()))

vm.ctx.Lock.Unlock()

// show that tx is not re-gossiped
assert.NoError(vm.AppGossip(context.Background(), nodeID, msgBytes))

vm.ctx.Lock.Lock()

txGossipedLock.Lock()
assert.Equal(0, txGossiped, "tx should not have been gossiped")
txGossipedLock.Unlock()

// show that conflicting tx is not added to mempool
msg = message.AtomicTxGossip{
Tx: conflictingTx.SignedBytes(),
}
msgBytes, err = message.BuildGossipMessage(vm.networkCodec, msg)
assert.NoError(err)

vm.ctx.Lock.Unlock()

assert.NoError(vm.AppGossip(context.Background(), nodeID, msgBytes))

vm.ctx.Lock.Lock()

assert.False(txRequested, "tx should not have been requested")
txGossipedLock.Lock()
assert.Equal(0, txGossiped, "tx should not have been gossiped")
txGossipedLock.Unlock()
assert.False(vm.mempool.has(conflictingTx.ID()), "conflicting tx should not be in the atomic mempool")
}

// show that txs already marked as invalid are not re-requested on gossiping
func TestMempoolAtmTxsAppGossipHandlingDiscardedTx(t *testing.T) {
if os.Getenv("RUN_FLAKY_TESTS") != "true" {
t.Skip("FLAKY")
}
assert := assert.New(t)

_, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "")
defer func() {
assert.NoError(vm.Shutdown(context.Background()))
}()
mempool := vm.mempool

var (
txGossiped int
txGossipedLock sync.Mutex
txRequested bool
)
sender.CantSendAppGossip = false
sender.SendAppGossipF = func(context.Context, commonEng.SendConfig, []byte) error {
txGossipedLock.Lock()
defer txGossipedLock.Unlock()

txGossiped++
return nil
}
sender.SendAppRequestF = func(context.Context, set.Set[ids.NodeID], uint32, []byte) error {
txRequested = true
return nil
}

// Create a transaction and mark it as invalid by discarding it
importTxs := createImportTxOptions(t, vm, sharedMemory)
tx, conflictingTx := importTxs[0], importTxs[1]
txID := tx.ID()

mempool.AddTx(tx)
mempool.NextTx()
mempool.DiscardCurrentTx(txID)

// Check the mempool does not contain the discarded transaction
assert.False(mempool.has(txID))

// Gossip the transaction to the VM and ensure that it is not added to the mempool
// and is not re-gossipped.
nodeID := ids.GenerateTestNodeID()
msg := message.AtomicTxGossip{
Tx: tx.SignedBytes(),
}
msgBytes, err := message.BuildGossipMessage(vm.networkCodec, msg)
assert.NoError(err)

vm.ctx.Lock.Unlock()

assert.NoError(vm.AppGossip(context.Background(), nodeID, msgBytes))

vm.ctx.Lock.Lock()

assert.False(txRequested, "tx shouldn't be requested")
txGossipedLock.Lock()
assert.Zero(txGossiped, "tx should not have been gossiped")
txGossipedLock.Unlock()

assert.False(mempool.has(txID))

// Gossip the transaction that conflicts with the originally
// discarded tx and ensure it is accepted into the mempool and gossipped
// to the network.
nodeID = ids.GenerateTestNodeID()
msg = message.AtomicTxGossip{
Tx: conflictingTx.SignedBytes(),
}
msgBytes, err = message.BuildGossipMessage(vm.networkCodec, msg)
assert.NoError(err)

vm.ctx.Lock.Unlock()

assert.NoError(vm.AppGossip(context.Background(), nodeID, msgBytes))
time.Sleep(500 * time.Millisecond)

vm.ctx.Lock.Lock()

assert.False(txRequested, "tx shouldn't be requested")
txGossipedLock.Lock()
assert.Equal(1, txGossiped, "conflicting tx should have been gossiped")
txGossipedLock.Unlock()

assert.False(mempool.has(txID))
assert.True(mempool.has(conflictingTx.ID()))
}
4 changes: 3 additions & 1 deletion plugin/evm/gossiper_eth_gossiping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/set"

commonEng "github.com/ava-labs/avalanchego/snow/engine/common"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -105,7 +107,7 @@ func TestMempoolEthTxsAppGossipHandling(t *testing.T) {
return nil
}
wg.Add(1)
sender.SendAppGossipF = func(context.Context, []byte, int, int, int) error {
sender.SendAppGossipF = func(context.Context, commonEng.SendConfig, []byte) error {
wg.Done()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/evm/syncervm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) {

syncDisabledVM := &VM{}
appSender := &commonEng.SenderTest{T: t}
appSender.SendAppGossipF = func(context.Context, []byte, int, int, int) error { return nil }
appSender.SendAppGossipF = func(context.Context, commonEng.SendConfig, []byte) error { return nil }
appSender.SendAppRequestF = func(ctx context.Context, nodeSet set.Set[ids.NodeID], requestID uint32, request []byte) error {
nodeID, hasItem := nodeSet.Pop()
if !hasItem {
Expand Down
Loading

0 comments on commit 0e0870f

Please sign in to comment.