Skip to content

Commit

Permalink
Added key sorting and concurrent trimming to reduce overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
jdowning100 committed Sep 5, 2024
1 parent a0aa063 commit 0ca58f5
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 36 deletions.
96 changes: 75 additions & 21 deletions consensus/blake3pow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math/big"
"runtime"
"runtime/debug"
"sort"
"sync"
"time"

mapset "github.com/deckarep/golang-set"
Expand Down Expand Up @@ -677,16 +679,49 @@ func (blake3pow *Blake3pow) Finalize(chain consensus.ChainHeaderReader, batch et
}
}
start := time.Now()
collidingKeys, err := rawdb.ReadCollidingKeys(chain.Database(), header.ParentHash(nodeCtx))
if err != nil {
blake3pow.logger.Errorf("Failed to read colliding keys for block %s: %+v", header.ParentHash(nodeCtx).String(), err)
}
newCollidingKeys := make([][]byte, 0)
trimmedUtxos := make([]*types.SpentUtxoEntry, 0)
var wg sync.WaitGroup
var lock sync.Mutex
for denomination, depth := range trimDepths {
if header.NumberU64(nodeCtx) > depth+1 {
nextBlockToTrim := rawdb.ReadCanonicalHash(chain.Database(), header.NumberU64(nodeCtx)-depth)
TrimBlock(chain, batch, true, denomination, header.NumberU64(nodeCtx)-depth, nextBlockToTrim, &utxosDelete, &trimmedUtxos, &utxoSetSize, !setRoots, blake3pow.logger) // setRoots is false when we are processing the block
wg.Add(1)
go func(denomination uint8, depth uint64) {
nextBlockToTrim := rawdb.ReadCanonicalHash(chain.Database(), header.NumberU64(nodeCtx)-depth)
collisions := TrimBlock(chain, batch, denomination, true, header.NumberU64(nodeCtx)-depth, nextBlockToTrim, &utxosDelete, &trimmedUtxos, nil, &utxoSetSize, !setRoots, &lock, blake3pow.logger) // setRoots is false when we are processing the block
if len(collisions) > 0 {
lock.Lock()
newCollidingKeys = append(newCollidingKeys, collisions...)
lock.Unlock()
}
wg.Done()
}(denomination, depth)
}
}
if len(collidingKeys) > 0 {
wg.Add(1)
go func() {
// Trim colliding/duplicate keys here - an optimization could be to do this above in parallel with the other trims
collisions := TrimBlock(chain, batch, 0, false, 0, common.Hash{}, &utxosDelete, &trimmedUtxos, collidingKeys, &utxoSetSize, !setRoots, &lock, blake3pow.logger)
if len(collisions) > 0 {
lock.Lock()
newCollidingKeys = append(newCollidingKeys, collisions...)
lock.Unlock()
}
wg.Done()
}()
}
wg.Wait()
blake3pow.logger.Infof("Trimmed %d UTXOs from db in %s", len(trimmedUtxos), common.PrettyDuration(time.Since(start)))
if !setRoots {
rawdb.WriteTrimmedUTXOs(batch, header.Hash(), trimmedUtxos)
if len(newCollidingKeys) > 0 {
rawdb.WriteCollidingKeys(batch, header.Hash(), newCollidingKeys)
}
}
for _, hash := range utxosCreate {
multiSet.Add(hash.Bytes())
Expand Down Expand Up @@ -717,23 +752,44 @@ type UtxoEntryWithIndex struct {
Key []byte
}

func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, checkDenomination bool, denomination uint8, blockHeight uint64, blockHash common.Hash, utxosDelete *[]common.Hash, trimmedUtxos *[]*types.SpentUtxoEntry, utxoSetSize *uint64, deleteFromDb bool, logger *log.Logger) {
func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, denomination uint8, checkDenom bool, blockHeight uint64, blockHash common.Hash, utxosDelete *[]common.Hash, trimmedUtxos *[]*types.SpentUtxoEntry, collidingKeys [][]byte, utxoSetSize *uint64, deleteFromDb bool, lock *sync.Mutex, logger *log.Logger) [][]byte {
utxosCreated, _ := rawdb.ReadCreatedUTXOKeys(chain.Database(), blockHash)
if utxosCreated == nil {
// This is likely always going to be the case, as the prune depth will almost always be shorter than the trim depth
utxosCreated, _ = rawdb.ReadPrunedUTXOKeys(chain.Database(), blockHeight)
}
logger.Infof("UTXOs created in block %d: %d", blockHeight, len(utxosCreated))
utxos := make(map[common.Hash][]*UtxoEntryWithIndex)
if len(collidingKeys) > 0 {
logger.Infof("Colliding keys: %d", len(collidingKeys))
utxosCreated = append(utxosCreated, collidingKeys...)
sort.Slice(utxosCreated, func(i, j int) bool {
return utxosCreated[i][len(utxosCreated[i])-1] < utxosCreated[j][len(utxosCreated[j])-1]
})
}
newCollisions := make([][]byte, 0)
// Start by grabbing all the UTXOs created in the block (that are still in the UTXO set)
for _, key := range utxosCreated {
if len(key) == 0 {
logger.Errorf("Empty key found, denomination: %d", denomination)
continue
}
if checkDenom && (len(key) == rawdb.UtxoKeyWithDenominationLength || len(key) == rawdb.PrunedUtxoKeyWithDenominationLength) {
if key[len(key)-1] != denomination {
if key[len(key)-1] > denomination {
break // The keys are stored in order of denomination, so we can stop checking here
} else {
continue
}
} else {
key = key[:len(key)-1] // remove the denomination byte
}
}
i := 0
it := chain.Database().NewIterator(key, nil)
for it.Next() {
data := it.Value()
if len(data) == 0 {
logger.Infof("Empty key found, denomination: %d", denomination)
continue
}
utxoProto := new(types.ProtoTxOut)
Expand All @@ -751,35 +807,33 @@ func TrimBlock(chain consensus.ChainHeaderReader, batch ethdb.Batch, checkDenomi
}).Error("Invalid utxo Proto")
continue
}
if checkDenomination && utxo.Denomination != denomination {
if checkDenom && utxo.Denomination != denomination {
continue
}
txHash, index, err := rawdb.ReverseUtxoKey(it.Key())
if err != nil {
logger.WithField("err", err).Error("Failed to parse utxo key")
continue
}
utxos[txHash] = append(utxos[txHash], &UtxoEntryWithIndex{utxo, index, it.Key()})
}
it.Release()
}

