Sync fixes and append queue updates #1087

Merged
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
@@ -120,7 +120,7 @@ var (
DBEngineFlag = &cli.StringFlag{
Name: "db.engine",
Usage: "Backing database implementation to use ('leveldb' or 'pebble')",
Value: "pebble",
Value: "leveldb",
}
KeyStoreDirFlag = DirectoryFlag{
Name: "keystore",
26 changes: 18 additions & 8 deletions core/core.go
@@ -6,6 +6,7 @@ import (
"io"
"math/big"
"sort"
"strings"
"time"

"github.com/dominant-strategies/go-quai/common"
@@ -30,9 +31,9 @@ const (
c_appendQueueRetryPeriod = 1 // Time (in seconds) before retrying to append from AppendQueue
c_appendQueueThreshold = 1000 // Number of blocks to load from the disk to ram on every proc of append queue
c_processingCache = 10 // Number of block hashes held to prevent multiple simultaneous appends on a single block hash
c_primeRetryThreshold = 900 // Number of times a block append is retried before eviction from the append queue in Prime
c_regionRetryThreshold = 300 // Number of times a block append is retried before eviction from the append queue in Region
c_zoneRetryThreshold = 100 // Number of times a block append is retried before eviction from the append queue in Zone
c_primeRetryThreshold = 1800 // Number of times a block append is retried before eviction from the append queue in Prime
c_regionRetryThreshold = 1200 // Number of times a block append is retried before eviction from the append queue in Region
c_zoneRetryThreshold = 600 // Number of times a block append is retried before eviction from the append queue in Zone
c_maxFutureBlocks = 15 // Number of blocks ahead of the current block to be put in the hashNumberList
c_appendQueueRetryPriorityThreshold = 5 // If the retry counter for a block is below this number, it's put in the special list that is tried first to be appended
c_appendQueueRemoveThreshold = 10 // Number of blocks behind the current header a block must be to be eligible for removal from the append queue
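
For orientation, here is a minimal, self-contained sketch of how per-context retry thresholds like these can drive eviction from an append queue. The names, the numeric context encoding, and the queue entry type are invented for illustration and are not the go-quai implementation:

```go
package main

import "fmt"

// Illustrative copies of the new threshold values from this hunk.
const (
	cPrimeRetryThreshold  = 1800 // retries before eviction in Prime
	cRegionRetryThreshold = 1200 // retries before eviction in Region
	cZoneRetryThreshold   = 600  // retries before eviction in Zone
)

type queueEntry struct {
	number uint64 // block number
	retry  int    // failed append attempts so far
}

// thresholdFor picks the eviction threshold for a chain context
// (0 = Prime, 1 = Region, 2 = Zone; an assumed encoding).
func thresholdFor(ctx int) int {
	switch ctx {
	case 0:
		return cPrimeRetryThreshold
	case 1:
		return cRegionRetryThreshold
	default:
		return cZoneRetryThreshold
	}
}

// bumpOrEvict counts one more failed append and reports whether
// the entry should now be dropped from the queue.
func bumpOrEvict(e *queueEntry, ctx int) bool {
	e.retry++
	return e.retry >= thresholdFor(ctx)
}

func main() {
	e := &queueEntry{number: 42}
	for !bumpOrEvict(e, 2) {
	}
	fmt.Printf("block %d evicted after %d retries\n", e.number, e.retry)
}
```

Doubling each threshold, as this hunk does, simply gives a block twice as many append attempts before it is evicted.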
@@ -130,7 +131,11 @@ func (c *Core) InsertChain(blocks types.Blocks) (int, error) {
} else if err.Error() != ErrKnownBlock.Error() {
log.Info("Append failed.", "hash", block.Hash(), "err", err)
}
c.removeFromAppendQueue(block)
if err != nil && strings.Contains(err.Error(), "connection refused") {
log.Error("Append failed because of connection refused error")
} else {
c.removeFromAppendQueue(block)
}
}
}
return len(blocks), nil
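
The new branch treats "connection refused" as a transient failure: the block stays in the append queue for a later retry instead of being removed, while every other outcome still dequeues it. A hedged sketch of that control flow, with stand-in block and queue types rather than the real ones:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type block struct{ hash string }

type appendQueue map[string]block

func (q appendQueue) remove(b block) { delete(q, b.hash) }

// handleAppendResult mirrors the pattern in the diff: a transient
// connection error leaves the block queued for a later retry;
// anything else (success or a permanent failure) dequeues it.
func handleAppendResult(q appendQueue, b block, err error) {
	if err != nil && strings.Contains(err.Error(), "connection refused") {
		fmt.Println("append failed with connection refused; keeping", b.hash, "queued")
		return
	}
	q.remove(b)
}

func main() {
	q := appendQueue{"0xabc": {hash: "0xabc"}}
	handleAppendResult(q, q["0xabc"], errors.New("dial tcp: connection refused"))
	fmt.Println("still queued:", len(q) == 1) // true
}
```

Matching on the error string is fragile in general, but it is the pattern the diff itself uses via strings.Contains.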
@@ -145,11 +150,12 @@ func (c *Core) procAppendQueue() {
for _, hash := range c.appendQueue.Keys() {
if value, exist := c.appendQueue.Peek(hash); exist {
hashNumber := types.HashAndNumber{Hash: hash.(common.Hash), Number: value.(blockNumberAndRetryCounter).number}
if value.(blockNumberAndRetryCounter).retry < c_appendQueueRetryPriorityThreshold {
hashNumberPriorityList = append(hashNumberPriorityList, hashNumber)
}
if hashNumber.Number < c.CurrentHeader().NumberU64()+c_maxFutureBlocks {
hashNumberList = append(hashNumberList, hashNumber)
if value.(blockNumberAndRetryCounter).retry < c_appendQueueRetryPriorityThreshold {
hashNumberPriorityList = append(hashNumberPriorityList, hashNumber)
} else {
hashNumberList = append(hashNumberList, hashNumber)
}
}
}
}
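
After this restructuring a queued hash lands in exactly one list: the priority list while its retry count is below c_appendQueueRetryPriorityThreshold, the regular list once it has been retried more often, and neither if it is too far ahead of the current header (previously a young, near-head hash could be added to both lists). A small sketch of that selection with simplified types; the real code iterates the appendQueue keys and peeks each value:

```go
package main

import "fmt"

const (
	cAppendQueueRetryPriorityThreshold = 5  // below this, a block goes in the priority list
	cMaxFutureBlocks                   = 15 // how far past the head a block may be considered
)

type hashAndNumber struct {
	hash   string
	number uint64
}

// partition splits queue entries (hash/number -> retry count) into a
// priority list for young entries and a regular list for the rest,
// skipping anything too far ahead of the current head.
func partition(entries map[hashAndNumber]int, head uint64) (priority, normal []hashAndNumber) {
	for hn, retry := range entries {
		if hn.number >= head+cMaxFutureBlocks {
			continue // too far in the future; leave it queued for later
		}
		if retry < cAppendQueueRetryPriorityThreshold {
			priority = append(priority, hn)
		} else {
			normal = append(normal, hn)
		}
	}
	return priority, normal
}

func main() {
	entries := map[hashAndNumber]int{
		{hash: "0xaa", number: 101}: 2, // few retries: priority
		{hash: "0xbb", number: 102}: 9, // many retries: normal
		{hash: "0xcc", number: 500}: 0, // far future: skipped
	}
	p, n := partition(entries, 100)
	fmt.Println("priority:", len(p), "normal:", len(n)) // priority: 1 normal: 1
}
```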
@@ -354,6 +360,10 @@ func (c *Core) WriteBlock(block *types.Block) {
nodeCtx := common.NodeLocation.Context()
if order == nodeCtx {
c.addToAppendQueue(block)
parentHeader := c.GetHeader(block.ParentHash(), block.NumberU64()-1)
if parentHeader != nil {
c.InsertChain([]*types.Block{block})
}
// If a dom block comes in and we haven't appended it yet
} else if order < nodeCtx && c.GetHeaderByHash(block.Hash()) == nil {
if c.sl.domClient != nil {
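The added lines attempt an immediate append whenever the parent header is already known, rather than leaving the block to wait for the next append-queue pass (it is still queued first, so a failed immediate append falls back to the normal retry path). A rough sketch of the idea with invented types standing in for the real chain and block:

```go
package main

import "fmt"

type header struct{ number uint64 }

type chain struct {
	headers map[string]*header // known headers, keyed by hash
	queue   []string           // append queue of block hashes
}

// writeBlock queues the block and, if its parent is already known,
// tries the append right away instead of waiting for the queue proc.
func (c *chain) writeBlock(hash, parentHash string, number uint64) {
	c.queue = append(c.queue, hash) // always queued as a fallback
	if _, ok := c.headers[parentHash]; ok {
		// the real code calls c.InsertChain([]*types.Block{block}) here
		fmt.Printf("parent of %s known, appending block %d immediately\n", hash, number)
	}
}

func main() {
	c := &chain{headers: map[string]*header{"0xparent": {number: 9}}}
	c.writeBlock("0xchild", "0xparent", 10)
}
```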
6 changes: 3 additions & 3 deletions core/slice.go
@@ -52,7 +52,6 @@ type Slice struct {
quit chan struct{} // slice quit channel

domClient *quaiclient.Client
domUrl string
subClients []*quaiclient.Client

wg sync.WaitGroup
@@ -80,7 +79,6 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
config: chainConfig,
engine: engine,
sliceDb: db,
domUrl: domClientUrl,
quit: make(chan struct{}),
badHashesCache: make(map[common.Hash]bool),
}
@@ -1070,7 +1068,9 @@ func (sl *Slice) loadLastState() error {
}
rawdb.DeletePhCache(sl.sliceDb)
sl.bestPhKey = rawdb.ReadBestPhKey(sl.sliceDb)
sl.miner.worker.LoadPendingBlockBody()
if sl.ProcessingState() {
sl.miner.worker.LoadPendingBlockBody()
}
return nil
}

6 changes: 3 additions & 3 deletions core/tx_pool.go
@@ -1428,14 +1428,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Debug("Removed old queued transactions", "count", len(forwards))
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Debug("Removed unpayable queued transactions", "count", len(drops))
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))

// Gather all executable transactions and promote them
@@ -1446,7 +1446,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
promoted = append(promoted, tx)
}
}
log.Debug("Promoted queued transactions", "count", len(promoted))
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))

// Drop all transactions over the allowed limit
10 changes: 4 additions & 6 deletions core/worker.go
@@ -277,18 +277,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
phBodyCache, _ := lru.New(pendingBlockBodyLimit)
worker.pendingBlockBody = phBodyCache

if headerchain.ProcessingState() {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
}

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}

if processingState {
nodeCtx := common.NodeLocation.Context()
if headerchain.ProcessingState() && nodeCtx == common.ZONE_CTX {
worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
worker.wg.Add(1)
go worker.asyncStateLoop()
}
@@ -366,7 +364,7 @@ func (w *worker) start() {

// stop sets the running status as 0.
func (w *worker) stop() {
if w.hc.ProcessingState() {
if w.hc.ProcessingState() && common.NodeLocation.Context() == common.ZONE_CTX {
w.chainHeadSub.Unsubscribe()
}
atomic.StoreInt32(&w.running, 0)
18 changes: 11 additions & 7 deletions eth/backend.go
@@ -190,8 +190,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Quai, error) {
}

// Only index bloom if processing state
if eth.core.ProcessingState() {
eth.bloomIndexer = core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms)
if eth.core.ProcessingState() && nodeCtx == common.ZONE_CTX {
eth.bloomIndexer = core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms)
eth.bloomIndexer.Start(eth.Core().Slice().HeaderChain())
}

@@ -382,8 +382,10 @@ func (s *Quai) Protocols() []p2p.Protocol {
func (s *Quai) Start() error {
eth.StartENRUpdater(s.core, s.p2pServer.LocalNode())

// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
if s.core.ProcessingState() && common.NodeLocation.Context() == common.ZONE_CTX {
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
}

// Figure out a max peers count based on the server limits
maxPeers := s.p2pServer.MaxPeers
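
Several hunks in this PR apply the same guard: the bloom indexer and bloom handlers here, and the chain-head subscription in core/worker.go, only run when the node both processes state and sits in the zone context. A sketch of that predicate in isolation (the numeric context encoding is an assumption made for the illustration):

```go
package main

import "fmt"

// Node contexts, mirroring the prime/region/zone hierarchy.
const (
	primeCtx = iota
	regionCtx
	zoneCtx
)

type node struct {
	ctx             int
	processingState bool
}

// runsStateServices reports whether state-dependent services
// (bloom indexing, bloom handlers, chain-head subscription) belong
// on this node: only state-processing zone nodes qualify.
func (n node) runsStateServices() bool {
	return n.processingState && n.ctx == zoneCtx
}

func main() {
	fmt.Println(node{ctx: zoneCtx, processingState: true}.runsStateServices())   // true
	fmt.Println(node{ctx: regionCtx, processingState: true}.runsStateServices()) // false
}
```

Guarding Stop() below with the same predicate keeps teardown symmetric with startup: only nodes that created the bloom machinery close it.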
@@ -399,9 +401,11 @@ func (s *Quai) Stop() error {
s.ethDialCandidates.Close()
s.handler.Stop()

// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
if s.core.ProcessingState() && common.NodeLocation.Context() == common.ZONE_CTX {
// Then stop everything else.
s.bloomIndexer.Close()
close(s.closeBloomHandler)
}
s.core.Stop()
s.engine.Close()
rawdb.PopUncleanShutdownMarker(s.chainDb)
6 changes: 3 additions & 3 deletions eth/downloader/downloader.go
@@ -1128,8 +1128,6 @@ func (d *Downloader) processFullSyncContent(peerHeight uint64) error {
if err := d.importBlockResults(results); err != nil {
return err
}
d.headNumber = results[len(results)-1].Header.NumberU64()
d.headEntropy = d.core.TotalLogS(results[len(results)-1].Header)
// If all the blocks are fetched, we exit the sync process
if d.headNumber == peerHeight {
return errNoFetchesPending
@@ -1152,7 +1150,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
}
// Retrieve a batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting downloaded chain", "items", len(results),
log.Info("Inserting downloaded chain", "items", len(results),
"firstnum", first.Number(), "firsthash", first.Hash(),
"lastnum", last.Number(), "lasthash", last.Hash(),
)
@@ -1162,6 +1160,8 @@
if d.core.IsBlockHashABadHash(block.Hash()) {
return errBadBlockFound
}
d.headNumber = block.NumberU64()
d.headEntropy = d.core.TotalLogS(block.Header())
d.core.WriteBlock(block)
}
return nil
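
Moving the headNumber and headEntropy updates from processFullSyncContent into importBlockResults' loop makes the downloader's recorded head advance block by block, so an early return mid-batch (for example, when a bad block hash is found) leaves the head at the last block actually written rather than at the end of a batch that was never fully imported. A simplified sketch of the difference:

```go
package main

import (
	"errors"
	"fmt"
)

type result struct {
	number uint64
	bad    bool
}

// importResults advances head once per block written, so a mid-batch
// failure leaves head at the last successfully processed block.
func importResults(results []result) (head uint64, err error) {
	for _, r := range results {
		if r.bad {
			return head, errors.New("bad block found")
		}
		head = r.number // per-block update, as in the new code
	}
	return head, nil
}

func main() {
	head, err := importResults([]result{{1, false}, {2, false}, {3, true}, {4, false}})
	// A per-batch update would have claimed head=4 despite the failure at 3.
	fmt.Println("head:", head, "err:", err) // head: 2
}
```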
32 changes: 15 additions & 17 deletions eth/downloader/queue.go
@@ -653,11 +653,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
headers[i], headers[j] = headers[j], headers[i]
}

if len(headers) == 0 && accepted {
return 0, nil
}

if accepted {
if len(headers) > 0 && accepted {
if headers[len(headers)-1].Number().Uint64() != request.From {
logger.Info("First header broke chain ordering", "number", headers[0].Number(), "hash", headers[0].Hash(), "expected", request.From)
accepted = false
Expand All @@ -667,19 +663,19 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted = false
}
}
}

if accepted {
parentHash := headers[0].Hash()
for _, header := range headers[1:] {
hash := header.Hash()
if parentHash != header.ParentHash() {
logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
accepted = false
break
if accepted {
parentHash := headers[0].Hash()
for _, header := range headers[1:] {
hash := header.Hash()
if parentHash != header.ParentHash() {
logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
accepted = false
break
}
// Set-up parent hash for next round
parentHash = hash
}
// Set-up parent hash for next round
parentHash = hash
}
}
// If the batch of headers wasn't accepted, mark as unavailable
Expand All @@ -697,7 +693,9 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
return 0, errors.New("delivery not accepted")
}

copy(q.headerResults[targetTo-q.headerOffset:], headers)
if len(headers) > 0 {
copy(q.headerResults[targetTo-q.headerOffset:], headers)
}

// Clean up a successful fetch and try to deliver any sub-results
delete(q.headerTaskPool, request.From+1)
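
The restructuring nests every access to headers[0] and headers[len(headers)-1] under a single len(headers) > 0 check, and guards the final copy the same way, so an empty but accepted delivery can no longer panic on an out-of-range index. A condensed sketch of the guarded shape (simplified header type; the logging and unavailability bookkeeping are omitted):

```go
package main

import "fmt"

type hdr struct {
	number     uint64
	hash       string
	parentHash string
}

// validate mirrors the new control flow: every slice index sits
// behind a non-empty check, and an empty batch is simply accepted.
func validate(headers []hdr, from uint64) bool {
	accepted := true
	if len(headers) > 0 && accepted {
		if headers[len(headers)-1].number != from {
			accepted = false // first header broke chain ordering
		}
		if accepted {
			parentHash := headers[0].hash
			for _, h := range headers[1:] {
				if parentHash != h.parentHash {
					accepted = false // header broke chain ancestry
					break
				}
				parentHash = h.hash // set up parent hash for next round
			}
		}
	}
	return accepted
}

func main() {
	fmt.Println(validate(nil, 10))                            // true: empty batch, no panic
	fmt.Println(validate([]hdr{{number: 10, hash: "a"}}, 10)) // true: single header matching From
}
```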
6 changes: 6 additions & 0 deletions eth/handler.go
@@ -532,6 +532,12 @@ func (h *handler) missingPendingEtxsLoop() {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
if len(peersRunningSlice) == 0 {
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
}
case <-h.missingPendingEtxsSub.Err():
return
}
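
The added block gives the fetch a fallback: when no known peer is running the target slice, the request fans out to a generic peer selection (h.selectSomePeers()) instead of stalling until such a peer appears. A minimal sketch of that pattern with stand-in types:

```go
package main

import "fmt"

type peer string

func requestPendingEtxs(p peer, hash string) {
	fmt.Printf("requesting pending etxs %s from %s\n", hash, p)
}

// fetch prefers peers known to run the slice and falls back to a
// generic selection only when none are known.
func fetch(peersRunningSlice, somePeers []peer, hash string) {
	for _, p := range peersRunningSlice {
		requestPendingEtxs(p, hash)
	}
	if len(peersRunningSlice) == 0 {
		for _, p := range somePeers {
			requestPendingEtxs(p, hash)
		}
	}
}

func main() {
	fetch(nil, []peer{"peer-1", "peer-2"}, "0xetx")
}
```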
15 changes: 8 additions & 7 deletions eth/sync.go
@@ -30,8 +30,8 @@ import (
)

const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
defaultMinSyncPeers = 1 // Amount of peers desired to start syncing
forceSyncCycle = 60 * time.Second // Time interval to force syncs, even if few peers are available
defaultMinSyncPeers = 3 // Amount of peers desired to start syncing

// This is the target size for the packs of transactions sent by txsyncLoop64.
// A pack can get larger than this if a single transaction exceeds this size.
@@ -274,24 +274,25 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {

// startSync launches doSync in a new goroutine.
func (cs *chainSyncer) startSync(op *chainSyncOp) {
cs.doneCh = make(chan error, 1)
cs.doneCh = make(chan error, 10)
go func() { cs.doneCh <- cs.handler.doSync(op) }()
}

// doSync synchronizes the local blockchain with a remote peer.
func (h *handler) doSync(op *chainSyncOp) error {
// Run the sync cycle, and disable fast sync if we're past the pivot block
err := h.downloader.Synchronise(op.peer.ID(), op.head, op.entropy, op.mode)
log.Info("Downloader exited", "err", err)
if err != nil {
return err
}
// If we've successfully finished a sync cycle and passed any required checkpoint,
// enable accepting transactions from the network.
head := h.core.CurrentBlock()
if head == nil {
log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
return nil
}
if head == nil {
log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
return nil
}
if head.NumberU64() > 0 {
// We've completed a sync cycle, notify all peers of new state. This path is
// essential in star-topology networks where a gateway node needs to notify
2 changes: 1 addition & 1 deletion network.env.dist
@@ -126,4 +126,4 @@ ENABLE_PPROF=false
# Output format variables
SHOW_COLORS=true
RUN_BLAKE3=false
DB_ENGINE=pebble
DB_ENGINE=leveldb