Skip to content

Commit

Permalink
Merge pull request #2123 from kcalvinalvin/2024-02-15-no-utxocache-lo…
Browse files Browse the repository at this point in the history
…ading-on-reorgs

blockchain: fix inconsistent utxocache and database on reorg
  • Loading branch information
Roasbeef authored Mar 9, 2024
2 parents b66f5b8 + 078815b commit e63bf03
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 112 deletions.
103 changes: 38 additions & 65 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List
//
// This function MUST be called with the chain state lock held (for writes).
func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
view *UtxoViewpoint, stxos []SpentTxOut) error {
stxos []SpentTxOut) error {

// Make sure it's extending the end of the best chain.
prevHash := &block.MsgBlock().Header.PrevBlock
Expand Down Expand Up @@ -611,18 +611,6 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
curTotalTxns+numTxns, CalcPastMedianTime(node),
)

// If a utxoviewpoint was passed in, we'll be writing that viewpoint
// directly to the database on disk. In order for the database to be
// consistent, we must flush the cache before writing the viewpoint.
if view != nil {
err = b.db.Update(func(dbTx database.Tx) error {
return b.utxoCache.flush(dbTx, FlushRequired, state)
})
if err != nil {
return err
}
}

// Atomically insert info into the database.
err = b.db.Update(func(dbTx database.Tx) error {
// If the pruneTarget isn't 0, we should attempt to delete older blocks
Expand Down Expand Up @@ -676,16 +664,6 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
return err
}

// Update the utxo set using the state of the utxo view. This
// entails removing all of the utxos spent and adding the new
// ones created by the block.
//
// A nil viewpoint is a no-op.
err = dbPutUtxoView(dbTx, view)
if err != nil {
return err
}

// Update the transaction spend journal by adding a record for
// the block that contains all txos spent by it.
err = dbPutSpendJournalEntry(dbTx, block.Hash(), stxos)
Expand All @@ -709,12 +687,6 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
return err
}

// Prune fully spent entries and mark all entries in the view unmodified
// now that the modifications have been committed to the database.
if view != nil {
view.commit()
}

// This node is now the end of the best chain.
b.bestChain.SetTip(node)

Expand Down Expand Up @@ -796,6 +768,15 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
return err
}

// Flush the cache on every disconnect. Since the code for
// reorganization modifies the database directly, the cache
// will be left in an inconsistent state if we don't flush it
// prior to the dbPutUtxoView that happends below.
err = b.utxoCache.flush(dbTx, FlushRequired, state)
if err != nil {
return err
}

