Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

peer, main, netsync, blockchain: parallel block downloads #2226

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
57 changes: 43 additions & 14 deletions blockchain/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package blockchain

import (
"fmt"
"sync"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/database"
Expand Down Expand Up @@ -44,20 +45,36 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
return false, err
}

// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
// it would be prohibitively expensive for an attacker to fill up the
// disk with a bunch of blocks that fail to connect. This is necessary
// since it allows block download to be decoupled from the much more
// expensive connection logic. It also has some other nice properties
// such as making blocks that never become part of the main chain or
// blocks that fail to connect available for further analysis.
err = b.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, block)
})
if err != nil {
return false, err
// Store the block in parallel if we're in headers first mode. The
// headers were already checked and this block is under the checkpoint
// so it's safe to just add it to the database while the block
// validation is happening.
var wg sync.WaitGroup
var dbStoreError error
if flags&BFFastAdd == BFFastAdd {
go func() {
wg.Add(1)
defer wg.Done()
// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
// it would be prohibitively expensive for an attacker to fill up the
// disk with a bunch of blocks that fail to connect. This is necessary
// since it allows block download to be decoupled from the much more
// expensive connection logic. It also has some other nice properties
// such as making blocks that never become part of the main chain or
// blocks that fail to connect available for further analysis.
dbStoreError = b.db.Update(func(dbTx database.Tx) error {
return dbTx.StoreBlock(block)
})
}()
} else {
err = b.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, block)
})
if err != nil {
return false, err
}
}

// Create a new block node for the block and add it to the node index. Even
Expand Down Expand Up @@ -90,5 +107,17 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
b.sendNotification(NTBlockAccepted, block)
}()

// Wait until the block is saved. If there was a db error, then unset
// the data stored flag and flush the block index.
wg.Wait()
if dbStoreError != nil {
b.index.UnsetStatusFlags(newNode, statusDataStored)
err = b.index.flushToDB()
if err != nil {
return false, fmt.Errorf("%v. %v", err, dbStoreError)
}
return false, dbStoreError
}

return isMainChain, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/jessevdk/go-flags v1.4.0
github.com/jrick/logrotate v1.0.0
github.com/lightninglabs/neutrino v0.16.0
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
golang.org/x/crypto v0.22.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0=
github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
33 changes: 18 additions & 15 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/btcsuite/btcd/netsync"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/txscript"
"github.com/lightninglabs/neutrino/query"

"github.com/btcsuite/btclog"
"github.com/jrick/logrotate/rotator"
Expand Down Expand Up @@ -54,21 +55,22 @@ var (
// application shutdown.
logRotator *rotator.Rotator

adxrLog = backendLog.Logger("ADXR")
amgrLog = backendLog.Logger("AMGR")
cmgrLog = backendLog.Logger("CMGR")
bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD")
chanLog = backendLog.Logger("CHAN")
discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
peerLog = backendLog.Logger("PEER")
rpcsLog = backendLog.Logger("RPCS")
scrpLog = backendLog.Logger("SCRP")
srvrLog = backendLog.Logger("SRVR")
syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP")
adxrLog = backendLog.Logger("ADXR")
amgrLog = backendLog.Logger("AMGR")
cmgrLog = backendLog.Logger("CMGR")
bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD")
chanLog = backendLog.Logger("CHAN")
discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
peerLog = backendLog.Logger("PEER")
rpcsLog = backendLog.Logger("RPCS")
scrpLog = backendLog.Logger("SCRP")
srvrLog = backendLog.Logger("SRVR")
syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP")
queryLog = backendLog.Logger("QURY")
)

// Initialize package-global logger variables.
Expand All @@ -84,6 +86,7 @@ func init() {
txscript.UseLogger(scrpLog)
netsync.UseLogger(syncLog)
mempool.UseLogger(txmpLog)
query.UseLogger(queryLog)
}

// subsystemLoggers maps each subsystem identifier to its associated logger.
Expand Down
75 changes: 75 additions & 0 deletions netsync/blocklogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package netsync

import (
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -82,3 +83,77 @@ func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block, chain *blockc
func (b *blockProgressLogger) SetLastLogTime(time time.Time) {
b.lastBlockLogTime = time
}

// peerLogger logs the progress of blocks downloaded from different peers during
// headers-first download.
type peerLogger struct {
lastPeerLogTime time.Time
peers map[string]int

subsystemLogger btclog.Logger
sync.Mutex
}

// newPeerLogger returns a new peerLogger with fields initialized.
func newPeerLogger(logger btclog.Logger) *peerLogger {
return &peerLogger{
lastPeerLogTime: time.Now(),
subsystemLogger: logger,
peers: make(map[string]int),
}
}

// LogPeers logs how many blocks have been received from which peers in the last
// 10 seconds.
func (p *peerLogger) LogPeers(peer string) {
p.Lock()
defer p.Unlock()

count, found := p.peers[peer]
if found {
count++
p.peers[peer] = count
} else {
p.peers[peer] = 1
}

now := time.Now()
duration := now.Sub(p.lastPeerLogTime)
if duration < time.Second*10 {
return
}
// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)

type peerInfo struct {
name string
count int
}

// Sort by blocks downloaded before printing.
var sortedPeers []peerInfo
for k, v := range p.peers {
sortedPeers = append(sortedPeers, peerInfo{k, v})
}
sort.Slice(sortedPeers, func(i, j int) bool {
return sortedPeers[i].count > sortedPeers[j].count
})

totalBlocks := 0
peerDownloadStr := ""
for _, sortedPeer := range sortedPeers {
peerDownloadStr += fmt.Sprintf("%d blocks from %v, ",
sortedPeer.count, sortedPeer.name)
totalBlocks += sortedPeer.count
}

p.subsystemLogger.Infof("Peer download stats in the last %s. total: %v, %s",
tDuration, totalBlocks, peerDownloadStr)

// Reset fields.
p.lastPeerLogTime = now
for k := range p.peers {
delete(p.peers, k)
}
}
3 changes: 3 additions & 0 deletions netsync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ type Config struct {
MaxPeers int

FeeEstimator *mempool.FeeEstimator

// ConnectedPeers returns all the currently connected peers.
ConnectedPeers func() []*peer.Peer
}
Loading
Loading