triedb/pathdb: introduce lookup structure to optimize node query #30557

Open · wants to merge 1 commit into base: master
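The excerpt below covers the metrics plumbing and the stale-layer tracking; the lookup structure named in the PR title is not itself part of it. What follows is a minimal sketch of the idea — a reverse index from owner+path to the layers containing that node, so a read can jump straight to the owning layer instead of walking the diff-layer chain top-down. All identifiers here (lookup, nodeKey, addLayer, latest) are illustrative, not taken from the PR.

package pathdb // sketch only, not the actual implementation

import (
	"sync"

	"github.com/ethereum/go-ethereum/common"
)

type lookup struct {
	mu    sync.RWMutex
	nodes map[string][]common.Hash // owner+path -> roots of layers touching the node, oldest first
}

func nodeKey(owner common.Hash, path []byte) string {
	return string(owner.Bytes()) + string(path)
}

// addLayer indexes every trie node written by a freshly committed diff
// layer, keyed by owner and path.
func (l *lookup) addLayer(root common.Hash, nodes map[common.Hash]map[string][]byte) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for owner, subset := range nodes {
		for path := range subset {
			key := nodeKey(owner, []byte(path))
			l.nodes[key] = append(l.nodes[key], root)
		}
	}
}

// latest returns the most recent layer known to contain the node. A real
// implementation must additionally verify that the returned layer sits on
// the path between the queried head layer and the disk layer.
func (l *lookup) latest(owner common.Hash, path []byte) (common.Hash, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	list := l.nodes[nodeKey(owner, path)]
	if len(list) == 0 {
		return common.Hash{}, false
	}
	return list[len(list)-1], true
}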
4 changes: 3 additions & 1 deletion core/blockchain.go
@@ -77,6 +77,7 @@ var (

snapshotCommitTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredResettingTimer("chain/triedb/commits", nil)
prefetchWaitTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/wait", nil)

blockInsertTimer = metrics.NewRegisteredResettingTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredResettingTimer("chain/validation", nil)
@@ -1952,12 +1953,13 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
if statedb.StorageLoaded != 0 {
storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded))
}
- accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
+ accountUpdateTimer.Update(statedb.AccountUpdates - statedb.PrefetcherWait) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
triehash := statedb.AccountHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing
prefetchWaitTimer.Update(statedb.PrefetcherWait) // The time spent on waiting prefetcher to finish preload tasks
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation

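For context on the pattern above: go-ethereum's metrics package exposes a ResettingTimer, which aggregates duration samples and resets on each reporting cycle; the new chain/prefetch/wait timer follows this pattern. A hedged usage sketch — the metric name and function below are made up for illustration:

package main

import (
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// Registered once at package level, mirroring the var block above.
var exampleWaitTimer = metrics.NewRegisteredResettingTimer("example/prefetch/wait", nil)

func timedSection() {
	start := time.Now()
	// ... block on the prefetcher, or any work worth sampling ...
	exampleWaitTimer.Update(time.Since(start)) // record one duration sample
}

func main() { timedSection() }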
17 changes: 10 additions & 7 deletions core/state/statedb.go
@@ -142,13 +142,15 @@ type StateDB struct {
witness *stateless.Witness

// Measurements gathered during execution for debugging purposes
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration

PrefetcherWait time.Duration
SnapshotCommits time.Duration
TrieDBCommits time.Duration

@@ -866,6 +868,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
} else {
s.trie = trie
}
s.PrefetcherWait = time.Since(start)
}
// Perform updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
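The start referenced by the new line is not visible in this hunk; presumably it is captured just before the state waits for the prefetcher to deliver the preloaded account trie, so PrefetcherWait covers only that blocking hand-off. A sketch of the assumed surrounding shape — the prefetcher call and fallback here are assumptions, not verbatim:

// Inside IntermediateRoot (sketch):
start := time.Now()
trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot) // blocks until the preload task finishes
if err != nil || trie == nil {
	s.trie, _ = s.db.OpenTrie(s.originalRoot) // assumed fallback: resolve the trie from the database
} else {
	s.trie = trie
}
s.PrefetcherWait = time.Since(start) // only the hand-off wait is attributed here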
21 changes: 19 additions & 2 deletions core/state/trie_prefetcher.go
@@ -19,6 +19,7 @@ package state
import (
"errors"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@@ -55,13 +56,15 @@ type triePrefetcher struct {
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter
accountLoadTimer metrics.ResettingTimer

storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
storageLoadTimer metrics.ResettingTimer
}

func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
@@ -78,13 +81,15 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/account/load/time", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),

storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/storage/load/time", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
@@ -120,7 +125,10 @@ func (p *triePrefetcher) report() {
if fetcher.root == p.root {
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

total := len(fetcher.seenRead) + len(fetcher.seenWrite)
if total > 0 {
p.accountLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
@@ -133,7 +141,10 @@ func (p *triePrefetcher) report() {
} else {
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

total := len(fetcher.seenRead) + len(fetcher.seenWrite)
if total > 0 {
p.storageLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
@@ -237,6 +248,7 @@ type subfetcher struct {

seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
readTime time.Duration // Total time spent on resolving states

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
@@ -378,6 +390,7 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

start := time.Now()
for _, task := range tasks {
key := string(task.key)
if task.read {
@@ -410,6 +423,10 @@ func (sf *subfetcher) loop() {
sf.seenWrite[key] = struct{}{}
}
}
// Count the time spent on resolving states. The measurement is not
// entirely accurate, since it also includes some auxiliary work (e.g.,
// filtering out duplicated tasks), but it is good enough for monitoring.
sf.readTime += time.Since(start)

case <-sf.stop:
// Termination is requested, abort if no more tasks are pending. If
3 changes: 3 additions & 0 deletions triedb/pathdb/database.go
@@ -86,6 +86,9 @@ type layer interface {
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer) error

// isStale returns whether this layer has become stale or if it's still live.
isStale() bool
}

// Config contains the settings for database.
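Promoting isStale into the layer interface presumably serves the new lookup: a reader that resolved a layer through the index can detect that the layer was invalidated by a concurrent flatten and retry, rather than serving stale data. A sketch of that retry pattern, assuming the pathdb package context (layerTree.get, errSnapshotStale, common) — readNode itself is a hypothetical helper, not part of the PR:

// Sketch: re-resolve and retry when a layer goes stale mid-read.
func readNode(tree *layerTree, root, owner common.Hash, path []byte) ([]byte, common.Hash, error) {
	for {
		l := tree.get(root)
		if l == nil {
			return nil, common.Hash{}, fmt.Errorf("layer %x not found", root)
		}
		blob, hash, _, err := l.node(owner, path, 0)
		if errors.Is(err, errSnapshotStale) {
			// The layer was flattened while we were reading; the tree now
			// maps root to its replacement, so resolve it again and retry.
			continue
		}
		return blob, hash, err
	}
}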
29 changes: 26 additions & 3 deletions triedb/pathdb/difflayer.go
@@ -41,7 +41,8 @@ type diffLayer struct {
memory uint64 // Approximate guess as to how much memory we use

parent layer // Parent layer modified by this one, never nil, **can be changed**
- lock sync.RWMutex // Lock used to protect parent
+ stale bool // Signals that the layer became stale (referenced disk layer became stale)
+ lock sync.RWMutex // Lock used to protect parent and stale fields
}

// newDiffLayer creates a new diff layer on top of an existing layer.
@@ -95,6 +96,25 @@ func (dl *diffLayer) parentLayer() layer {
return dl.parent
}

// isStale returns whether this layer has become stale or if it's still live.
func (dl *diffLayer) isStale() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.stale
}

// markStale sets the stale flag as true.
func (dl *diffLayer) markStale() {
dl.lock.Lock()
defer dl.lock.Unlock()

if dl.stale {
panic("triedb diff layer is stale")
}
dl.stale = true
}

// node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) {
@@ -103,6 +123,9 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
dl.lock.RLock()
defer dl.lock.RUnlock()

if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// If the trie node is known locally, return it
subset, ok := dl.nodes[owner]
if ok {
Expand All @@ -125,7 +148,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
}

// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
@@ -147,7 +170,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {

// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
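Narrowing persist and diffToDisk to return the concrete *diskLayer (instead of the layer interface) lets the caller work with the flattened result directly, and pairs naturally with markStale above: once the bottom diff is merged into disk, layers still pointing at the replaced disk layer must be flagged so that in-flight reads fail with errSnapshotStale. A sketch of the assumed call site — staleDescendants is a hypothetical helper, not from the PR:

disk, err := bottom.persist(false)
if err != nil {
	return err
}
for _, orphan := range staleDescendants(disk) { // hypothetical: diff layers still referencing the replaced disk layer
	orphan.markStale() // in-flight reads now fail with errSnapshotStale and can retry
}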
2 changes: 1 addition & 1 deletion triedb/pathdb/disklayer.go
@@ -73,7 +73,7 @@ func (dl *diskLayer) parentLayer() layer {
return nil
}

- // isStale return whether this layer has become stale (was flattened across) or if
+ // isStale returns whether this layer has become stale (was flattened across) or if
// it's still live.
func (dl *diskLayer) isStale() bool {
dl.lock.RLock()