remove acceptor queue (part 1) #1334
base: master

Changes from 1 commit
@@ -85,7 +85,6 @@ var (
 	blockValidationTimer = metrics.NewRegisteredCounter("chain/block/validations/state", nil)
 	blockWriteTimer      = metrics.NewRegisteredCounter("chain/block/writes", nil)

-	acceptorQueueGauge            = metrics.NewRegisteredGauge("chain/acceptor/queue/size", nil)
 	acceptorWorkTimer             = metrics.NewRegisteredCounter("chain/acceptor/work", nil)
 	acceptorWorkCount             = metrics.NewRegisteredCounter("chain/acceptor/work/count", nil)
 	lastAcceptedBlockBaseFeeGauge = metrics.NewRegisteredGauge("chain/block/fee/basefee", nil)
@@ -175,7 +174,6 @@ type CacheConfig struct {
 	TriePrefetcherParallelism       int     // Max concurrent disk reads trie prefetcher should perform at once
 	CommitInterval                  uint64  // Commit the trie every [CommitInterval] blocks.
 	Pruning                         bool    // Whether to disable trie write caching and GC altogether (archive node)
-	AcceptorQueueLimit              int     // Blocks to queue before blocking during acceptance
 	PopulateMissingTries            *uint64 // If non-nil, sets the starting height for re-generating historical tries.
 	PopulateMissingTriesParallelism int     // Number of readers to use when trying to populate missing tries.
 	AllowMissingTries               bool    // Whether to allow an archive node to run with pruning enabled
@@ -221,7 +219,6 @@ var DefaultCacheConfig = &CacheConfig{
 	TriePrefetcherParallelism: 16,
 	Pruning:                   true,
 	CommitInterval:            4096,
-	AcceptorQueueLimit:        64, // Provides 2 minutes of buffer (2s block target) for a commit delay
 	SnapshotLimit:             256,
 	AcceptedCacheSize:         32,
 	StateScheme:               rawdb.HashScheme,
@@ -305,25 +302,6 @@ type BlockChain struct {
 	senderCacher *TxSenderCacher

-	// [acceptorQueue] is a processing queue for the Acceptor. This is
-	// different than [chainAcceptedFeed], which is sent an event after an accepted
-	// block is processed (after each loop of the accepted worker). If there is a
-	// clean shutdown, all items inserted into the [acceptorQueue] will be processed.
-	acceptorQueue chan *types.Block
-
-	// [acceptorClosingLock], and [acceptorClosed] are used
-	// to synchronize the closing of the [acceptorQueue] channel.
-	//
-	// Because we can't check if a channel is closed without reading from it
-	// (which we don't want to do as we may remove a processing block), we need
-	// to use a second variable to ensure we don't close a closed channel.
-	acceptorClosingLock sync.RWMutex
-	acceptorClosed      bool
-
-	// [acceptorWg] is used to wait for the acceptorQueue to clear. This is used
-	// during shutdown and in tests.
-	acceptorWg sync.WaitGroup
-
 	// [wg] is used to wait for the async blockchain processes to finish on shutdown.
 	wg sync.WaitGroup
@@ -332,16 +310,6 @@ type BlockChain struct {
 	// WaitGroups are used to ensure that async processes have finished during shutdown.
 	quit chan struct{}

-	// [acceptorTip] is the last block processed by the acceptor. This is
-	// returned as the LastAcceptedBlock() to ensure clients get only fully
-	// processed blocks. This may be equal to [lastAccepted].
-	acceptorTip     *types.Block
-	acceptorTipLock sync.Mutex
-
-	// [flattenLock] prevents the [acceptor] from flattening snapshots while
-	// a block is being verified.
-	flattenLock sync.Mutex
-
 	// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
 	acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]
@@ -395,7 +363,6 @@ func NewBlockChain(
 		engine:            engine,
 		vmConfig:          vmConfig,
 		senderCacher:      NewTxSenderCacher(runtime.NumCPU()),
-		acceptorQueue:     make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
 		quit:              make(chan struct{}),
 		acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize),
 	}
@@ -422,13 +389,6 @@ func NewBlockChain(
 		return nil, err
 	}

-	// After loading the last state (and reprocessing if necessary), we are
-	// guaranteed that [acceptorTip] is equal to [lastAccepted].
-	//
-	// It is critical to update this value before performing any state repairs so
-	// that all accepted blocks can be considered.
-	bc.acceptorTip = bc.lastAccepted
-
 	// Make sure the state associated with the block is available
 	head := bc.CurrentBlock()
 	if !bc.HasState(head.Root) {
@@ -461,10 +421,6 @@ func NewBlockChain(
 		latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
 		bc.repairTxIndexTail(latestStateSynced)
 	}

-	// Start processing accepted blocks effects in the background
-	go bc.startAcceptor()
-
 	// Start tx indexer if it's enabled.
 	if bc.cacheConfig.TransactionHistory != 0 {
 		bc.txIndexer = newTxIndexer(bc.cacheConfig.TransactionHistory, bc)
@@ -513,12 +469,6 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Hash) error {
 		return err
 	}

-	// Ensure we avoid flattening the snapshot while we are processing a block, or
-	// block execution will fallback to reading from the trie (which is much
-	// slower).
-	bc.flattenLock.Lock()
-	defer bc.flattenLock.Unlock()
-
 	// Flatten the entire snap Trie to disk
 	//
 	// Note: This resumes snapshot generation.
@@ -562,110 +512,41 @@ func (bc *BlockChain) warmAcceptedCaches() {
 	log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime))
 }

-// startAcceptor starts processing items on the [acceptorQueue]. If a [nil]
-// object is placed on the [acceptorQueue], the [startAcceptor] will exit.
-func (bc *BlockChain) startAcceptor() {
-	log.Info("Starting Acceptor", "queue length", bc.cacheConfig.AcceptorQueueLimit)
-
-	for next := range bc.acceptorQueue {
-		start := time.Now()
-		acceptorQueueGauge.Dec(1)
-
-		if err := bc.flattenSnapshot(func() error {
-			return bc.stateManager.AcceptTrie(next)
-		}, next.Hash()); err != nil {
-			log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err)
-		}
-
-		// Update last processed and transaction lookup index
-		if err := bc.writeBlockAcceptedIndices(next); err != nil {
-			log.Crit("failed to write accepted block effects", "err", err)
-		}
-
-		// Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content
-		bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header())
-		logs := bc.collectUnflattenedLogs(next, false)
-		bc.acceptedLogsCache.Put(next.Hash(), logs)
-
-		// Update the acceptor tip before sending events to ensure that any client acting based off of
-		// the events observes the updated acceptorTip on subsequent requests
-		bc.acceptorTipLock.Lock()
-		bc.acceptorTip = next
-		bc.acceptorTipLock.Unlock()
-
-		// Update accepted feeds
-		flattenedLogs := types.FlattenLogs(logs)
-		bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs})
-		if len(flattenedLogs) > 0 {
-			bc.logsAcceptedFeed.Send(flattenedLogs)
-		}
-		if len(next.Transactions()) != 0 {
-			bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()})
-		}
-
-		bc.acceptorWg.Done()
-
-		acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
-		acceptorWorkCount.Inc(1)
-		// Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because
-		// the logs are already processed in the acceptor queue.
-		acceptedLogsCounter.Inc(int64(len(logs)))
-	}
-}
-
-// addAcceptorQueue adds a new *types.Block to the [acceptorQueue]. This will
-// block if there are [AcceptorQueueLimit] items in [acceptorQueue].
-func (bc *BlockChain) addAcceptorQueue(b *types.Block) {
-	// We only acquire a read lock here because it is ok to add items to the
-	// [acceptorQueue] concurrently.
-	bc.acceptorClosingLock.RLock()
-	defer bc.acceptorClosingLock.RUnlock()
-
-	if bc.acceptorClosed {
-		return
-	}
-
-	acceptorQueueGauge.Inc(1)
-	bc.acceptorWg.Add(1)
-	bc.acceptorQueue <- b
-}
-
-// DrainAcceptorQueue blocks until all items in [acceptorQueue] have been
-// processed.
-func (bc *BlockChain) DrainAcceptorQueue() {
-	bc.acceptorClosingLock.RLock()
-	defer bc.acceptorClosingLock.RUnlock()
-
-	if bc.acceptorClosed {
-		return
-	}
-
-	bc.acceptorWg.Wait()
-}
-
-// stopAcceptor sends a signal to the Acceptor to stop processing accepted
-// blocks. The Acceptor will exit once all items in [acceptorQueue] have been
-// processed.
-func (bc *BlockChain) stopAcceptor() {
-	bc.acceptorClosingLock.Lock()
-	defer bc.acceptorClosingLock.Unlock()
-
-	// If [acceptorClosed] is already false, we should just return here instead
-	// of attempting to close [acceptorQueue] more than once (will cause
-	// a panic).
-	//
-	// This typically happens when a test calls [stopAcceptor] directly (prior to
-	// shutdown) and then [stopAcceptor] is called again in shutdown.
-	if bc.acceptorClosed {
-		return
-	}
-
-	// Although nothing should be added to [acceptorQueue] after
-	// [acceptorClosed] is updated, we close the channel so the Acceptor
-	// goroutine exits.
-	bc.acceptorWg.Wait()
-	bc.acceptorClosed = true
-	close(bc.acceptorQueue)
-}
+// accept processes a block that has been verified and updates the snapshot
+// and indexes.
+func (bc *BlockChain) accept(next *types.Block) error {
+	start := time.Now()
+
+	if err := bc.flattenSnapshot(func() error {
+		return bc.stateManager.AcceptTrie(next)
+	}, next.Hash()); err != nil {
+		log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err)
+	}
+
+	// Update last processed and transaction lookup index
+	if err := bc.writeBlockAcceptedIndices(next); err != nil {
+		log.Crit("failed to write accepted block effects", "err", err)
Reviewer: should we return errs here instead of just log them? I think we should still keep the logs.
Author: it is correct to return the error (I will use fmt.Errorf)
Reviewer: log.Crit uses
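A possible shape for the suggested change, sketched under the assumption that accept keeps the log (per the reviewer) while propagating the failure (the final diff may word this differently):

	// Hypothetical sketch: log the failure but return a wrapped error instead
	// of exiting the process via log.Crit, leaving the decision to terminate
	// to accept's caller.
	if err := bc.writeBlockAcceptedIndices(next); err != nil {
		log.Error("failed to write accepted block effects", "err", err)
		return fmt.Errorf("failed to write accepted block effects: %w", err)
	}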
+	}
+
+	// Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content
+	bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header())
+	logs := bc.collectUnflattenedLogs(next, false)
+	bc.acceptedLogsCache.Put(next.Hash(), logs)
+
+	// Update accepted feeds
+	flattenedLogs := types.FlattenLogs(logs)
+	bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs})
+	if len(flattenedLogs) > 0 {
+		bc.logsAcceptedFeed.Send(flattenedLogs)
+	}
+	if len(next.Transactions()) != 0 {
+		bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()})
+	}
+
+	acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
+	acceptorWorkCount.Inc(1)
+	acceptedLogsCounter.Inc(int64(len(logs)))
+	return nil
+}

 func (bc *BlockChain) InitializeSnapshots() {
@@ -817,9 +698,6 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {

 // ValidateCanonicalChain confirms a canonical chain is well-formed.
 func (bc *BlockChain) ValidateCanonicalChain() error {
-	// Ensure all accepted blocks are fully processed
-	bc.DrainAcceptorQueue()
-
 	current := bc.CurrentBlock()
 	i := 0
 	log.Info("Beginning to validate canonical chain", "startBlock", current.Number)
@@ -936,11 +814,6 @@ func (bc *BlockChain) stopWithoutSaving() {

 	log.Info("Closing quit channel")
 	close(bc.quit)
-	// Wait for accepted feed to process all remaining items
-	log.Info("Stopping Acceptor")
-	start := time.Now()
-	bc.stopAcceptor()
-	log.Info("Acceptor queue drained", "t", time.Since(start))

 	// Stop senderCacher's goroutines
 	log.Info("Shutting down sender cacher")
@@ -1041,10 +914,10 @@ func (bc *BlockChain) LastConsensusAcceptedBlock() *types.Block {
 //
 // Note: During initialization, [acceptorTip] is equal to [lastAccepted].

Reviewer: should comment change here?

 func (bc *BlockChain) LastAcceptedBlock() *types.Block {

Reviewer: can we remove "LastConsensusAcceptedBlock"?
Reviewer: also I think we don't need to hold a lock everywhere we call this. seems we can just directly use the variable. should be careful with that though.
Author: CleanBlockRootsAboveLastAccepted is still used in offline pruning (to remove roots of non-accepted blocks), but I inlined the use of the variable in the 3 cases you mentioned
Author: PR here #1339

-	bc.acceptorTipLock.Lock()
-	defer bc.acceptorTipLock.Unlock()
+	bc.chainmu.Lock()

Reviewer: can we use a RLock here? (see the sketch after this hunk)

+	defer bc.chainmu.Unlock()

-	return bc.acceptorTip
+	return bc.lastAccepted
 }
@@ -1077,9 +950,12 @@ func (bc *BlockChain) Accept(block *types.Block) error {
 		}
 	}

-	// Enqueue block in the acceptor
+	// Update the last accepted block
 	bc.lastAccepted = block
-	bc.addAcceptorQueue(block)
+	if err := bc.accept(block); err != nil {
+		return err
+	}

 	acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
 	acceptedTxsCounter.Inc(int64(len(block.Transactions())))
 	if baseFee := block.BaseFee(); baseFee != nil {
@@ -1365,10 +1241,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {

 	// Instantiate the statedb to use for processing transactions
-	//
-	// NOTE: Flattening a snapshot during block execution requires fetching state
-	// entries directly from the trie (much slower).
-	bc.flattenLock.Lock()
-	defer bc.flattenLock.Unlock()
 	statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
 	if err != nil {
 		return err
@@ -2129,7 +2001,6 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

 	// Update all in-memory chain markers
 	bc.lastAccepted = block
-	bc.acceptorTip = block
 	bc.currentBlock.Store(block.Header())
 	bc.hc.SetCurrentHeader(block.Header())
Reviewer: do we still need to store the latest tip with WriteAcceptorTip? I mean stopping updating the stored value (not completely removing it).

Reviewer: If I'm not mistaken, acceptorTip can only be <= lastAccepted. So if we decide not to update this, we might end up always reprocessing the state. I wonder whether, after removing the acceptor queue, we will ever need to reprocess the state. Maybe we can keep the check bc.HasState(current.Root()). I also don't fully understand how that reprocessState works. If acceptorTip <= lastAccepted, then from what I see it would go back from acceptorTip, since we do this: current = bc.GetBlockByHash(acceptorTip). What happens to the state between acceptorTip and lastAccepted then? I might be completely wrong though.

Author: it is correct that acceptorTip <= lastAccepted, or that acceptorTip == common.Hash{}. reprocessState is kind of confusing: if acceptorTip < lastAccepted, execution will start at acceptorTip. We can stop updating it once it matches lastAccepted, but I would prefer to keep that for the next PR so we get it right. Let me know if you prefer to include stopping the updates in this PR.

Reviewer: no, I think it's fine if we remove them altogether in a safer way.
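To make the recovery flow discussed here concrete, a condensed sketch of the behavior described in this thread; reprocessBlock is a hypothetical helper name, and this is not the actual reprocessState implementation:

	// Hypothetical sketch of startup recovery: if the persisted acceptor tip
	// lags the last accepted block, re-execute forward from the tip so that
	// the state and indexes of every accepted block are restored.
	if acceptorTip != (common.Hash{}) && acceptorTip != lastAccepted.Hash() {
		current := bc.GetBlockByHash(acceptorTip)
		for current.Hash() != lastAccepted.Hash() {
			next := bc.GetBlockByNumber(current.NumberU64() + 1)
			if err := bc.reprocessBlock(current, next); err != nil { // hypothetical helper
				return err
			}
			current = next
		}
	}

Under this reading, nothing between acceptorTip and lastAccepted is lost on restart; it is simply re-executed, which is why the thread concludes the stored tip can eventually stop being updated once it always matches lastAccepted.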