// Next, check if they are eligible for deletion and delete them
for txHash, utxoEntries := range utxos {
blockNumberForTx := rawdb.ReadTxLookupEntry(chain.Database(), txHash)
if blockNumberForTx != nil && *blockNumberForTx != blockHeight { // collision, wrong tx
logger.Infof("Collision: tx %s was created in block %d, but is in block %d", txHash.String(), *blockNumberForTx, blockHeight)
continue
}
for _, utxo := range utxoEntries {
*utxosDelete = append(*utxosDelete, types.UTXOHash(txHash, utxo.Index, utxo.UtxoEntry))
lock.Lock()
*utxosDelete = append(*utxosDelete, types.UTXOHash(txHash, index, utxo))
if deleteFromDb {
batch.Delete(utxo.Key)
*trimmedUtxos = append(*trimmedUtxos, &types.SpentUtxoEntry{OutPoint: types.OutPoint{txHash, utxo.Index}, UtxoEntry: utxo.UtxoEntry})
batch.Delete(it.Key())
*trimmedUtxos = append(*trimmedUtxos, &types.SpentUtxoEntry{OutPoint: types.OutPoint{txHash, index}, UtxoEntry: utxo})
}
*utxoSetSize--
lock.Unlock()
i++
if i >= types.MaxTrimCollisionsPerKeyPerBlock {
// This will rarely ever happen, but if it does, we should continue trimming this key in the next block
logger.WithField("blockHeight", blockHeight).Error("MaxTrimCollisionsPerBlock exceeded")
newCollisions = append(newCollisions, key)
break
}
}
it.Release()
}
return newCollisions
}

func UpdateTrimDepths(trimDepths map[uint8]uint64, utxoSetSize uint64) bool {
Expand Down
25 changes: 15 additions & 10 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,15 @@ func (c *ChainIndexer) PruneOldBlockData(blockHeight uint64) {
// Don't keep it if it can't be decoded into UtxoEntry
continue
}*/
// Reduce key size to 8 bytes
key = key[:8]
if len(key) == rawdb.UtxoKeyWithDenominationLength {
if key[rawdb.UtxoKeyWithDenominationLength-1] > types.MaxTrimDenomination {
// Don't keep it if the denomination is not trimmed
continue
}
key[rawdb.PrunedUtxoKeyWithDenominationLength-1] = key[rawdb.UtxoKeyWithDenominationLength-1] // place the denomination at the end of the key
}
// Reduce key size to 9 bytes
key = key[:rawdb.PrunedUtxoKeyWithDenominationLength]
createdUtxosToKeep = append(createdUtxosToKeep, key)
}
rawdb.WritePrunedUTXOKeys(c.chainDb, blockHeight, createdUtxosToKeep)
Expand All @@ -374,6 +381,7 @@ func (c *ChainIndexer) PruneOldBlockData(blockHeight uint64) {
rawdb.DeleteSpentUTXOs(c.chainDb, blockHash)
rawdb.DeleteTrimmedUTXOs(c.chainDb, blockHash)
rawdb.DeleteTrimDepths(c.chainDb, blockHash)
rawdb.DeleteCollidingKeys(c.chainDb, blockHash)
}

