Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send transactions directly to static p2p peers #174

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/txpool/bundlepool"
"math/big"
"runtime"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/txpool/bundlepool"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -308,6 +309,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
StaticNodes: stack.Config().P2P.StaticNodes,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Expand Down
34 changes: 32 additions & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/triedb/pathdb"
)

Expand Down Expand Up @@ -88,6 +89,7 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
StaticNodes []*enode.Node
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Expand Down Expand Up @@ -137,6 +139,8 @@ type handler struct {

handlerStartCh chan struct{}
handlerDoneCh chan struct{}

staticNodes map[string]struct{}
}

// newHandler returns a handler for all Ethereum chain management protocol.
Expand All @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) {
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
staticNodes: make(map[string]struct{}),
}
for _, node := range config.StaticNodes {
h.staticNodes[node.ID().String()] = struct{}{}
}

if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
// block is ahead, so snap sync was enabled for this node at a certain point.
Expand Down Expand Up @@ -642,21 +651,42 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
peers := h.peers.peersWithoutTransaction(tx.Hash())

var numDirect int
var direct, announce []*ethPeer = nil, peers
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
numDirect = int(math.Sqrt(float64(len(peers))))
// Split the peers into direct-peers and announce-peers
// we send the tx directly to direct-peers; all static nodes are direct-peers
// we announce the tx to announce-peers
direct = make([]*ethPeer, 0, numDirect)
announce = make([]*ethPeer, 0, len(peers)-numDirect)
for _, peer := range peers {
if _, ok := h.staticNodes[peer.ID()]; ok {
direct = append(direct, peer)
} else {
announce = append(announce, peer)
}
}

// if directly-peers are not enough, move some announce-peers into directly pool
for len(direct) < numDirect && len(announce) > 0 {
// shift one peer to trusted
direct = append(direct, announce[0])
announce = announce[1:]
}
}

// Send the tx unconditionally to a subset of our peers
for _, peer := range peers[:numDirect] {
for _, peer := range direct {
txset[peer] = append(txset[peer], tx.Hash())
log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash())
}
// For the remaining peers, send announcement only
for _, peer := range peers[numDirect:] {
for _, peer := range announce {
annos[peer] = append(annos[peer], tx.Hash())
log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash())
}
Expand Down
Loading