From 358386b84da4244c9d3983026d40b2166167c76f Mon Sep 17 00:00:00 2001
From: gop
Date: Tue, 5 Sep 2023 11:23:40 -0500
Subject: [PATCH] bugfix: Fixing the sync of blocks after they are downloaded

* The downloader queue was not working if we had two consecutive dom blocks
* doneCh in the sync was blocking future downloads; this is fixed by
  increasing the channel size
* Changed the minimum peers required to start a sync from 1 to 3
* Updated forceSyncCycle from 10 seconds to 60 seconds
---
 core/slice.go                |  2 --
 core/tx_pool.go              |  6 ++---
 eth/downloader/downloader.go |  6 ++---
 eth/downloader/queue.go      | 46 ++++++++++++++++++------------------
 eth/sync.go                  | 15 ++++++------
 5 files changed, 37 insertions(+), 38 deletions(-)

diff --git a/core/slice.go b/core/slice.go
index cbfb763803..fdbc7eae59 100644
--- a/core/slice.go
+++ b/core/slice.go
@@ -52,7 +52,6 @@ type Slice struct {
 	quit chan struct{} // slice quit channel
 
 	domClient  *quaiclient.Client
-	domUrl     string
 	subClients []*quaiclient.Client
 
 	wg sync.WaitGroup
@@ -80,7 +79,6 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
 		config:         chainConfig,
 		engine:         engine,
 		sliceDb:        db,
-		domUrl:         domClientUrl,
 		quit:           make(chan struct{}),
 		badHashesCache: make(map[common.Hash]bool),
 	}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 3b1b36453c..c7d31f6e4f 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -1428,14 +1428,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
 			hash := tx.Hash()
 			pool.all.Remove(hash)
 		}
-		log.Debug("Removed old queued transactions", "count", len(forwards))
+		log.Trace("Removed old queued transactions", "count", len(forwards))
 		// Drop all transactions that are too costly (low balance or out of gas)
 		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
 		for _, tx := range drops {
 			hash := tx.Hash()
 			pool.all.Remove(hash)
 		}
-		log.Debug("Removed unpayable queued transactions", "count", len(drops))
+		log.Trace("Removed unpayable queued transactions", "count", len(drops))
 		queuedNofundsMeter.Mark(int64(len(drops)))
 
 		// Gather all executable transactions and promote them
@@ -1446,7 +1446,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
 				promoted = append(promoted, tx)
 			}
 		}
-		log.Debug("Promoted queued transactions", "count", len(promoted))
+		log.Trace("Promoted queued transactions", "count", len(promoted))
 		queuedGauge.Dec(int64(len(readies)))
 
 		// Drop all transactions over the allowed limit
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 9cdb82f6fd..040a1e5fcb 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1128,8 +1128,6 @@ func (d *Downloader) processFullSyncContent(peerHeight uint64) error {
 		if err := d.importBlockResults(results); err != nil {
 			return err
 		}
-		d.headNumber = results[len(results)-1].Header.NumberU64()
-		d.headEntropy = d.core.TotalLogS(results[len(results)-1].Header)
 		// If all the blocks are fetched, we exit the sync process
 		if d.headNumber == peerHeight {
 			return errNoFetchesPending
@@ -1152,7 +1150,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 	}
 	// Retrieve the a batch of results to import
 	first, last := results[0].Header, results[len(results)-1].Header
-	log.Debug("Inserting downloaded chain", "items", len(results),
+	log.Info("Inserting downloaded chain", "items", len(results),
 		"firstnum", first.Number(), "firsthash", first.Hash(),
 		"lastnum", last.Number(), "lasthash", last.Hash(),
 	)
@@ -1162,6 +1160,8 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
 		if d.core.IsBlockHashABadHash(block.Hash()) {
 			return errBadBlockFound
 		}
+		d.headNumber = block.NumberU64()
+		d.headEntropy = d.core.TotalLogS(block.Header())
 		d.core.WriteBlock(block)
 	}
 	return nil
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 097d86edfe..c69a753993 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -653,33 +653,31 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 		headers[i], headers[j] = headers[j], headers[i]
 	}
 
-	if len(headers) == 0 && accepted {
-		return 0, nil
-	}
-
-	if accepted {
-		if headers[len(headers)-1].Number().Uint64() != request.From {
-			logger.Info("First header broke chain ordering", "number", headers[0].Number(), "hash", headers[0].Hash(), "expected", request.From)
-			accepted = false
-		} else if headers[0].NumberU64() != targetTo {
-			if targetTo != 0 {
-				logger.Info("Last header broke skeleton structure ", "number", headers[0].Number(), "expected", targetTo)
+	if len(headers) > 0 && accepted {
+		if accepted {
+			if headers[len(headers)-1].Number().Uint64() != request.From {
+				logger.Info("First header broke chain ordering", "number", headers[0].Number(), "hash", headers[0].Hash(), "expected", request.From)
 				accepted = false
+			} else if headers[0].NumberU64() != targetTo {
+				if targetTo != 0 {
+					logger.Info("Last header broke skeleton structure ", "number", headers[0].Number(), "expected", targetTo)
+					accepted = false
+				}
 			}
 		}
-	}
-	if accepted {
-		parentHash := headers[0].Hash()
-		for _, header := range headers[1:] {
-			hash := header.Hash()
-			if parentHash != header.ParentHash() {
-				logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
-				accepted = false
-				break
+		if accepted {
+			parentHash := headers[0].Hash()
+			for _, header := range headers[1:] {
+				hash := header.Hash()
+				if parentHash != header.ParentHash() {
+					logger.Warn("Header broke chain ancestry", "number", header.Number(), "hash", hash)
+					accepted = false
+					break
+				}
+				// Set-up parent hash for next round
+				parentHash = hash
 			}
-			// Set-up parent hash for next round
-			parentHash = hash
 		}
 	}
 
 	// If the batch of headers wasn't accepted, mark as unavailable
@@ -697,7 +695,9 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 		return 0, errors.New("delivery not accepted")
 	}
 
-	copy(q.headerResults[targetTo-q.headerOffset:], headers)
+	if len(headers) > 0 {
+		copy(q.headerResults[targetTo-q.headerOffset:], headers)
+	}
 
 	// Clean up a successful fetch and try to deliver any sub-results
 	delete(q.headerTaskPool, request.From+1)
diff --git a/eth/sync.go b/eth/sync.go
index ae31b790a8..c1ee1a4e48 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -30,8 +30,8 @@ import (
 )
 
 const (
-	forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
-	defaultMinSyncPeers = 1                // Amount of peers desired to start syncing
+	forceSyncCycle      = 60 * time.Second // Time interval to force syncs, even if few peers are available
+	defaultMinSyncPeers = 3                // Amount of peers desired to start syncing
 
 	// This is the target size for the packs of transactions sent by txsyncLoop64.
 	// A pack can get larger than this if a single transactions exceeds this size.
@@ -274,7 +274,7 @@ func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
 
 // startSync launches doSync in a new goroutine.
 func (cs *chainSyncer) startSync(op *chainSyncOp) {
-	cs.doneCh = make(chan error, 1)
+	cs.doneCh = make(chan error, 10)
 	go func() { cs.doneCh <- cs.handler.doSync(op) }()
 }
 
@@ -282,16 +282,17 @@ func (cs *chainSyncer) startSync(op *chainSyncOp) {
 func (h *handler) doSync(op *chainSyncOp) error {
 	// Run the sync cycle, and disable fast sync if we're past the pivot block
 	err := h.downloader.Synchronise(op.peer.ID(), op.head, op.entropy, op.mode)
+	log.Info("Downloader exited", "err", err)
 	if err != nil {
 		return err
 	}
 	// If we've successfully finished a sync cycle and passed any required checkpoint,
 	// enable accepting transactions from the network.
 	head := h.core.CurrentBlock()
-	if head == nil {
-		log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
-		return nil
-	}
+	if head == nil {
+		log.Warn("doSync: head is nil", "hash", h.core.CurrentHeader().Hash(), "number", h.core.CurrentHeader().NumberArray())
+		return nil
+	}
 	if head.NumberU64() > 0 {
 		// We've completed a sync cycle, notify all peers of new state. This path is
 		// essential in star-topology networks where a gateway node needs to notify
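
Note on the doneCh change: the following is a minimal, self-contained Go
sketch of the pattern, not code from this repository (syncCycle below is an
illustrative stand-in for handler.doSync). With make(chan error, 1), a
finished cycle's send blocks as soon as one unread result is pending, so
later cycles stall behind a slow reader; a capacity of 10, as startSync now
uses, lets several results queue up without blocking their goroutines.

	package main

	import (
		"fmt"
		"time"
	)

	// syncCycle stands in for handler.doSync: it runs one sync cycle and
	// reports its outcome on doneCh, as the goroutine in startSync does.
	func syncCycle(doneCh chan error) {
		time.Sleep(10 * time.Millisecond) // placeholder for real sync work
		doneCh <- nil                     // report the cycle's error (nil on success)
	}

	func main() {
		// Capacity 10 mirrors the patched startSync. With capacity 1, the
		// second and third sends above would block until main drained the
		// channel, which is the stall this patch fixes.
		doneCh := make(chan error, 10)

		for i := 0; i < 3; i++ {
			go syncCycle(doneCh)
		}
		for i := 0; i < 3; i++ {
			fmt.Println("sync cycle finished, err =", <-doneCh)
		}
	}

The len(headers) > 0 guards in queue.go follow the same shape: an empty
batch previously hit an early "return 0, nil", which skipped the task-pool
cleanup and result delivery further down; now the validation and the copy
into headerResults are simply skipped and the bookkeeping still runs.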