// Update the utxo set using the state of the utxo view. This
// entails restoring all of the utxos spent and removing the new
// ones created by the block.
Expand Down Expand Up @@ -880,22 +861,16 @@ func countSpentOutputs(block *btcutil.Block) int {
//
// This function may modify node statuses in the block index without flushing.
//
// This function never leaves the utxo set in an inconsistent state for block
// disconnects.
//
// This function MUST be called with the chain state lock held (for writes).
func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error {
// Nothing to do if no reorganize nodes were provided.
if detachNodes.Len() == 0 && attachNodes.Len() == 0 {
return nil
}

// The rest of the reorg depends on all STXOs already being in the database
// so we flush before reorg.
err := b.db.Update(func(dbTx database.Tx) error {
return b.utxoCache.flush(dbTx, FlushRequired, b.BestSnapshot())
})
if err != nil {
return err
}

// Ensure the provided nodes match the current best chain.
tip := b.bestChain.Tip()
if detachNodes.Len() != 0 {
Expand Down Expand Up @@ -957,7 +932,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error

// Load all of the utxos referenced by the block that aren't
// already in the view.
err = view.fetchInputUtxos(b.db, nil, block)
err = view.fetchInputUtxos(b.utxoCache, block)
if err != nil {
return err
}
Expand Down Expand Up @@ -1024,7 +999,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error
// checkConnectBlock gets skipped, we still need to update the UTXO
// view.
if b.index.NodeStatus(n).KnownValid() {
err = view.fetchInputUtxos(b.db, nil, block)
err = view.fetchInputUtxos(b.utxoCache, block)
if err != nil {
return err
}
Expand Down Expand Up @@ -1061,11 +1036,21 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error
newBest = n
}

// Flush the utxo cache for the block disconnect below. The disconnect
// code assumes that it's directly modifying the database so the cache
// will be left in an inconsistent state. It needs to be flushed beforehand
// in order for that to not happen.
err := b.db.Update(func(dbTx database.Tx) error {
return b.utxoCache.flush(dbTx, FlushRequired, b.BestSnapshot())
})
if err != nil {
return err
}

// Reset the view for the actual connection code below. This is
// required because the view was previously modified when checking if
// the reorg would be successful and the connection code requires the
// view to be valid from the viewpoint of each block being connected or
// disconnected.
// view to be valid from the viewpoint of each block being disconnected.
view = NewUtxoViewpoint()
view.SetBestHash(&b.bestChain.Tip().hash)

Expand All @@ -1076,7 +1061,7 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error

// Load all of the utxos referenced by the block that aren't
// already in the view.
err := view.fetchInputUtxos(b.db, nil, block)
err := view.fetchInputUtxos(b.utxoCache, block)
if err != nil {
return err
}
Expand All @@ -1089,51 +1074,39 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error
return err
}

// Update the database and chain state.
// Update the database and chain state. The cache will be flushed
// here before the utxoview modifications happen to the database.
err = b.disconnectBlock(n, block, view)
if err != nil {
return err
}
}

// Connect the new best chain blocks.
// Connect the new best chain blocks using the utxocache directly. It's more
// efficient and since we already checked that the blocks are correct and that
// the transactions connect properly, it's ok to access the cache. If we suddenly
// crash here, we are able to recover as well.
for i, e := 0, attachNodes.Front(); e != nil; i, e = i+1, e.Next() {
n := e.Value.(*blockNode)
block := attachBlocks[i]

// Load all of the utxos referenced by the block that aren't
// already in the view.
err := view.fetchInputUtxos(b.db, nil, block)
if err != nil {
return err
}

// Update the view to mark all utxos referenced by the block
// Update the cache to mark all utxos referenced by the block
// as spent and add all transactions being created by this block
// to it. Also, provide an stxo slice so the spent txout
// details are generated.
stxos := make([]SpentTxOut, 0, countSpentOutputs(block))
err = view.connectTransactions(block, &stxos)
err = b.utxoCache.connectTransactions(block, &stxos)
if err != nil {
return err
}

// Update the database and chain state.
err = b.connectBlock(n, block, view, stxos)
err = b.connectBlock(n, block, stxos)
if err != nil {
return err
}
}

// We call the flush at the end to update the last flush hash to the new
// best tip.
err = b.db.Update(func(dbTx database.Tx) error {
return b.utxoCache.flush(dbTx, FlushRequired, b.BestSnapshot())
})
if err != nil {
return err
}

// Log the point where the chain forked and old and new best chain
// heads.
if forkNode != nil {
Expand Down Expand Up @@ -1225,7 +1198,7 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla
}

// Connect the block to the main chain.
err = b.connectBlock(node, block, nil, stxos)
err = b.connectBlock(node, block, stxos)
if err != nil {
// If we got hit with a rule error, then we'll mark
// that status of the block as invalid and flush the
Expand Down
66 changes: 66 additions & 0 deletions blockchain/fullblocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,70 @@ func TestFullBlocks(t *testing.T) {
}
defer teardownFunc()

testBlockDisconnectExpectUTXO := func(item fullblocktests.BlockDisconnectExpectUTXO) {
expectedCallBack := func(notification *blockchain.Notification) {
switch notification.Type {

case blockchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
t.Fatalf("expected a block")
}

// Return early if the block we get isn't the relevant
// block.
if !block.Hash().IsEqual(&item.BlockHash) {
return
}

entry, err := chain.FetchUtxoEntry(item.OutPoint)
if err != nil {
t.Fatal(err)
}

if entry == nil || entry.IsSpent() {
t.Logf("expected utxo %v to exist but it's "+
"nil or spent\n", item.OutPoint.String())
t.Fatalf("expected utxo %v to exist but it's "+
"nil or spent", item.OutPoint.String())
}
}
}
unexpectedCallBack := func(notification *blockchain.Notification) {
switch notification.Type {
case blockchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
t.Fatalf("expected a block")
}

// Return early if the block we get isn't the relevant
// block.
if !block.Hash().IsEqual(&item.BlockHash) {
return
}

entry, err := chain.FetchUtxoEntry(item.OutPoint)
if err != nil {
t.Fatal(err)
}

if entry != nil && !entry.IsSpent() {
t.Logf("unexpected utxo %v to exist but it's "+
"not nil and not spent", item.OutPoint.String())
t.Fatalf("unexpected utxo %v exists but it's "+
"not nil and not spent\n", item.OutPoint.String())
}
}
}

if item.Expected {
chain.Subscribe(expectedCallBack)
} else {
chain.Subscribe(unexpectedCallBack)
}
}

// testAcceptedBlock attempts to process the block in the provided test
// instance and ensures that it was accepted according to the flags
// specified in the test.
Expand Down Expand Up @@ -300,6 +364,8 @@ func TestFullBlocks(t *testing.T) {
testOrphanOrRejectedBlock(item)
case fullblocktests.ExpectedTip:
testExpectedTip(item)
case fullblocktests.BlockDisconnectExpectUTXO:
testBlockDisconnectExpectUTXO(item)
default:
t.Fatalf("test #%d, item #%d is not one of "+
"the supported test instance types -- "+
Expand Down
Loading

0 comments on commit e63bf03

Please sign in to comment.