Skip to content

Commit

Permalink
feat: fill parlia db from snapshots (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
MatusKysel authored Jul 30, 2024
1 parent a9df09c commit fe4358e
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 164 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
}

stages := stages2.NewDefaultStages(context.Background(), db, snapDb, blobStore, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, agg, nil, nil,
heimdallClient, recents, signatures, logger)
engine, heimdallClient, recents, signatures, logger)
sync := stagedsync.New(cfg.Sync, stages, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger)

miner := stagedsync.NewMiningState(&cfg.Miner)
Expand Down
132 changes: 5 additions & 127 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package parlia

import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -218,7 +216,6 @@ type Parlia struct {
genesisHash libcommon.Hash
db kv.RwDB // Database to store and retrieve snapshot checkpoints
BlobStore services.BlobStorage
chainDb kv.RwDB

recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot] // Snapshots for recent block to speed up
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] // Signatures of recent blocks to speed up mining
Expand Down Expand Up @@ -248,7 +245,6 @@ func New(
db kv.RwDB,
blobStore services.BlobStorage,
blockReader services.FullBlockReader,
chainDb kv.RwDB,
logger log.Logger,
) *Parlia {
// get parlia config
Expand Down Expand Up @@ -289,7 +285,6 @@ func New(
config: parliaConfig,
db: db,
BlobStore: blobStore,
chainDb: chainDb,
recentSnaps: recentSnaps,
signatures: signatures,
validatorSetABIBeforeLuban: vABIBeforeLuban,
Expand Down Expand Up @@ -657,24 +652,19 @@ func (p *Parlia) verifyCascadingFields(chain consensus.ChainHeaderReader, header
}

// All basic checks passed, verify the seal and return
return p.verifySeal(chain, header, parents)
return p.verifySeal(header, snap)
}

// verifySeal checks whether the signature contained in the header satisfies the
// consensus protocol requirements. The method accepts an optional list of parent
// headers that aren't yet part of the local blockchain to generate the snapshots
// from.
func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error {
func (p *Parlia) verifySeal(header *types.Header, snap *Snapshot) error {
// Verifying the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// Retrieve the snapshot needed to verify this header and cache it
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}

// Resolve the authorization key and check against validators
signer, err := ecrecover(header, p.signatures, p.chainConfig.ChainID)
Expand Down Expand Up @@ -768,15 +758,8 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
var (
headers []*types.Header
snap *Snapshot
doLog bool
)

if s, ok := p.recentSnaps.Get(hash); ok {
snap = s
} else {
doLog = true
}

for snap == nil {
// If an in-memory snapshot was found, use that
if s, ok := p.recentSnaps.Get(hash); ok {
Expand All @@ -787,7 +770,7 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
// If an on-disk checkpoint snapshot can be found, use that
if number%CheckpointInterval == 0 {
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
//log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
if !verify || snap != nil {
break
Expand Down Expand Up @@ -824,10 +807,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}
parents = parents[:len(parents)-1]
} else {
if doLog && number%100_000 == 0 {
// No explicit parents (or no more left), reach out to the database
p.logger.Info("[parlia] snapshots build, gather headers", "block", number)
}
header = chain.GetHeader(hash, number)
if header == nil {
return nil, fmt.Errorf("header = %v, hash = %v, err = %v", number, hash, consensus.ErrUnknownAncestor)
Expand All @@ -847,18 +826,17 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

snap, err := snap.apply(headers, chain, parents, p.chainConfig, doLog)
snap, err := snap.apply(headers, chain, parents, p.chainConfig, p.recentSnaps)
if err != nil {
return nil, err
}
p.recentSnaps.Add(snap.Hash, snap)

// If we've generated a new checkpoint snapshot, save to disk
if verify && snap.Number%CheckpointInterval == 0 && len(headers) > 0 {
if err = snap.store(p.db); err != nil {
return nil, err
}
//log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
log.Trace("Stored snapshot to disk", "number", snap.Number, "hash", snap.Hash)
}
return snap, err
}
Expand Down Expand Up @@ -1241,82 +1219,6 @@ func (p *Parlia) Authorize(val libcommon.Address, signFn SignFn) {
// Note, the method returns immediately and will send the result async. More
// than one result may also be returned depending on the consensus algorithm.
func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
header := block.Header()

// Sealing the genesis block is not supported
number := header.Number.Uint64()
if number == 0 {
return errUnknownBlock
}
// For 0-period chains, refuse to seal empty blocks (no reward but would spin sealing)
if p.config.Period == 0 && len(block.Transactions()) == 0 {
p.logger.Info("[parlia] Sealing paused, waiting for transactions")
return nil
}
// Don't hold the val fields for the entire sealing procedure
p.signerLock.RLock()
val, signFn := p.val, p.signFn
p.signerLock.RUnlock()

snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}

// Bail out if we're unauthorized to sign a block
if _, authorized := snap.Validators[val]; !authorized {
return fmt.Errorf("parlia.Seal: %w", errUnauthorizedValidator)
}

// If we're amongst the recent signers, wait for the next block
for seen, recent := range snap.Recents {
if recent == val {
// Signer is among recent, only wait if the current block doesn't shift it out
if limit := uint64(len(snap.Validators)/2 + 1); number < limit || seen > number-limit {
p.logger.Info("[parlia] Signed recently, must wait for others")
return nil
}
}
}

// Sweet, the protocol permits us to sign the block, wait for our time
delay := p.delayForRamanujanFork(snap, header)

p.logger.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "val", val.Hex(), "headerHash", header.Hash().Hex(), "gasUsed", header.GasUsed, "block txn number", block.Transactions().Len(), "State Root", header.Root)

// Sign all the things!
sig, err := signFn(val, crypto.Keccak256(parliaRLP(header, p.chainConfig.ChainID)), p.chainConfig.ChainID)
if err != nil {
return err
}
copy(header.Extra[len(header.Extra)-extraSeal:], sig)

// Wait until sealing is terminated or delay timeout.
//log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay))
go func() {
select {
case <-stop:
return
case <-time.After(delay):
}
if p.shouldWaitForCurrentBlockProcess(p.chainDb, header, snap) {
p.logger.Info("[parlia] Waiting for received in turn block to process")
select {
case <-stop:
p.logger.Info("[parlia] Received block process finished, abort block seal")
return
case <-time.After(time.Duration(processBackOffTime) * time.Second):
p.logger.Info("[parlia] Process backoff time exhausted, start to seal block")
}
}

select {
case results <- block.WithSeal(header):
default:
p.logger.Warn("[parlia] Sealing result is not read by miner", "sealhash", types.SealHash(header, p.chainConfig.ChainID))
}
}()

return nil
}

Expand Down Expand Up @@ -1419,30 +1321,6 @@ func (p *Parlia) IsSystemContract(to *libcommon.Address) bool {
return isToSystemContract(*to)
}

func (p *Parlia) shouldWaitForCurrentBlockProcess(chainDb kv.RwDB, header *types.Header, snap *Snapshot) bool {
if header.Difficulty.Cmp(diffInTurn) == 0 {
return false
}

roTx, err := chainDb.BeginRo(context.Background())
if err != nil {
return false
}
defer roTx.Rollback()
hash := rawdb.ReadHeadHeaderHash(roTx)
number := rawdb.ReadHeaderNumber(roTx, hash)

highestVerifiedHeader := rawdb.ReadHeader(roTx, hash, *number)
if highestVerifiedHeader == nil {
return false
}

if header.ParentHash == highestVerifiedHeader.ParentHash {
return true
}
return false
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil, false /* verify */)
if err != nil {
Expand Down
19 changes: 0 additions & 19 deletions consensus/parlia/ramanujanfork.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,12 @@ package parlia

import (
"fmt"
"math/rand"
"time"

"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
)

const (
wiggleTimeBeforeFork = 500 * time.Millisecond // Random delay (per signer) to allow concurrent signers
fixedBackOffTimeBeforeFork = 200 * time.Millisecond
)

func (p *Parlia) delayForRamanujanFork(snap *Snapshot, header *types.Header) time.Duration {
delay := time.Until(time.Unix(int64(header.Time), 0)) // nolint: gosimple
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
return delay
}
if header.Difficulty.Cmp(diffNoTurn) == 0 {
// It's not our turn explicitly to sign, delay it a bit
wiggle := time.Duration(len(snap.Validators)/2+1) * wiggleTimeBeforeFork
delay += fixedBackOffTimeBeforeFork + time.Duration(rand.Int63n(int64(wiggle))) // nolint
}
return delay
}

func (p *Parlia) blockTimeForRamanujanFork(snap *Snapshot, header, parent *types.Header) uint64 {
blockTime := parent.Time + p.config.Period
if p.chainConfig.IsRamanujan(header.Number.Uint64()) {
Expand Down
13 changes: 9 additions & 4 deletions consensus/parlia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *Snapshot) updateAttestation(header *types.Header, chainConfig *chain.Co
}
}

func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, doLog bool) (*Snapshot, error) {
func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderReader, parents []*types.Header, chainConfig *chain.Config, recentSnaps *lru.ARCCache[libcommon.Hash, *Snapshot]) (*Snapshot, error) {
// Allow passing in no headers for cleaner code
if len(headers) == 0 {
return s, nil
Expand All @@ -260,7 +260,7 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea

for _, header := range headers {
number := header.Number.Uint64()
if doLog && number%100_000 == 0 {
if number%100_000 == 0 {
log.Info("[parlia] snapshots build, recover from headers", "block", number)
}
// Delete the oldest validator from the recent list to allow it signing again
Expand Down Expand Up @@ -365,9 +365,14 @@ func (s *Snapshot) apply(headers []*types.Header, chain consensus.ChainHeaderRea
delete(snap.RecentForkHashes, number-i)
}
}
snap.Number = number
snap.Hash = header.Hash()
if snap.Number+s.config.Epoch >= headers[len(headers)-1].Number.Uint64() {
historySnap := snap.copy()
recentSnaps.Add(historySnap.Hash, historySnap)
}
}
snap.Number += uint64(len(headers))
snap.Hash = headers[len(headers)-1].Hash()

return snap, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator,
}

if br.FreezingCfg().Enabled && br.FrozenBlocks() > 0 {
if err := stagedsync.FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, agg, logger); err != nil {
if err := stagedsync.FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, agg, cc, engine, logger); err != nil {
return err
}
_ = stages.SaveStageProgress(tx, stages.Snapshots, br.FrozenBlocks())
Expand Down
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.ethBackendRPC, backend.miningRPC, backend.stateChangesClient = ethBackendRPC, miningRPC, stateDiffClient

backend.syncStages = stages2.NewDefaultStages(backend.sentryCtx, backend.chainDB, snapDb, blobStore, p2pConfig, config, backend.sentriesClient, backend.notifications, backend.downloaderClient,
blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, heimdallClient, recents, signatures, logger)
blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, backend.engine, heimdallClient, recents, signatures, logger)
backend.syncUnwindOrder = stagedsync.DefaultUnwindOrder
backend.syncPruneOrder = stagedsync.DefaultPruneOrder
backend.stagedSync = stagedsync.New(config.Sync, backend.syncStages, backend.syncUnwindOrder, backend.syncPruneOrder, logger)
Expand All @@ -831,7 +831,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}

checkStateRoot := true
pipelineStages := stages2.NewPipelineStages(ctx, chainKv, blobStore, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot)
pipelineStages := stages2.NewPipelineStages(ctx, chainKv, blobStore, config, p2pConfig, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, backend.engine, logger, checkStateRoot)
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger)
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.HistoryV3, ctx)
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)
Expand Down
7 changes: 4 additions & 3 deletions eth/ethconsensusconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package ethconsensusconfig

import (
"context"
"github.com/ledgerwatch/erigon/core/blob_storage"
"github.com/spf13/afero"
"math"
"path/filepath"

"github.com/ledgerwatch/erigon/core/blob_storage"
"github.com/spf13/afero"

"github.com/davecgh/go-spew/spew"
"github.com/ledgerwatch/log/v3"

Expand Down Expand Up @@ -122,7 +123,7 @@ func CreateConsensusEngine(ctx context.Context, nodeConfig *nodecfg.Config, chai
}
blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), nodeConfig.Dirs.DataDir), math.MaxUint64, chainConfig, blockReader)

eng = parlia.New(chainConfig, db, blobStore, blockReader, chainDb[0], logger)
eng = parlia.New(chainConfig, db, blobStore, blockReader, logger)
}
case *borcfg.BorConfig:
// If Matic bor consensus is requested, set it up
Expand Down
Loading

0 comments on commit fe4358e

Please sign in to comment.