E35 prune small batches (#9088)
Follow-up to #9031 (comment)

### Ordering 
`AggregatorV3Context` pruning happens in the following order:

1. Index pruning starts from the lowest txNum such that `txFrom <= txn <= txTo`. Progress moves towards bigger txNums.
2. History pruning therefore goes in the same direction and happens along with key pruning via a callback.
3. Domain pruning starts from the `Latest()` key, which is the biggest key available. We use inverted steps (`^step`) as a suffix for domain keys, which lets us prune the smallest steps first: starting from the largest available key and the smallest available step, we walk backwards towards bigger steps and smaller keys. If for a given key we meet `savedStep > pruneStep`, we can safely move to the `PrevNoDup()` key, skipping the remaining steps without scanning them (see the sketch after this list).
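
The following minimal Go sketch illustrates that ordering under stated assumptions: the real domain tables are MDBX dupsort tables and the actual key/value layout and cursor API differ, so `domainEntry` and the flat sorted slice below are only illustrative stand-ins, not the Erigon encoding.

```go
// Illustrative only: appending ^step (bitwise NOT, big-endian) to a domain key
// makes bigger steps sort before smaller ones under the same key, so a
// backwards walk sees the largest key first and, per key, the smallest step
// first; hitting step > pruneStep is the point where PrevNoDup() would jump.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

func domainEntry(key string, step uint64) []byte {
	suffix := make([]byte, 8)
	binary.BigEndian.PutUint64(suffix, ^step) // inverted step as suffix
	return append([]byte(key), suffix...)
}

func main() {
	entries := [][]byte{
		domainEntry("acc1", 1), domainEntry("acc1", 5),
		domainEntry("acc2", 2), domainEntry("acc2", 7),
	}
	sort.Slice(entries, func(i, j int) bool { return bytes.Compare(entries[i], entries[j]) < 0 })

	const pruneStep = 3
	// Walk backwards: largest key first; within a key, smallest step first.
	for i := len(entries) - 1; i >= 0; i-- {
		e := entries[i]
		key, step := string(e[:len(e)-8]), ^binary.BigEndian.Uint64(e[len(e)-8:])
		if step > pruneStep {
			fmt.Printf("key=%s step=%d > %d: would jump to previous key\n", key, step, pruneStep)
			continue
		}
		fmt.Printf("prune key=%s step=%d\n", key, step)
	}
}
```

The inverted suffix is what makes the "smallest step first within a key" order fall out of a plain backwards scan, which is why a single `PrevNoDup()` is enough to skip the rest of a key.
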
### Limiting
Pruning progress obviously changes state and therefore affects execution: invalid reads of obsolete values could happen if pruning is broken.
Pruning of indices and histories is coupled, since the history table is bound to index key and txn entries. Because the index is a mapping `txNum -> {key, key', ...}`, it is easier to limit their pruning by `txNums` at once instead of going through the whole list and selecting `limit` keys (see the sketch below).
`AggregatorV3Context.PruneSmallBatches()` always sets `txFrom=0`, since its purpose is to keep the db clean, one step at a time.
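
A rough, self-contained model of that coupling is sketched here; `invertedIndex` and `pruneByTxNum` are illustrative assumptions, not the real Erigon structures, which live in MDBX tables and are walked with cursors.

```go
// Model: the inverted index maps txNum -> keys touched at that txn, so bounding
// the scan by [txFrom, txTo) prunes whole txNums at once, and the matching
// history rows are dropped in the same pass through a callback.
package main

import "fmt"

// invertedIndex: txNum -> keys touched at that txNum (simplified).
type invertedIndex map[uint64][]string

// pruneByTxNum deletes every entry with txFrom <= txNum < txTo and calls
// onPrune for each (txNum, key) pair so history rows can go in the same pass.
func (ii invertedIndex) pruneByTxNum(txFrom, txTo uint64, onPrune func(txNum uint64, key string)) {
	for txn := txFrom; txn < txTo; txn++ {
		for _, k := range ii[txn] {
			onPrune(txn, k)
		}
		delete(ii, txn)
	}
}

func main() {
	ii := invertedIndex{1: {"acc1", "acc2"}, 2: {"acc1"}, 5: {"code1"}}
	ii.pruneByTxNum(0, 3, func(txn uint64, key string) {
		fmt.Printf("drop history row: key=%s txn=%d\n", key, txn)
	})
	fmt.Println("remaining txNums:", len(ii)) // only txn=5 is left
}
```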

Domain pruning is limited by the number of keys removed at once. On slow disks with a big db (>150G), domain pruning can be very slow: the database keeps growing, which slows pruning down further, to around 100,000 kv pairs per 10-minute session, which is not enough to keep the db at a constant size. Using smaller values for `--batchSize` could solve the problem, because Prune is called more frequently and each call puts smaller changes into the db.

A domain can be pruned if the `savedPruneProgress` key for its table is not nil, or if the smallest domain key has values with `savedStep < pruneStep` in the domain files. The downside of looking at the smallest step is that the smallest key is not guaranteed to change in every step, which could give us an invalid estimate of the smallest available step. A saved prune progress key indicates that we did not finish the latest cleanup, but it does not give us a step number. Meta tables containing such info (smallest step in table?) could be used for that; the sketch below models this check.
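
A hedged in-memory model of that check follows; `domainModel`, `lastStep` and `progress` are hypothetical stand-ins for the real tables and for the prune-progress marker maintained by `SaveExecV3PruneProgress`/`GetExecV3PruneProgress` in `archive.go` in this commit.

```go
// Model: prune if an interrupted run left a saved progress key, or if the
// smallest key's saved step is older than the step being pruned. The caveat
// above applies: the smallest key may not be touched every step, so this is
// only an estimate.
package main

import "fmt"

type domainModel struct {
	lastStep map[string]uint64 // key -> step at which it was last written
	progress []byte            // saved prune-progress key; nil == previous run finished
}

func (d *domainModel) canPrune(pruneStep uint64) bool {
	if d.progress != nil {
		return true // unfinished cleanup: resume from the saved key
	}
	smallest, found := "", false
	for k := range d.lastStep {
		if !found || k < smallest {
			smallest, found = k, true
		}
	}
	return found && d.lastStep[smallest] < pruneStep
}

func main() {
	d := &domainModel{lastStep: map[string]uint64{"acc1": 3, "acc2": 7}}
	fmt.Println(d.canPrune(5)) // true: "acc1" was last written at step 3 < 5
}
```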

#### Takeaways, keep in mind
- `--batchSize` should be smaller on slower disks (even as small as `16-64M`) to keep the db small. A balanced `batchSize` could increase throughput while preserving db size.
- Some internal functions rely on this ordering, e.g. `define available steps in db`.
- When `--batchSize` is reached, commitment is evaluated and its updates are put into the same batch, which then grows to x1.4-x2 of its size.
awskii authored Jan 16, 2024
1 parent 44f4579 commit 40f8b12
Showing 16 changed files with 754 additions and 404 deletions.
3 changes: 1 addition & 2 deletions cmd/devnet/contracts/steps/subscriber.go
@@ -3,9 +3,8 @@ package contracts_steps
import (
"context"
"fmt"
"math/big"

"github.com/ledgerwatch/erigon-lib/common/hexutil"
"math/big"

ethereum "github.com/ledgerwatch/erigon"
libcommon "github.com/ledgerwatch/erigon-lib/common"
205 changes: 155 additions & 50 deletions erigon-lib/state/aggregator_v3.go
@@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -535,9 +536,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
collations = append(collations, collation)
collListMu.Unlock()

mxRunningFilesBuilding.Inc()
sf, err := d.buildFiles(ctx, step, collation, a.ps)
mxRunningFilesBuilding.Dec()
collation.Close()
if err != nil {
sf.CleanupOnError()
@@ -568,6 +567,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
a.wg.Add(1)
g.Go(func() error {
defer a.wg.Done()

var collation map[string]*roaring64.Bitmap
err := a.db.View(ctx, func(tx kv.Tx) (err error) {
collation, err = d.collate(ctx, step, tx)
@@ -576,9 +576,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
if err != nil {
return fmt.Errorf("index collation %q has failed: %w", d.filenameBase, err)
}
mxRunningFilesBuilding.Inc()
sf, err := d.buildFiles(ctx, step, collation, a.ps)
mxRunningFilesBuilding.Dec()
if err != nil {
sf.CleanupOnError()
return err
@@ -732,7 +730,6 @@ func (ac *AggregatorV3Context) maxTxNumInDomainFiles(cold bool) uint64 {
}

func (ac *AggregatorV3Context) CanPrune(tx kv.Tx) bool {
//fmt.Printf("can prune: from=%d < current=%d, keep=%d\n", ac.CanPruneFrom(tx)/ac.a.aggregationStep, ac.maxTxNumInDomainFiles(false)/ac.a.aggregationStep, ac.a.keepInDB)
return ac.CanPruneFrom(tx) < ac.maxTxNumInDomainFiles(false)
}
func (ac *AggregatorV3Context) CanPruneFrom(tx kv.Tx) uint64 {
@@ -778,20 +775,59 @@ func (ac *AggregatorV3Context) CanUnwindBeforeBlockNum(blockNum uint64, tx kv.Tx
return blockNumWithCommitment, true, nil
}

func (ac *AggregatorV3Context) PruneWithTimeout(ctx context.Context, timeout time.Duration, tx kv.RwTx) error {
cc, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// nothingToPrune returns true if there is nothing left to prune among already aggregated data (or pruning is disabled)
func (ac *AggregatorV3Context) nothingToPrune(tx kv.Tx) bool {
return dbg.NoPrune() || (!ac.account.CanPrune(tx) &&
!ac.storage.CanPrune(tx) &&
!ac.code.CanPrune(tx) &&
!ac.commitment.CanPrune(tx) &&
!ac.logAddrs.CanPrune(tx) &&
!ac.logTopics.CanPrune(tx) &&
!ac.tracesFrom.CanPrune(tx) &&
!ac.tracesTo.CanPrune(tx))
}

// PruneSmallBatches is not cancellable, it's over when it's over or failed.
// It fills whole timeout with pruning by small batches (of 100 keys) and making some progress
func (ac *AggregatorV3Context) PruneSmallBatches(ctx context.Context, timeout time.Duration, tx kv.RwTx) error {
started := time.Now()
localTimeout := time.NewTicker(timeout)
defer localTimeout.Stop()
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
aggLogEvery := time.NewTicker(600 * time.Second) // to hide specific domain/idx logging
defer aggLogEvery.Stop()

const pruneLimit uint64 = 10000

fullStat := &AggregatorPruneStat{Domains: make(map[string]*DomainPruneStat), Indices: make(map[string]*InvertedIndexPruneStat)}

if err := ac.Prune(cc, tx); err != nil { // prune part of retired data, before commit
if errors.Is(err, context.DeadlineExceeded) {
for {
stat, err := ac.Prune(context.Background(), tx, pruneLimit, aggLogEvery)
if err != nil {
log.Warn("[snapshots] PruneSmallBatches", "err", err)
return err
}
if stat == nil {
return nil
}
return err
}
if cc.Err() != nil { //nolint
return nil //nolint
fullStat.Accumulate(stat)

select {
case <-logEvery.C:
ac.a.logger.Info("[agg] pruning",
"until timeout", time.Until(started.Add(timeout)).String(),
"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx),
"pruned", fullStat.String(),
)
case <-localTimeout.C:
return nil
case <-ctx.Done():
return ctx.Err()
default:
}

}
return nil
}

func (a *AggregatorV3) StepsRangeInDBAsStr(tx kv.Tx) string {
@@ -807,47 +843,123 @@ func (a *AggregatorV3) StepsRangeInDBAsStr(tx kv.Tx) string {
}, ", ")
}

func (ac *AggregatorV3Context) Prune(ctx context.Context, tx kv.RwTx) error {
if dbg.NoPrune() {
return nil
type AggregatorPruneStat struct {
Domains map[string]*DomainPruneStat
Indices map[string]*InvertedIndexPruneStat
}

func (as *AggregatorPruneStat) String() string {
names := make([]string, 0)
for k := range as.Domains {
names = append(names, k)
}

sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })

var sb strings.Builder
for _, d := range names {
v, ok := as.Domains[d]
if ok && v != nil {
sb.WriteString(fmt.Sprintf("%s| %s; ", d, v.String()))
}
}
names = names[:0]
for k := range as.Indices {
names = append(names, k)
}
sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })

for _, d := range names {
v, ok := as.Indices[d]
if ok && v != nil {
sb.WriteString(fmt.Sprintf("%s| %s; ", d, v.String()))
}
}
return strings.TrimSuffix(sb.String(), "; ")
}

func (as *AggregatorPruneStat) Accumulate(other *AggregatorPruneStat) {
for k, v := range other.Domains {
if _, ok := as.Domains[k]; !ok {
as.Domains[k] = v
} else {
as.Domains[k].Accumulate(v)
}
}
for k, v := range other.Indices {
if _, ok := as.Indices[k]; !ok {
as.Indices[k] = v
} else {
as.Indices[k].Accumulate(v)
}
}
}

func (ac *AggregatorV3Context) Prune(ctx context.Context, tx kv.RwTx, limit uint64, logEvery *time.Ticker) (*AggregatorPruneStat, error) {
if ac.nothingToPrune(tx) {
return nil, nil
}
defer mxPruneTookAgg.ObserveDuration(time.Now())

step, limit := ac.a.aggregatedStep.Load(), uint64(math2.MaxUint64)
txTo := (step + 1) * ac.a.aggregationStep
var txFrom uint64
if limit == 0 {
limit = uint64(math2.MaxUint64)
}

logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
ac.a.logger.Info("aggregator prune", "step", step,
"range", fmt.Sprintf("[%d,%d)", txFrom, txTo), /*"limit", limit,
"stepsLimit", limit/ac.a.aggregationStep,*/"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx))
var txFrom, txTo uint64
step := ac.a.aggregatedStep.Load()
txTo = (step + 1) * ac.a.aggregationStep

if err := ac.account.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
if logEvery == nil {
logEvery = time.NewTicker(30 * time.Second)
defer logEvery.Stop()
}
if err := ac.storage.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
//ac.a.logger.Debug("aggregator prune", "step", step,
// "txn_range", fmt.Sprintf("[%d,%d)", txFrom, txTo), "limit", limit,
// /*"stepsLimit", limit/ac.a.aggregationStep,*/ "stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx))
//
ap, err := ac.account.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.code.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
sp, err := ac.storage.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.commitment.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
cp, err := ac.code.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.logAddrs.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
comps, err := ac.commitment.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.logTopics.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
lap, err := ac.logAddrs.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
if err := ac.tracesFrom.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
ltp, err := ac.logTopics.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
if err := ac.tracesTo.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
tfp, err := ac.tracesFrom.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
return nil
ttp, err := ac.tracesTo.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
aggStat := &AggregatorPruneStat{Domains: make(map[string]*DomainPruneStat), Indices: make(map[string]*InvertedIndexPruneStat)}
aggStat.Domains[ac.account.d.filenameBase] = ap
aggStat.Domains[ac.storage.d.filenameBase] = sp
aggStat.Domains[ac.code.d.filenameBase] = cp
aggStat.Domains[ac.commitment.d.filenameBase] = comps
aggStat.Indices[ac.logAddrs.ii.filenameBase] = lap
aggStat.Indices[ac.logTopics.ii.filenameBase] = ltp
aggStat.Indices[ac.tracesFrom.ii.filenameBase] = tfp
aggStat.Indices[ac.tracesTo.ii.filenameBase] = ttp

return aggStat, nil
}

func (ac *AggregatorV3Context) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64) uint64) {
@@ -1179,21 +1291,16 @@ func (ac *AggregatorV3Context) mergeFiles(ctx context.Context, files SelectedSta
}
}()

//var predicates sync.WaitGroup
if r.accounts.any() {
log.Info(fmt.Sprintf("[snapshots] merge: %s", r.String()))
//predicates.Add(1)
g.Go(func() (err error) {
//defer predicates.Done()
mf.accounts, mf.accountsIdx, mf.accountsHist, err = ac.account.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, ac.a.ps)
return err
})
}

if r.storage.any() {
//predicates.Add(1)
g.Go(func() (err error) {
//defer predicates.Done()
mf.storage, mf.storageIdx, mf.storageHist, err = ac.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, ac.a.ps)
return err
})
@@ -1205,14 +1312,12 @@ func (ac *AggregatorV3Context) mergeFiles(ctx context.Context, files SelectedSta
})
}
if r.commitment.any() {
//predicates.Wait()
//log.Info(fmt.Sprintf("[snapshots] merge commitment: %d-%d", r.accounts.historyStartTxNum/ac.a.aggregationStep, r.accounts.historyEndTxNum/ac.a.aggregationStep))
g.Go(func() (err error) {
mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = ac.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, ac.a.ps)
return err
//var v4Files SelectedStaticFiles
//var v4MergedF MergedFiles
//
//// THIS merge uses strategy with replacement of hisotry keys in commitment.
//mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = ac.a.commitment.mergeFiles(ctx, v4Files.FillV3(&files), v4MergedF.FillV3(&mf), r.commitment, ac.a.ps)
//return err
52 changes: 22 additions & 30 deletions erigon-lib/state/archive.go
@@ -1,8 +1,6 @@
package state

import (
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
)
@@ -117,39 +115,33 @@ func (c *compWriter) Close() {
}
}

func SaveExecV3PruneProgress(db kv.Putter, prunedTblName string, step uint64, prunedKey []byte) error {
return db.Put(kv.TblPruningProgress, []byte(prunedTblName), append(encodeBigEndian(step), prunedKey...))
// SaveExecV3PruneProgress saves latest pruned key in given table to the database.
// nil key also allowed and means that latest pruning run has been finished.
func SaveExecV3PruneProgress(db kv.Putter, prunedTblName string, prunedKey []byte) error {
empty := make([]byte, 1)
if prunedKey != nil {
empty[0] = 1
}
return db.Put(kv.TblPruningProgress, []byte(prunedTblName), append(empty, prunedKey...))
}

// GetExecV3PruneProgress retrieves saved progress of given table pruning from the database
// ts==0 && prunedKey==nil means that pruning is finished, next prune could start
// For domains make more sense to store inverted step to have 0 as empty value meaning no progress saved
func GetExecV3PruneProgress(db kv.Getter, prunedTblName string) (ts uint64, pruned []byte, err error) {
// GetExecV3PruneProgress retrieves saved progress of given table pruning from the database.
// For now it is latest pruned key in prunedTblName
func GetExecV3PruneProgress(db kv.Getter, prunedTblName string) (pruned []byte, err error) {
v, err := db.GetOne(kv.TblPruningProgress, []byte(prunedTblName))
if err != nil {
return 0, nil, err
return nil, err
}
return unmarshalData(v)
}

func unmarshalData(data []byte) (uint64, []byte, error) {
switch {
case len(data) < 8 && len(data) > 0:
return 0, nil, fmt.Errorf("value must be at least 8 bytes, got %d", len(data))
case len(data) == 8:
// we want to preserve guarantee that if step==0 && prunedKey==nil then pruning is finished
// If return data[8:] - result will be empty array which is a valid key to prune and does not
// mean that pruning is finished.
return binary.BigEndian.Uint64(data[:8]), nil, nil
case len(data) > 8:
return binary.BigEndian.Uint64(data[:8]), data[8:], nil
switch len(v) {
case 0:
return nil, nil
case 1:
if v[0] == 1 {
return []byte{}, nil
}
// nil values returned an empty key which actually is a value
return nil, nil
default:
return 0, nil, nil
return v[1:], nil
}
}

func encodeBigEndian(n uint64) []byte {
var v [8]byte
binary.BigEndian.PutUint64(v[:], n)
return v[:]
}