From 782dd2f920db9baaa185960a286ac2043d2595e3 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 6 Aug 2024 17:53:22 +0900 Subject: [PATCH 01/11] peer: make peer meet query.Peer interface query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface. --- peer/peer.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 195fc0b4fe..bffc9f5b13 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -494,6 +494,10 @@ type Peer struct { queueQuit chan struct{} outQuit chan struct{} quit chan struct{} + + // subscribers is a channel for relaying all messages that were received + // to this peer. + subscribers []chan wire.Message } // String returns the peer's address and directionality as a human-readable @@ -1098,6 +1102,24 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, return msg, buf, nil } +// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin +// messages received from this peer will be sent on the returned +// channel. A closure is also returned, that should be called to cancel +// the subscription. +func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { + msgChan := make(chan wire.Message, 1) + p.subscribers = append(p.subscribers, msgChan) + + // Cancellation is just removing the channel from the subscribers list. + idx := len(p.subscribers) - 1 + cancel := func() { + p.subscribers = append(p.subscribers[:idx], + p.subscribers[idx+1:]...) + } + + return msgChan, cancel +} + // writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. @@ -1402,6 +1424,10 @@ out: // needed. rmsg, buf, err := p.readMessage(p.wireEncoding) idleTimer.Stop() + // Send the received message to all the subscribers. + for _, sub := range p.subscribers { + sub <- rmsg + } if err != nil { // In order to allow regression tests with malformed messages, don't // disconnect the peer when we're in regression test mode and the @@ -1446,6 +1472,7 @@ out: } break out } + atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} @@ -1961,6 +1988,12 @@ func (p *Peer) Disconnect() { close(p.quit) } +// OnDisconnect returns a channel that will be closed when this peer is +// disconnected. +func (p *Peer) OnDisconnect() <-chan struct{} { + return p.quit +} + // readRemoteVersionMsg waits for the next message to arrive from the remote // peer. If the next message is not a version message or the version is not // acceptable then return an error. From 510083d7f80fdd89db9c6f2f2e2c1fded2a06c64 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 6 Aug 2024 17:56:33 +0900 Subject: [PATCH 02/11] main: add ConnectedPeers() to server ConnectedPeers returns all the currently connected peers. This is used to provide the query.WorkManager with all the currently connected peers from the netsync package. --- server.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/server.go b/server.go index 66794e4bb7..52d3459023 100644 --- a/server.go +++ b/server.go @@ -1982,6 +1982,34 @@ type removeNodeMsg struct { reply chan error } +// ConnectedPeers returns an array consisting of all connected peers. 
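+// The peer list is gathered by sending a getPeersMsg over the server's query
+// channel, so the underlying peer state is only ever touched by the peer
+// handler goroutine; nil is returned if the server is shutting down.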
+func (s *server) ConnectedPeers() []*peer.Peer { + replyChan := make(chan []*serverPeer, 1) + + // Send a query for a subscription for the connected peers. + select { + case s.query <- getPeersMsg{ + reply: replyChan, + }: + + case <-s.quit: + return nil + } + + // Wait for the result here. + select { + case reply := <-replyChan: + peers := make([]*peer.Peer, 0, len(reply)) + for _, sp := range reply { + peers = append(peers, sp.Peer) + } + + return peers + case <-s.quit: + return nil + } +} + // handleQuery is the central handler for all queries and commands from other // goroutines related to peer state. func (s *server) handleQuery(state *peerState, querymsg interface{}) { From eebcd60c66a090ea1d083c8956fb52974508600c Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 15:20:24 +0900 Subject: [PATCH 03/11] netsync: add checkpointedBlocksQuery checkpointedBlocksQuery is a helper to create []*query.Request which can be passed off to query.Workmanager to query for wire.Messages to multiple peers. This is useful for downloading blocks out of order from multiple peers during ibd. --- go.mod | 1 + go.sum | 2 ++ netsync/manager.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/go.mod b/go.mod index 1f445d9065..3bd36f8652 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/jessevdk/go-flags v1.4.0 github.com/jrick/logrotate v1.0.0 + github.com/lightninglabs/neutrino v0.16.0 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 golang.org/x/crypto v0.22.0 diff --git a/go.sum b/go.sum index bb666c89de..3f3b346498 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0= +github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86ace..58515a19fd 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/mempool" peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" ) const ( @@ -173,6 +174,74 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) { m[hash] = struct{}{} } +// checkpointedBlocksQuery is a helper to construct query.Requests for GetData +// messages. +type checkpointedBlocksQuery struct { + msg *wire.MsgGetData + blocks map[chainhash.Hash]struct{} +} + +// newCheckpointedBlocksQuery returns an initialized newCheckpointedBlocksQuery. 
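+// The requests built by its requests() method are meant to be handed to a
+// query.WorkManager, which dispatches them to peers and feeds the replies
+// back through handleResponse.
+//
+// A rough sketch of the intended flow (workMgr, getDataMsg and quit are
+// assumed to exist in the caller; this mirrors the later queueFetchManager
+// helper):
+//
+//	q := newCheckpointedBlocksQuery(getDataMsg)
+//	errChan := workMgr.Query(
+//		q.requests(), query.Cancel(quit), query.NoRetryMax(),
+//	)
+//	// The query result, if any, is delivered on errChan.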
+func newCheckpointedBlocksQuery(msg *wire.MsgGetData) checkpointedBlocksQuery { + m := make(map[chainhash.Hash]struct{}, len(msg.InvList)) + for _, inv := range msg.InvList { + m[inv.Hash] = struct{}{} + } + return checkpointedBlocksQuery{msg, m} +} + +// handleResponse returns that the progress is progressed and finished if the +// received wire.Message is a MsgBlock. +func (c *checkpointedBlocksQuery) handleResponse(req, resp wire.Message, + peerAddr string) query.Progress { + + block, ok := resp.(*wire.MsgBlock) + if !ok { + // We are only looking for block messages. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + // If we didn't find this block in the map of blocks we're expecting, + // we're neither finished nor progressed. + hash := block.BlockHash() + _, found := c.blocks[hash] + if !found { + return query.Progress{ + Finished: false, + Progressed: false, + } + } + delete(c.blocks, hash) + + // If we have blocks we're expecting, we've progressed but not finished. + if len(c.blocks) > 0 { + return query.Progress{ + Finished: false, + Progressed: true, + } + } + + // We have no more blocks we're expecting from heres so we're finished. + return query.Progress{ + Finished: true, + Progressed: true, + } +} + +// requests returns a slice of query.Request that can be queued to +// query.WorkManager. +func (c *checkpointedBlocksQuery) requests() []*query.Request { + req := &query.Request{ + Req: c.msg, + HandleResp: c.handleResponse, + } + + return []*query.Request{req} +} + // SyncManager is used to communicate block related messages with peers. The // SyncManager is started as by executing Start() in a goroutine. Once started, // it selects peers to sync from and starts the initial block download. Once the From e3e450f819530cf422507dc1d2f2bc3f7bbef560 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:11:52 +0900 Subject: [PATCH 04/11] netsync: refactor handleBlockMsg handleBlockMsg used to check that the block header is both valid and then process the blocks as they come in. It's now refactored so that it also handles blocks that are not in order. For out of order block downloads handleBlockMsg would mark the block as an orphan but it's now refactored to handle those cases. Whenever a block that's not the next from the chain tip is received, it's now temporarily stored in memory until the next block from the chain tip is received. And then all the blocks that are in sequence are processed. --- netsync/manager.go | 209 +++++++++++++++++++++++++++++++++------------ 1 file changed, 156 insertions(+), 53 deletions(-) diff --git a/netsync/manager.go b/netsync/manager.go index 58515a19fd..413732eb28 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -268,10 +268,12 @@ type SyncManager struct { lastProgressTime time.Time // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint + headersFirstMode bool + headerList *list.List + startHeader *list.Element + nextCheckpoint *chaincfg.Checkpoint + queuedBlocks map[chainhash.Hash]*blockMsg + queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -754,30 +756,10 @@ func (sm *SyncManager) current() bool { return true } -// handleBlockMsg handles block messages from all peers. -func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { +// processBlock checks if the block connects to the best chain. 
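+// It performs the headers-first bookkeeping, runs the block through the
+// chain's ProcessBlock and returns whether the block matched the next
+// checkpoint together with any processing error; request-map bookkeeping is
+// left to the callers.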
+func (sm *SyncManager) processBlock(bmsg *blockMsg) (bool, error) { peer := bmsg.peer - state, exists := sm.peerStates[peer] - if !exists { - log.Warnf("Received block message from unknown peer %s", peer) - return - } - - // If we didn't ask for this block then the peer is misbehaving. blockHash := bmsg.block.Hash() - if _, exists = state.requestedBlocks[*blockHash]; !exists { - // The regression test intentionally sends some blocks twice - // to test duplicate block insertion fails. Don't disconnect - // the peer or ignore the block when we're in regression test - // mode in this case so the chain code is actually fed the - // duplicate blocks. - if sm.chainParams != &chaincfg.RegressionNetParams { - log.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, peer.Addr()) - peer.Disconnect() - return - } - } // When in headers-first mode, if the block matches the hash of the // first header in the list of headers that are being fetched, it's @@ -803,12 +785,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. - delete(state.requestedBlocks, *blockHash) - delete(sm.requestedBlocks, *blockHash) - // Process the block to include validation, best chain selection, orphan // handling, etc. _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) @@ -833,7 +809,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // send it. code, reason := mempool.ErrToRejectErr(err) peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) - return + return false, err } // Meta-data about the new block this peer is reporting. We use this @@ -909,22 +885,143 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { } } + return isCheckpointBlock, nil +} + +// handleBlockMsg handles block messages from all peers. +func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { + peer := bmsg.peer + state, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) + return + } + + // If we didn't ask for this block then the peer is misbehaving. + blockHash := bmsg.block.Hash() + if _, exists := state.requestedBlocks[*blockHash]; !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Warnf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer -- disconnecting", + blockHash, peer.Addr()) + peer.Disconnect() + return + } + } + + // Remove block from request maps. Either chain will know about it and + // so we shouldn't have any more instances of trying to fetch it or we + // will fail the insert and thus we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + delete(state.requestedBlocks, *blockHash) + + _, err := sm.processBlock(bmsg) + if err != nil { + return + } + // If we are not in headers first mode, it's a good time to periodically // flush the blockchain cache because we don't expect new blocks immediately. // After that, there is nothing more to do. 
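+	// Note: handleBlockMsg is only invoked when the sync manager is not in
+	// headers-first mode; during headers-first download the block handler
+	// routes blocks to handleBlockMsgInHeadersFirst instead, so the periodic
+	// flush below always applies here.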
- if !sm.headersFirstMode { - if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { - log.Errorf("Error while flushing the blockchain cache: %v", err) - } + if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil { + log.Errorf("Error while flushing the blockchain cache: %v", err) + } +} + +// handleBlockMsgInHeadersFirst handles block messages from all peers when the +// sync manager is in headers first mode. For blocks received out of order, it +// first keeps them in memory and sends them to be processed when the next block +// from the tip is available. +func (sm *SyncManager) handleBlockMsgInHeadersFirst(bmsg *blockMsg) { + blockHash := bmsg.block.Hash() + peer := bmsg.peer + _, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received block message from unknown peer %s", peer) return } + _, exists = sm.requestedBlocks[*blockHash] + if !exists { + // The regression test intentionally sends some blocks twice + // to test duplicate block insertion fails. Don't disconnect + // the peer or ignore the block when we're in regression test + // mode in this case so the chain code is actually fed the + // duplicate blocks. + if sm.chainParams != &chaincfg.RegressionNetParams { + log.Warnf("Got unrequested block %v from %s."+ + "this peer may be a stalling peer", + blockHash, peer.Addr()) + //peer.Disconnect() + return + } + } + + // Add the block to the queue. + sm.queuedBlocks[*blockHash] = bmsg + sm.queuedBlocksPrevHash[bmsg.block.MsgBlock().Header.PrevBlock] = *blockHash + + // Remove block from the request map. Either chain will know about it + // and so we shouldn't have any more instances of trying to fetch it, we + // keep it in the queued blocks map, or we will fail the insert and thus + // we'll retry next time we get an inv. + delete(sm.requestedBlocks, *blockHash) + + // Since we may receive blocks out of order, attempt to find the next block + // and any other descendent blocks that connect to it. + processBlocks := make([]*blockMsg, 0, 1024) + + bestHash := sm.chain.BestSnapshot().Hash + for len(sm.queuedBlocks) > 0 { + hash, found := sm.queuedBlocksPrevHash[bestHash] + if !found { + break + } + + b, found := sm.queuedBlocks[hash] + if !found { + // Break when we're missing the next block in + // sequence. + break + } + + // Append the block to be processed and delete from the + // queue. + delete(sm.queuedBlocks, hash) + delete(sm.queuedBlocksPrevHash, bestHash) + processBlocks = append(processBlocks, b) + bestHash = hash + } + + var isCheckpointBlock bool + if len(processBlocks) > 0 { + for _, blockMsg := range processBlocks { + var err error + isCheckpointBlock, err = sm.processBlock(blockMsg) + if err != nil { + return + } + + // This is headers-first mode, so if the block is not a + // checkpoint, request more blocks using the header list + // when the request queue is getting short. + if !isCheckpointBlock && sm.startHeader != nil { + sm.fetchHeaderBlocks() + } + } + } + // This is headers-first mode, so if the block is not a checkpoint // request more blocks using the header list when the request queue is // getting short. 
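+	// The in-flight check below uses the manager-wide requestedBlocks map
+	// rather than the per-peer state, since blocks requested through the
+	// fetch manager may be served by any connected peer.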
if !isCheckpointBlock { if sm.startHeader != nil && - len(state.requestedBlocks) < minInFlightBlocks { + len(sm.requestedBlocks) < minInFlightBlocks { sm.fetchHeaderBlocks() } return @@ -958,7 +1055,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { sm.headerList.Init() log.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = peer.PushGetBlocksMsg(locator, &zeroHash) + err := peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { log.Warnf("Failed to send getblocks message to peer %s: %v", peer.Addr(), err) @@ -1431,7 +1528,11 @@ out: msg.reply <- struct{}{} case *blockMsg: - sm.handleBlockMsg(msg) + if sm.headersFirstMode { + sm.handleBlockMsgInHeadersFirst(msg) + } else { + sm.handleBlockMsg(msg) + } msg.reply <- struct{}{} case *invMsg: @@ -1739,19 +1840,21 @@ func (sm *SyncManager) Pause() chan<- struct{} { // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { sm := SyncManager{ - peerNotifier: config.PeerNotifier, - chain: config.Chain, - txMemPool: config.TxMemPool, - chainParams: config.ChainParams, - rejectedTxns: make(map[chainhash.Hash]struct{}), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: newBlockProgressLogger("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), - quit: make(chan struct{}), - feeEstimator: config.FeeEstimator, + peerNotifier: config.PeerNotifier, + chain: config.Chain, + txMemPool: config.TxMemPool, + chainParams: config.ChainParams, + rejectedTxns: make(map[chainhash.Hash]struct{}), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + peerStates: make(map[*peerpkg.Peer]*peerSyncState), + progressLogger: newBlockProgressLogger("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + headerList: list.New(), + quit: make(chan struct{}), + queuedBlocks: make(map[chainhash.Hash]*blockMsg), + queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash), + feeEstimator: config.FeeEstimator, } best := sm.chain.BestSnapshot() From 28d9f2bd689a0ac97bf2f7c683295f78ad25cf46 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:27:07 +0900 Subject: [PATCH 05/11] netsync: add peerSubscription peerSubscription is added to Manager which will allow it subscribers to receive peers through the channel whenever the Manager is aware of a new peer that it's been connected to. This is useful to alert query.Workmanager that a new peer that's been connected to is eligible to download blocks from. --- netsync/manager.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/netsync/manager.go b/netsync/manager.go index 413732eb28..0e8b98c22d 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -274,6 +274,7 @@ type SyncManager struct { nextCheckpoint *chaincfg.Checkpoint queuedBlocks map[chainhash.Hash]*blockMsg queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash + peerSubscribers []*peerSubscription // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -523,6 +524,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { return true } +// notifyPeerSubscribers notifies all the current peer subscribers of the peer +// that was passed in. 
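+// Subscriptions whose cancel channel has been closed are pruned from the
+// list as a side effect; every remaining subscriber receives the peer via a
+// blocking send on its channel.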
+func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) { + // Loop for alerting subscribers to the new peer that was connected to. + n := 0 + for i, sub := range sm.peerSubscribers { + select { + // Quickly check whether this subscription has been canceled. + case <-sub.cancel: + // Avoid GC leak. + sm.peerSubscribers[i] = nil + continue + default: + } + + // Keep non-canceled subscribers around. + sm.peerSubscribers[n] = sub + n++ + + sub.peers <- peer + } + // Re-align the slice to only active subscribers. + sm.peerSubscribers = sm.peerSubscribers[:n] +} + // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. @@ -542,6 +568,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { requestedBlocks: make(map[chainhash.Hash]struct{}), } + // Only pass the peer off to the subscribers if we're able to sync off of + // the peer. + bestHeight := sm.chain.BestSnapshot().Height + if isSyncCandidate && peer.LastBlock() > bestHeight { + sm.notifyPeerSubscribers(peer) + } + // Start syncing by choosing the best candidate if needed. if isSyncCandidate && sm.syncPeer == nil { sm.startSync() @@ -1836,6 +1869,13 @@ func (sm *SyncManager) Pause() chan<- struct{} { return c } +// peerSubscription holds a peer subscription which we'll notify about any +// connected peers. +type peerSubscription struct { + peers chan<- query.Peer + cancel <-chan struct{} +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { From ee0f49d0ce15716aafc154f6f542ea5b8ce72149 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:35:40 +0900 Subject: [PATCH 06/11] netsync, main: add ConnectedPeers to Manager ConnectedPeers returns all the currently connected peers and any new peer that's additionally connected through the returned channel. This method is required for query.Workmanager as it needs ot receive peers that it can request blocks from. --- netsync/interface.go | 3 +++ netsync/manager.go | 25 +++++++++++++++++++++++++ server.go | 1 + 3 files changed, 29 insertions(+) diff --git a/netsync/interface.go b/netsync/interface.go index 6a873bd888..cd848adb1a 100644 --- a/netsync/interface.go +++ b/netsync/interface.go @@ -38,4 +38,7 @@ type Config struct { MaxPeers int FeeEstimator *mempool.FeeEstimator + + // ConnectedPeers returns all the currently connected peers. + ConnectedPeers func() []*peer.Peer } diff --git a/netsync/manager.go b/netsync/manager.go index 0e8b98c22d..eee9668cee 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -275,6 +275,7 @@ type SyncManager struct { queuedBlocks map[chainhash.Hash]*blockMsg queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash peerSubscribers []*peerSubscription + connectedPeers func() []*peerpkg.Peer // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -1876,6 +1877,29 @@ type peerSubscription struct { cancel <-chan struct{} } +// ConnectedPeers returns all the currently connected peers to the channel +// and then any additional new peers on connect. 
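+//
+// A minimal usage sketch (the caller, its quit channel and error handling
+// are hypothetical and shown for illustration only):
+//
+//	peerChan, cancel, err := sm.ConnectedPeers()
+//	if err != nil {
+//		return err
+//	}
+//	defer cancel()
+//	for {
+//		select {
+//		case p := <-peerChan:
+//			_ = p // hand the peer off to a worker
+//		case <-quit:
+//			return nil
+//		}
+//	}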
+func (sm *SyncManager) ConnectedPeers() (<-chan query.Peer, func(), error) { + peers := sm.connectedPeers() + peerChan := make(chan query.Peer, len(peers)) + + for _, peer := range peers { + if sm.isSyncCandidate(peer) { + peerChan <- peer + } + } + + cancelChan := make(chan struct{}) + sm.peerSubscribers = append(sm.peerSubscribers, &peerSubscription{ + peers: peerChan, + cancel: cancelChan, + }) + + return peerChan, func() { + close(cancelChan) + }, nil +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { @@ -1895,6 +1919,7 @@ func New(config *Config) (*SyncManager, error) { queuedBlocks: make(map[chainhash.Hash]*blockMsg), queuedBlocksPrevHash: make(map[chainhash.Hash]chainhash.Hash), feeEstimator: config.FeeEstimator, + connectedPeers: config.ConnectedPeers, } best := sm.chain.BestSnapshot() diff --git a/server.go b/server.go index 52d3459023..66bebc63c9 100644 --- a/server.go +++ b/server.go @@ -2940,6 +2940,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, DisableCheckpoints: cfg.DisableCheckpoints, MaxPeers: cfg.MaxPeers, FeeEstimator: s.feeEstimator, + ConnectedPeers: s.ConnectedPeers, }) if err != nil { return nil, err From 159cd2d99e1e2689a8fefcbeeb72fc1492c67144 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:37:39 +0900 Subject: [PATCH 07/11] netsync: add fetchManager and request blocks from it The blocks that were requested from headers are now sent over to query.Workmanager where it will rank peers based on their speed and request blocks from them accordingly. This allows for quicker block downloads as: 1: Workmanager will prioritize faster peers. 2: Workmanager is able to ask from multiple peers. --- netsync/manager.go | 106 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 11 deletions(-) diff --git a/netsync/manager.go b/netsync/manager.go index eee9668cee..de615598a0 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -29,6 +29,15 @@ const ( // more. minInFlightBlocks = 10 + // maxInFlightBlocks is the maximum number of blocks per peer that + // should be in the request queue for headers-first mode before + // requesting more. + maxInFlightBlocksPerPeer = 16 + + // blockDownloadWindow is the maximum number of blocks that are allowed + // to be out of sync. + blockDownloadWindow = 1024 + // maxRejectedTxns is the maximum number of rejected transactions // hashes to store in memory. maxRejectedTxns = 1000 @@ -276,6 +285,7 @@ type SyncManager struct { queuedBlocksPrevHash map[chainhash.Hash]chainhash.Hash peerSubscribers []*peerSubscription connectedPeers func() []*peerpkg.Peer + fetchManager query.WorkManager // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -1106,11 +1116,30 @@ func (sm *SyncManager) fetchHeaderBlocks() { return } + queueFetch := func(msg *wire.MsgGetData, quit chan struct{}) { + if msg == nil || len(msg.InvList) == 0 { + return + } + + // Keep fetching until we don't have an error. + err := sm.queueFetchManager(msg) + if err != nil { + Loop: + for err != nil { + select { + case <-quit: + break Loop + default: + } + err = sm.queueFetchManager(msg) + } + } + } + // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. 
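+	// Each block now gets its own getdata message, and the messages are
+	// queued to the fetch manager so the downloads can be spread across
+	// multiple peers instead of only the sync peer.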
- gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) - numRequested := 0 + msgs := make([]*wire.MsgGetData, 0, blockDownloadWindow/maxInFlightBlocksPerPeer) for e := sm.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { @@ -1118,18 +1147,40 @@ func (sm *SyncManager) fetchHeaderBlocks() { continue } + // Prevent the blocks from being too out of order. + if node.height-blockDownloadWindow > sm.chain.BestSnapshot().Height { + break + } + + // Check if the block is already requested. If it is just move + // to the next block. + _, requested := sm.requestedBlocks[*node.hash] + if requested { + sm.startHeader = e.Next() + continue + } + + // Check if the block is already queued. If it is just move to + // the next block. + _, queued := sm.queuedBlocks[*node.hash] + if queued { + sm.startHeader = e.Next() + continue + } + + // Check if we already have the block. iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) haveInv, err := sm.haveInventory(iv) if err != nil { + // If we error out, fetch the block anyways. log.Warnf("Unexpected failure when checking for "+ "existing inventory during header block "+ "fetch: %v", err) } - if !haveInv { - syncPeerState := sm.peerStates[sm.syncPeer] + // We don't have this block so include it in the invs. + if !haveInv { sm.requestedBlocks[*node.hash] = struct{}{} - syncPeerState.requestedBlocks[*node.hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the @@ -1138,16 +1189,16 @@ func (sm *SyncManager) fetchHeaderBlocks() { iv.Type = wire.InvTypeWitnessBlock } + gdmsg := wire.NewMsgGetDataSizeHint(1) gdmsg.AddInvVect(iv) - numRequested++ + msgs = append(msgs, gdmsg) } + sm.startHeader = e.Next() - if numRequested >= wire.MaxInvPerMsg { - break - } } - if len(gdmsg.InvList) > 0 { - sm.syncPeer.QueueMessage(gdmsg, nil) + + for _, m := range msgs { + go queueFetch(m, sm.quit) } } @@ -1816,6 +1867,10 @@ func (sm *SyncManager) Start() { return } + if err := sm.fetchManager.Start(); err != nil { + log.Info(err) + } + log.Trace("Starting sync manager") sm.wg.Add(1) go sm.blockHandler() @@ -1831,6 +1886,8 @@ func (sm *SyncManager) Stop() error { } log.Infof("Sync manager shutting down") + + sm.fetchManager.Stop() close(sm.quit) sm.wg.Wait() return nil @@ -1900,6 +1957,26 @@ func (sm *SyncManager) ConnectedPeers() (<-chan query.Peer, func(), error) { }, nil } +// queueFetchManager queues the given getdata to the fetch manager and waits for +// the resulting error from the channel and returns the value. +func (sm *SyncManager) queueFetchManager(msg *wire.MsgGetData) error { + r := newCheckpointedBlocksQuery(msg) + errChan := sm.fetchManager.Query( + r.requests(), + query.Cancel(sm.quit), + query.NoRetryMax(), + ) + + var err error + select { + case err = <-errChan: + return err + case <-sm.quit: + } + + return nil +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. 
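+// New also wires up a query.WorkManager as the fetch manager, using
+// ConnectedPeers to learn which peers are eligible for checkpointed block
+// downloads.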
func New(config *Config) (*SyncManager, error) { @@ -1921,6 +1998,13 @@ func New(config *Config) (*SyncManager, error) { feeEstimator: config.FeeEstimator, connectedPeers: config.ConnectedPeers, } + sm.fetchManager = query.NewWorkManager( + &query.Config{ + ConnectedPeers: sm.ConnectedPeers, + NewWorker: query.NewWorker, + Ranking: query.NewPeerRanking(), + }, + ) best := sm.chain.BestSnapshot() if !config.DisableCheckpoints { From ed15b8aa6a52edc67c549f1c8c4faf21448bcd12 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 20:27:03 +0900 Subject: [PATCH 08/11] blockchain: store block concurrently while verifying it Storing block happens before the block validation is done and this can be a bottleneck on computers with slow disks. Allowing for concurrent block storage saves time as the disk operation can be done in parallel with the cpu operations of verifying the block. --- blockchain/accept.go | 57 +++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/blockchain/accept.go b/blockchain/accept.go index 4adc2f6127..372f4542cc 100644 --- a/blockchain/accept.go +++ b/blockchain/accept.go @@ -6,6 +6,7 @@ package blockchain import ( "fmt" + "sync" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/database" @@ -44,20 +45,36 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) return false, err } - // Insert the block into the database if it's not already there. Even - // though it is possible the block will ultimately fail to connect, it - // has already passed all proof-of-work and validity tests which means - // it would be prohibitively expensive for an attacker to fill up the - // disk with a bunch of blocks that fail to connect. This is necessary - // since it allows block download to be decoupled from the much more - // expensive connection logic. It also has some other nice properties - // such as making blocks that never become part of the main chain or - // blocks that fail to connect available for further analysis. - err = b.db.Update(func(dbTx database.Tx) error { - return dbStoreBlock(dbTx, block) - }) - if err != nil { - return false, err + // Store the block in parallel if we're in headers first mode. The + // headers were already checked and this block is under the checkpoint + // so it's safe to just add it to the database while the block + // validation is happening. + var wg sync.WaitGroup + var dbStoreError error + if flags&BFFastAdd == BFFastAdd { + go func() { + wg.Add(1) + defer wg.Done() + // Insert the block into the database if it's not already there. Even + // though it is possible the block will ultimately fail to connect, it + // has already passed all proof-of-work and validity tests which means + // it would be prohibitively expensive for an attacker to fill up the + // disk with a bunch of blocks that fail to connect. This is necessary + // since it allows block download to be decoupled from the much more + // expensive connection logic. It also has some other nice properties + // such as making blocks that never become part of the main chain or + // blocks that fail to connect available for further analysis. + dbStoreError = b.db.Update(func(dbTx database.Tx) error { + return dbTx.StoreBlock(block) + }) + }() + } else { + err = b.db.Update(func(dbTx database.Tx) error { + return dbStoreBlock(dbTx, block) + }) + if err != nil { + return false, err + } } // Create a new block node for the block and add it to the node index. 
Even @@ -90,5 +107,17 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags) b.sendNotification(NTBlockAccepted, block) }() + // Wait until the block is saved. If there was a db error, then unset + // the data stored flag and flush the block index. + wg.Wait() + if dbStoreError != nil { + b.index.UnsetStatusFlags(newNode, statusDataStored) + err = b.index.flushToDB() + if err != nil { + return false, fmt.Errorf("%v. %v", err, dbStoreError) + } + return false, dbStoreError + } + return isMainChain, nil } From 7c1b80f60eacfdb6753d850e0467b121702a622b Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 20:39:02 +0900 Subject: [PATCH 09/11] netsync: add logger for blocks downloaded from different peers during headers-first block download --- netsync/blocklogger.go | 75 ++++++++++++++++++++++++++++++++++++++++++ netsync/manager.go | 2 ++ 2 files changed, 77 insertions(+) diff --git a/netsync/blocklogger.go b/netsync/blocklogger.go index 31a6a4c509..1625c30048 100644 --- a/netsync/blocklogger.go +++ b/netsync/blocklogger.go @@ -6,6 +6,7 @@ package netsync import ( "fmt" + "sort" "sync" "time" @@ -82,3 +83,77 @@ func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block, chain *blockc func (b *blockProgressLogger) SetLastLogTime(time time.Time) { b.lastBlockLogTime = time } + +// peerLogger logs the progress of blocks downloaded from different peers during +// headers-first download. +type peerLogger struct { + lastPeerLogTime time.Time + peers map[string]int + + subsystemLogger btclog.Logger + sync.Mutex +} + +// newPeerLogger returns a new peerLogger with fields initialized. +func newPeerLogger(logger btclog.Logger) *peerLogger { + return &peerLogger{ + lastPeerLogTime: time.Now(), + subsystemLogger: logger, + peers: make(map[string]int), + } +} + +// LogPeers logs how many blocks have been received from which peers in the last +// 10 seconds. +func (p *peerLogger) LogPeers(peer string) { + p.Lock() + defer p.Unlock() + + count, found := p.peers[peer] + if found { + count++ + p.peers[peer] = count + } else { + p.peers[peer] = 1 + } + + now := time.Now() + duration := now.Sub(p.lastPeerLogTime) + if duration < time.Second*10 { + return + } + // Truncate the duration to 10s of milliseconds. + durationMillis := int64(duration / time.Millisecond) + tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) + + type peerInfo struct { + name string + count int + } + + // Sort by blocks downloaded before printing. + var sortedPeers []peerInfo + for k, v := range p.peers { + sortedPeers = append(sortedPeers, peerInfo{k, v}) + } + sort.Slice(sortedPeers, func(i, j int) bool { + return sortedPeers[i].count > sortedPeers[j].count + }) + + totalBlocks := 0 + peerDownloadStr := "" + for _, sortedPeer := range sortedPeers { + peerDownloadStr += fmt.Sprintf("%d blocks from %v, ", + sortedPeer.count, sortedPeer.name) + totalBlocks += sortedPeer.count + } + + p.subsystemLogger.Infof("Peer download stats in the last %s. total: %v, %s", + tDuration, totalBlocks, peerDownloadStr) + + // Reset fields. 
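+	// The counters and the timestamp are cleared so the next log line only
+	// covers blocks received after this point.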
+ p.lastPeerLogTime = now + for k := range p.peers { + delete(p.peers, k) + } +} diff --git a/netsync/manager.go b/netsync/manager.go index de615598a0..2472f37e63 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -264,6 +264,7 @@ type SyncManager struct { txMemPool *mempool.TxPool chainParams *chaincfg.Params progressLogger *blockProgressLogger + peerLogger *peerLogger msgChan chan interface{} wg sync.WaitGroup quit chan struct{} @@ -1990,6 +1991,7 @@ func New(config *Config) (*SyncManager, error) { requestedBlocks: make(map[chainhash.Hash]struct{}), peerStates: make(map[*peerpkg.Peer]*peerSyncState), progressLogger: newBlockProgressLogger("Processed", log), + peerLogger: newPeerLogger(log), msgChan: make(chan interface{}, config.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), From d8d8dd792a5a39d569326529c0af7bc87743d5da Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 5 Sep 2024 15:03:50 +0900 Subject: [PATCH 10/11] netsync: don't reset the requestedBlocks in headersFirst Resetting the requestedBlocks state in headersFirst is problematic since we may be banning peers that are still good. --- netsync/manager.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/netsync/manager.go b/netsync/manager.go index 2472f37e63..e6623d8827 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -409,11 +409,6 @@ func (sm *SyncManager) startSync() { // Start syncing from the best peer if one was selected. if bestPeer != nil { - // Clear the requestedBlocks if the sync peer changes, otherwise - // we may ignore blocks we need that the last sync peer failed - // to send. - sm.requestedBlocks = make(map[chainhash.Hash]struct{}) - locator, err := sm.chain.LatestBlockLocator() if err != nil { log.Errorf("Failed to get block locator for the "+ @@ -451,6 +446,10 @@ func (sm *SyncManager) startSync() { "%d from peer %s", best.Height+1, sm.nextCheckpoint.Height, bestPeer.Addr()) } else { + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. + sm.requestedBlocks = make(map[chainhash.Hash]struct{}) bestPeer.PushGetBlocksMsg(locator, &zeroHash) } sm.syncPeer = bestPeer @@ -681,12 +680,14 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) { delete(sm.requestedTxns, txHash) } - // Remove requested blocks from the global map so that they will be - // fetched from elsewhere next time we get an inv. - // TODO: we could possibly here check which peers have these blocks - // and request them now to speed things up a little. - for blockHash := range state.requestedBlocks { - delete(sm.requestedBlocks, blockHash) + if !sm.headersFirstMode { + // Remove requested blocks from the global map so that they will be + // fetched from elsewhere next time we get an inv. + // TODO: we could possibly here check which peers have these blocks + // and request them now to speed things up a little. 
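+		// In headers-first mode the entries are deliberately kept:
+		// blocks may still arrive from other peers via the fetch
+		// manager, and dropping the entries here would make those
+		// deliveries look like unrequested blocks from stalling peers.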
+ for blockHash := range state.requestedBlocks { + delete(sm.requestedBlocks, blockHash) + } } } From 2cbb562365f5bdc495252f6171bfb1b3bde9dd26 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 5 Sep 2024 15:09:19 +0900 Subject: [PATCH 11/11] main: include query logging --- log.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/log.go b/log.go index 5707d7c23a..747fa8ed3c 100644 --- a/log.go +++ b/log.go @@ -21,6 +21,7 @@ import ( "github.com/btcsuite/btcd/netsync" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" + "github.com/lightninglabs/neutrino/query" "github.com/btcsuite/btclog" "github.com/jrick/logrotate/rotator" @@ -54,21 +55,22 @@ var ( // application shutdown. logRotator *rotator.Rotator - adxrLog = backendLog.Logger("ADXR") - amgrLog = backendLog.Logger("AMGR") - cmgrLog = backendLog.Logger("CMGR") - bcdbLog = backendLog.Logger("BCDB") - btcdLog = backendLog.Logger("BTCD") - chanLog = backendLog.Logger("CHAN") - discLog = backendLog.Logger("DISC") - indxLog = backendLog.Logger("INDX") - minrLog = backendLog.Logger("MINR") - peerLog = backendLog.Logger("PEER") - rpcsLog = backendLog.Logger("RPCS") - scrpLog = backendLog.Logger("SCRP") - srvrLog = backendLog.Logger("SRVR") - syncLog = backendLog.Logger("SYNC") - txmpLog = backendLog.Logger("TXMP") + adxrLog = backendLog.Logger("ADXR") + amgrLog = backendLog.Logger("AMGR") + cmgrLog = backendLog.Logger("CMGR") + bcdbLog = backendLog.Logger("BCDB") + btcdLog = backendLog.Logger("BTCD") + chanLog = backendLog.Logger("CHAN") + discLog = backendLog.Logger("DISC") + indxLog = backendLog.Logger("INDX") + minrLog = backendLog.Logger("MINR") + peerLog = backendLog.Logger("PEER") + rpcsLog = backendLog.Logger("RPCS") + scrpLog = backendLog.Logger("SCRP") + srvrLog = backendLog.Logger("SRVR") + syncLog = backendLog.Logger("SYNC") + txmpLog = backendLog.Logger("TXMP") + queryLog = backendLog.Logger("QURY") ) // Initialize package-global logger variables. @@ -84,6 +86,7 @@ func init() { txscript.UseLogger(scrpLog) netsync.UseLogger(syncLog) mempool.UseLogger(txmpLog) + query.UseLogger(queryLog) } // subsystemLoggers maps each subsystem identifier to its associated logger.