func (c *ChainIndexer) UTXOKeyPruner() {
Expand All @@ -400,7 +408,7 @@ func (c *ChainIndexer) UTXOKeyPruner() {
}
key := rawdb.UtxoKey(spentUtxo.TxHash, spentUtxo.Index)
for i := 0; i < len(utxoKeys); i++ {
if compareMinLength(utxoKeys[i], key) {
if comparePrunedKeyWithFullKey(utxoKeys[i], key) {
// Remove the key by shifting the slice to the left
utxoKeys = append(utxoKeys[:i], utxoKeys[i+1:]...)
break
Expand All @@ -415,14 +423,11 @@ func (c *ChainIndexer) UTXOKeyPruner() {
}
}

func compareMinLength(a, b []byte) bool {
minLen := len(a)
if len(b) < minLen {
minLen = len(b)
}
func comparePrunedKeyWithFullKey(a, b []byte) bool {

// Compare the slices up to the length of the shorter slice
for i := 0; i < minLen; i++ {
// Compare the slices up to the length of the pruned key
// The 9th byte (position 8) is the denomination in the pruned utxo key
for i := 0; i < rawdb.PrunedUtxoKeyWithDenominationLength-1; i++ {
if a[i] != b[i] {
return false
}
Expand Down
3 changes: 3 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.WorkObject) error {
return err
}
for _, key := range utxoKeys {
if len(key) == rawdb.UtxoKeyWithDenominationLength {
key = key[:rawdb.UtxoKeyLength] // The last byte of the key is the denomination (but only in CreatedUTXOKeys)
}
hc.headerDb.Delete(key)
}
}
Expand Down
34 changes: 34 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rawdb

import (
"encoding/binary"
"sort"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
Expand Down Expand Up @@ -1341,6 +1342,10 @@ func DeleteSpentUTXOs(db ethdb.KeyValueWriter, blockHash common.Hash) {
}

func WriteCreatedUTXOKeys(db ethdb.KeyValueWriter, blockHash common.Hash, createdUTXOKeys [][]byte) error {
// Sort each key by the denomination in the key
sort.Slice(createdUTXOKeys, func(i, j int) bool {
return createdUTXOKeys[i][len(createdUTXOKeys[i])-1] < createdUTXOKeys[j][len(createdUTXOKeys[j])-1] // the last byte is the denomination
})
protoKeys := &types.ProtoKeys{Keys: make([][]byte, 0, len(createdUTXOKeys))}

protoKeys.Keys = append(protoKeys.Keys, createdUTXOKeys...)
Expand Down Expand Up @@ -1513,3 +1518,32 @@ func DeleteTrimDepths(db ethdb.KeyValueWriter, blockHash common.Hash) {
db.Logger().WithField("err", err).Fatal("Failed to delete trim depths")
}
}

func ReadCollidingKeys(db ethdb.Reader, blockHash common.Hash) ([][]byte, error) {
data, _ := db.Get(collidingKeysKey(blockHash))
if len(data) == 0 {
return nil, nil
}
protoKeys := new(types.ProtoKeys)
if err := proto.Unmarshal(data, protoKeys); err != nil {
return nil, err
}
return protoKeys.Keys, nil
}

func WriteCollidingKeys(db ethdb.KeyValueWriter, blockHash common.Hash, keys [][]byte) error {
protoKeys := &types.ProtoKeys{Keys: make([][]byte, 0, len(keys))}
protoKeys.Keys = append(protoKeys.Keys, keys...)

data, err := proto.Marshal(protoKeys)
if err != nil {
db.Logger().WithField("err", err).Fatal("Failed to rlp encode utxo")
}
return db.Put(collidingKeysKey(blockHash), data)
}

func DeleteCollidingKeys(db ethdb.KeyValueWriter, blockHash common.Hash) {
if err := db.Delete(collidingKeysKey(blockHash)); err != nil {
db.Logger().WithField("err", err).Fatal("Failed to delete colliding keys")
}
}
16 changes: 16 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
spentUTXOsPrefix = []byte("sutxo") // spentUTXOsPrefix + hash -> []types.SpentTxOut
trimmedUTXOsPrefix = []byte("tutxo") // trimmedUTXOsPrefix + hash -> []types.SpentTxOut
trimDepthsPrefix = []byte("td") // trimDepthsPrefix + hash -> uint64
collidingKeysPrefix = []byte("ck") // collidingKeysPrefix + hash -> [][]byte
createdUTXOsPrefix = []byte("cutxo") // createdUTXOsPrefix + hash -> []common.Hash
prunedUTXOKeysPrefix = []byte("putxo") // prunedUTXOKeysPrefix + num (uint64 big endian) -> hash
utxoSetSizePrefix = []byte("us") // utxoSetSizePrefix + hash -> uint64
Expand Down Expand Up @@ -326,6 +327,8 @@ func addressUtxosKey(address string) []byte {
return append(AddressUtxosPrefix, address[:]...)
}

var UtxoKeyLength = len(utxoPrefix) + common.HashLength + 2

// This can be optimized via VLQ encoding as btcd has done
// this key is 36 bytes long and can probably be reduced to 32 bytes
func UtxoKey(hash common.Hash, index uint16) []byte {
Expand All @@ -334,6 +337,15 @@ func UtxoKey(hash common.Hash, index uint16) []byte {
return append(utxoPrefix, append(hash.Bytes(), indexBytes...)...)
}

var UtxoKeyWithDenominationLength = len(utxoPrefix) + common.HashLength + 3
var PrunedUtxoKeyWithDenominationLength = 9

func UtxoKeyWithDenomination(hash common.Hash, index uint16, denomination uint8) []byte {
indexBytes := make([]byte, 2)
binary.BigEndian.PutUint16(indexBytes, index)
return append(utxoPrefix, append(hash.Bytes(), append(indexBytes, denomination)...)...)
}

func ReverseUtxoKey(key []byte) (common.Hash, uint16, error) {
if len(key) != len(utxoPrefix)+common.HashLength+2 {
return common.Hash{}, 0, fmt.Errorf("invalid key length %d", len(key))
Expand Down Expand Up @@ -374,3 +386,7 @@ func lastTrimmedBlockKey(hash common.Hash) []byte {
func trimDepthsKey(hash common.Hash) []byte {
return append(trimDepthsPrefix, hash.Bytes()...)
}

func collidingKeysKey(hash common.Hash) []byte {
return append(collidingKeysPrefix, hash.Bytes()...)
}
8 changes: 4 additions & 4 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
return nil, nil, nil, nil, 0, nil, 0, err
}
utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.Hash(), outputIndex, utxo))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKey(etx.Hash(), outputIndex))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.Hash(), outputIndex, utxo.Denomination))
p.logger.Debugf("Creating UTXO for coinbase %032x with denomination %d index %d\n", tx.Hash(), denomination, outputIndex)
outputIndex++
}
Expand Down Expand Up @@ -479,7 +479,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
return nil, nil, nil, nil, 0, nil, 0, err
}
utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.Hash(), outputIndex, utxo))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKey(etx.Hash(), outputIndex))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.Hash(), outputIndex, utxo.Denomination))
p.logger.Infof("Converting Quai to Qi %032x with denomination %d index %d lock %d\n", tx.Hash(), denomination, outputIndex, lock)
outputIndex++
}
Expand All @@ -491,7 +491,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
return nil, nil, nil, nil, 0, nil, 0, err
}
utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(etx.OriginatingTxHash(), etx.ETXIndex(), utxo))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKey(etx.OriginatingTxHash(), etx.ETXIndex()))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(etx.OriginatingTxHash(), etx.ETXIndex(), utxo.Denomination))
// This Qi ETX should cost more gas
if err := gp.SubGas(params.CallValueTransferGas); err != nil {
return nil, nil, nil, nil, 0, nil, 0, err
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, checkSig bool, curre
return nil, nil, err, nil
}
utxosCreatedDeleted.UtxosCreatedHashes = append(utxosCreatedDeleted.UtxosCreatedHashes, types.UTXOHash(tx.Hash(), uint16(txOutIdx), utxo))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKey(tx.Hash(), uint16(txOutIdx)))
utxosCreatedDeleted.UtxosCreatedKeys = append(utxosCreatedDeleted.UtxosCreatedKeys, rawdb.UtxoKeyWithDenomination(tx.Hash(), uint16(txOutIdx), utxo.Denomination))
}
}
elapsedTime = time.Since(stepStart)
Expand Down
4 changes: 3 additions & 1 deletion core/types/utxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
const (
MaxDenomination = 16

MaxOutputIndex = math.MaxUint16
MaxOutputIndex = math.MaxUint16
MaxTrimDenomination = 8
MaxTrimCollisionsPerKeyPerBlock = 1000
)

var MaxQi = new(big.Int).Mul(big.NewInt(math.MaxInt64), big.NewInt(params.Ether)) // This is just a default; determine correct value later
Expand Down

0 comments on commit 0ca58f5

Please sign in to comment.