E35 prune small batches #9088

Merged
merged 83 commits into e35 from e35_prune_small on Jan 16, 2024
83 commits
90e956d
wip save
awskii Dec 27, 2023
7516903
save
awskii Dec 27, 2023
a3f2b58
save
awskii Dec 27, 2023
e0d7ed0
save
awskii Dec 27, 2023
42901c7
wip save test
awskii Dec 28, 2023
6e7ae96
Merge branch 'e35' into e35_prune_small
awskii Dec 28, 2023
a15f973
save
awskii Dec 28, 2023
e8125a5
save
awskii Dec 29, 2023
3e8903e
save
awskii Dec 29, 2023
4d64cb8
save
awskii Dec 29, 2023
d3aec5d
save
awskii Dec 29, 2023
3ee60b4
Merge branch 'e35' into e35_prune_small
awskii Dec 29, 2023
eb3cdb6
save
awskii Dec 29, 2023
bab881a
save
awskii Dec 29, 2023
dbdaf62
save
awskii Dec 29, 2023
a80a50b
save
awskii Dec 30, 2023
6ccd5d2
save
awskii Jan 2, 2024
77ae6b4
Merge branch 'e35' into e35_prune_small
awskii Jan 2, 2024
f1a0564
Merge branch 'e35' into e35_prune_small
awskii Jan 3, 2024
05fe3b5
save
awskii Jan 3, 2024
89590a9
save
awskii Jan 3, 2024
cb41783
save
awskii Jan 3, 2024
1029b6c
Merge branch 'e35' into e35_prune_small
awskii Jan 4, 2024
5e34e84
save
awskii Jan 4, 2024
ec46532
save
awskii Jan 4, 2024
1754b88
save
awskii Jan 4, 2024
188bca5
save
awskii Jan 4, 2024
d0ce1f5
use progress for domains
awskii Jan 4, 2024
04cf0a6
save
awskii Jan 4, 2024
a004898
save
awskii Jan 4, 2024
97fe267
save
awskii Jan 5, 2024
1c8752f
save
awskii Jan 5, 2024
8c43a77
save
awskii Jan 5, 2024
51f58fc
added agg prune stats
awskii Jan 6, 2024
ce1b99d
save
awskii Jan 6, 2024
3bb4133
save
awskii Jan 6, 2024
fb12283
save
awskii Jan 6, 2024
3adb48f
ok
awskii Jan 6, 2024
db5fdbf
save
awskii Jan 7, 2024
9243b51
save
awskii Jan 7, 2024
4cf726c
hmmm
awskii Jan 7, 2024
b157a8f
ok
awskii Jan 8, 2024
431fa9d
ok
awskii Jan 8, 2024
a2f2862
Merge branch 'e35' into e35_prune_small
awskii Jan 8, 2024
44638be
ok
awskii Jan 8, 2024
a116184
ok
awskii Jan 8, 2024
8b77f73
ok
awskii Jan 9, 2024
5298c3d
ok!
awskii Jan 9, 2024
18b34b7
increased prunelimit up to 10k
awskii Jan 9, 2024
a9bf201
Merge branch 'e35' into e35_prune_small
awskii Jan 9, 2024
0f5d1bc
save
awskii Jan 9, 2024
9846c81
Merge branch 'e35' into e35_prune_small
awskii Jan 10, 2024
c68e757
save
awskii Jan 10, 2024
3dc95dd
per-key index pruning granularity
awskii Jan 10, 2024
3aef903
save
awskii Jan 10, 2024
4463958
Revert "per-key index pruning granularity"
awskii Jan 10, 2024
6f0f59b
save
awskii Jan 10, 2024
98415bc
save
awskii Jan 11, 2024
77a0102
save
awskii Jan 11, 2024
be1894d
save
awskii Jan 11, 2024
03284cc
Merge branch 'e35' into e35_prune_small
awskii Jan 11, 2024
f82ac2d
save
awskii Jan 11, 2024
8200e7c
save
awskii Jan 11, 2024
32a96e1
save
awskii Jan 11, 2024
9db9320
save.
awskii Jan 11, 2024
a7338f3
save
awskii Jan 12, 2024
e30d54d
save
awskii Jan 12, 2024
adbb324
save
awskii Jan 12, 2024
96a39fa
save
awskii Jan 15, 2024
434ac0f
Merge origin/e35 into e35_prune_small
awskii Jan 15, 2024
e345d9d
test
awskii Jan 15, 2024
6cc3267
test
awskii Jan 15, 2024
dd5bc2a
test
awskii Jan 15, 2024
8d06fba
test
awskii Jan 15, 2024
fa79710
no save, test
awskii Jan 15, 2024
7b5c92b
test
awskii Jan 15, 2024
035242a
test
awskii Jan 15, 2024
795e38f
save
awskii Jan 15, 2024
7b2adb4
save ok
awskii Jan 15, 2024
c5579c8
ok
awskii Jan 15, 2024
cd9655d
save
awskii Jan 16, 2024
265a705
Merge branch 'e35' into e35_prune_small
awskii Jan 16, 2024
8a65996
ok!
awskii Jan 16, 2024
3 changes: 1 addition & 2 deletions cmd/devnet/contracts/steps/subscriber.go
@@ -3,9 +3,8 @@ package contracts_steps
import (
"context"
"fmt"
"math/big"

"github.com/ledgerwatch/erigon-lib/common/hexutil"
"math/big"

ethereum "github.com/ledgerwatch/erigon"
libcommon "github.com/ledgerwatch/erigon-lib/common"
205 changes: 155 additions & 50 deletions erigon-lib/state/aggregator_v3.go
@@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"
@@ -535,9 +536,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
collations = append(collations, collation)
collListMu.Unlock()

mxRunningFilesBuilding.Inc()
sf, err := d.buildFiles(ctx, step, collation, a.ps)
mxRunningFilesBuilding.Dec()
collation.Close()
if err != nil {
sf.CleanupOnError()
@@ -568,6 +567,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
a.wg.Add(1)
g.Go(func() error {
defer a.wg.Done()

var collation map[string]*roaring64.Bitmap
err := a.db.View(ctx, func(tx kv.Tx) (err error) {
collation, err = d.collate(ctx, step, tx)
@@ -576,9 +576,7 @@ func (a *AggregatorV3) buildFiles(ctx context.Context, step uint64) error {
if err != nil {
return fmt.Errorf("index collation %q has failed: %w", d.filenameBase, err)
}
mxRunningFilesBuilding.Inc()
sf, err := d.buildFiles(ctx, step, collation, a.ps)
mxRunningFilesBuilding.Dec()
if err != nil {
sf.CleanupOnError()
return err
@@ -732,7 +730,6 @@ func (ac *AggregatorV3Context) maxTxNumInDomainFiles(cold bool) uint64 {
}

func (ac *AggregatorV3Context) CanPrune(tx kv.Tx) bool {
//fmt.Printf("can prune: from=%d < current=%d, keep=%d\n", ac.CanPruneFrom(tx)/ac.a.aggregationStep, ac.maxTxNumInDomainFiles(false)/ac.a.aggregationStep, ac.a.keepInDB)
return ac.CanPruneFrom(tx) < ac.maxTxNumInDomainFiles(false)
}
func (ac *AggregatorV3Context) CanPruneFrom(tx kv.Tx) uint64 {
@@ -778,20 +775,59 @@ func (ac *AggregatorV3Context) CanUnwindBeforeBlockNum(blockNum uint64, tx kv.Tx
return blockNumWithCommitment, true, nil
}

func (ac *AggregatorV3Context) PruneWithTimeout(ctx context.Context, timeout time.Duration, tx kv.RwTx) error {
cc, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// nothingToPrune reports whether there is nothing already-aggregated left to prune (or pruning is disabled via dbg.NoPrune).
func (ac *AggregatorV3Context) nothingToPrune(tx kv.Tx) bool {
return dbg.NoPrune() || (!ac.account.CanPrune(tx) &&
!ac.storage.CanPrune(tx) &&
!ac.code.CanPrune(tx) &&
!ac.commitment.CanPrune(tx) &&
!ac.logAddrs.CanPrune(tx) &&
!ac.logTopics.CanPrune(tx) &&
!ac.tracesFrom.CanPrune(tx) &&
!ac.tracesTo.CanPrune(tx))
}

// PruneSmallBatches fills the given timeout with pruning in small batches (pruneLimit keys at a time),
// accumulating and logging progress stats. Each batch runs with context.Background(), so a batch in
// flight is not interrupted; between batches it returns early once nothing is left to prune, the
// timeout elapses, or the parent context is cancelled.
func (ac *AggregatorV3Context) PruneSmallBatches(ctx context.Context, timeout time.Duration, tx kv.RwTx) error {
started := time.Now()
localTimeout := time.NewTicker(timeout)
defer localTimeout.Stop()
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
aggLogEvery := time.NewTicker(600 * time.Second) // to hide specific domain/idx logging
defer aggLogEvery.Stop()

const pruneLimit uint64 = 10000

fullStat := &AggregatorPruneStat{Domains: make(map[string]*DomainPruneStat), Indices: make(map[string]*InvertedIndexPruneStat)}

if err := ac.Prune(cc, tx); err != nil { // prune part of retired data, before commit
if errors.Is(err, context.DeadlineExceeded) {
for {
stat, err := ac.Prune(context.Background(), tx, pruneLimit, aggLogEvery)
if err != nil {
log.Warn("[snapshots] PruneSmallBatches", "err", err)
return err
}
if stat == nil {
return nil
}
return err
}
if cc.Err() != nil { //nolint
return nil //nolint
fullStat.Accumulate(stat)

select {
case <-logEvery.C:
ac.a.logger.Info("[agg] pruning",
"until timeout", time.Until(started.Add(timeout)).String(),
"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx),
"pruned", fullStat.String(),
)
case <-localTimeout.C:
return nil
case <-ctx.Done():
return ctx.Err()
default:
}

}
return nil
}
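
A minimal caller-side sketch (not part of this diff) of how PruneSmallBatches might be used before committing a write transaction; the helper name is an assumption:

// pruneThenCommit is an illustrative sketch, not code from this PR. It spends
// up to `timeout` pruning already-aggregated data in small batches, then
// commits the write transaction it was given.
func pruneThenCommit(ctx context.Context, ac *AggregatorV3Context, tx kv.RwTx, timeout time.Duration) error {
	if err := ac.PruneSmallBatches(ctx, timeout, tx); err != nil {
		return err
	}
	return tx.Commit()
}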

func (a *AggregatorV3) StepsRangeInDBAsStr(tx kv.Tx) string {
@@ -807,47 +843,123 @@ func (a *AggregatorV3) StepsRangeInDBAsStr(tx kv.Tx) string {
}, ", ")
}

func (ac *AggregatorV3Context) Prune(ctx context.Context, tx kv.RwTx) error {
if dbg.NoPrune() {
return nil
type AggregatorPruneStat struct {
Domains map[string]*DomainPruneStat
Indices map[string]*InvertedIndexPruneStat
}

func (as *AggregatorPruneStat) String() string {
names := make([]string, 0)
for k := range as.Domains {
names = append(names, k)
}

sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })

var sb strings.Builder
for _, d := range names {
v, ok := as.Domains[d]
if ok && v != nil {
sb.WriteString(fmt.Sprintf("%s| %s; ", d, v.String()))
}
}
names = names[:0]
for k := range as.Indices {
names = append(names, k)
}
sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })

for _, d := range names {
v, ok := as.Indices[d]
if ok && v != nil {
sb.WriteString(fmt.Sprintf("%s| %s; ", d, v.String()))
}
}
return strings.TrimSuffix(sb.String(), "; ")
}

func (as *AggregatorPruneStat) Accumulate(other *AggregatorPruneStat) {
for k, v := range other.Domains {
if _, ok := as.Domains[k]; !ok {
as.Domains[k] = v
} else {
as.Domains[k].Accumulate(v)
}
}
for k, v := range other.Indices {
if _, ok := as.Indices[k]; !ok {
as.Indices[k] = v
} else {
as.Indices[k].Accumulate(v)
}
}
}

func (ac *AggregatorV3Context) Prune(ctx context.Context, tx kv.RwTx, limit uint64, logEvery *time.Ticker) (*AggregatorPruneStat, error) {
if ac.nothingToPrune(tx) {
return nil, nil
}
defer mxPruneTookAgg.ObserveDuration(time.Now())

step, limit := ac.a.aggregatedStep.Load(), uint64(math2.MaxUint64)
txTo := (step + 1) * ac.a.aggregationStep
var txFrom uint64
if limit == 0 {
limit = uint64(math2.MaxUint64)
}

logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
ac.a.logger.Info("aggregator prune", "step", step,
"range", fmt.Sprintf("[%d,%d)", txFrom, txTo), /*"limit", limit,
"stepsLimit", limit/ac.a.aggregationStep,*/"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx))
var txFrom, txTo uint64
step := ac.a.aggregatedStep.Load()
txTo = (step + 1) * ac.a.aggregationStep

if err := ac.account.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
if logEvery == nil {
logEvery = time.NewTicker(30 * time.Second)
defer logEvery.Stop()
}
if err := ac.storage.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
//ac.a.logger.Debug("aggregator prune", "step", step,
// "txn_range", fmt.Sprintf("[%d,%d)", txFrom, txTo), "limit", limit,
// /*"stepsLimit", limit/ac.a.aggregationStep,*/ "stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx))
//
ap, err := ac.account.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.code.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
sp, err := ac.storage.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.commitment.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery); err != nil {
return err
cp, err := ac.code.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.logAddrs.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
comps, err := ac.commitment.Prune(ctx, tx, step, txFrom, txTo, limit, logEvery)
if err != nil {
return nil, err
}
if err := ac.logTopics.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
lap, err := ac.logAddrs.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
if err := ac.tracesFrom.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
ltp, err := ac.logTopics.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
if err := ac.tracesTo.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false); err != nil {
return err
tfp, err := ac.tracesFrom.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
return nil
ttp, err := ac.tracesTo.Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
aggStat := &AggregatorPruneStat{Domains: make(map[string]*DomainPruneStat), Indices: make(map[string]*InvertedIndexPruneStat)}
aggStat.Domains[ac.account.d.filenameBase] = ap
aggStat.Domains[ac.storage.d.filenameBase] = sp
aggStat.Domains[ac.code.d.filenameBase] = cp
aggStat.Domains[ac.commitment.d.filenameBase] = comps
aggStat.Indices[ac.logAddrs.ii.filenameBase] = lap
aggStat.Indices[ac.logTopics.ii.filenameBase] = ltp
aggStat.Indices[ac.tracesFrom.ii.filenameBase] = tfp
aggStat.Indices[ac.tracesTo.ii.filenameBase] = ttp

return aggStat, nil
}
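
For illustration only (not part of this diff): driving Prune directly with an explicit per-batch key limit and merging the per-batch stats via Accumulate, roughly what PruneSmallBatches does above minus the timeout handling; the helper name is an assumption:

// pruneAllWithLimit is an illustrative sketch, not code from this PR. It prunes
// in batches of `limit` keys until nothing is left for the current aggregated
// step and returns the merged stats.
func pruneAllWithLimit(ctx context.Context, ac *AggregatorV3Context, tx kv.RwTx, limit uint64) (*AggregatorPruneStat, error) {
	full := &AggregatorPruneStat{
		Domains: make(map[string]*DomainPruneStat),
		Indices: make(map[string]*InvertedIndexPruneStat),
	}
	for {
		stat, err := ac.Prune(ctx, tx, limit, nil) // nil ticker: Prune creates its own
		if err != nil {
			return nil, err
		}
		if stat == nil { // nothing left to prune for the aggregated step
			return full, nil
		}
		full.Accumulate(stat)
	}
}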

func (ac *AggregatorV3Context) LogStats(tx kv.Tx, tx2block func(endTxNumMinimax uint64) uint64) {
@@ -1179,21 +1291,16 @@ func (ac *AggregatorV3Context) mergeFiles(ctx context.Context, files SelectedSta
}
}()

//var predicates sync.WaitGroup
if r.accounts.any() {
log.Info(fmt.Sprintf("[snapshots] merge: %s", r.String()))
//predicates.Add(1)
g.Go(func() (err error) {
//defer predicates.Done()
mf.accounts, mf.accountsIdx, mf.accountsHist, err = ac.account.mergeFiles(ctx, files.accounts, files.accountsIdx, files.accountsHist, r.accounts, ac.a.ps)
return err
})
}

if r.storage.any() {
//predicates.Add(1)
g.Go(func() (err error) {
//defer predicates.Done()
mf.storage, mf.storageIdx, mf.storageHist, err = ac.storage.mergeFiles(ctx, files.storage, files.storageIdx, files.storageHist, r.storage, ac.a.ps)
return err
})
@@ -1205,14 +1312,12 @@ func (ac *AggregatorV3Context) mergeFiles(ctx context.Context, files SelectedSta
})
}
if r.commitment.any() {
//predicates.Wait()
//log.Info(fmt.Sprintf("[snapshots] merge commitment: %d-%d", r.accounts.historyStartTxNum/ac.a.aggregationStep, r.accounts.historyEndTxNum/ac.a.aggregationStep))
g.Go(func() (err error) {
mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = ac.commitment.mergeFiles(ctx, files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, ac.a.ps)
return err
//var v4Files SelectedStaticFiles
//var v4MergedF MergedFiles
//
//// THIS merge uses strategy with replacement of hisotry keys in commitment.
//mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = ac.a.commitment.mergeFiles(ctx, v4Files.FillV3(&files), v4MergedF.FillV3(&mf), r.commitment, ac.a.ps)
//return err
52 changes: 22 additions & 30 deletions erigon-lib/state/archive.go
@@ -1,8 +1,6 @@
package state

import (
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
)
@@ -117,39 +115,33 @@ func (c *compWriter) Close() {
}
}

func SaveExecV3PruneProgress(db kv.Putter, prunedTblName string, step uint64, prunedKey []byte) error {
return db.Put(kv.TblPruningProgress, []byte(prunedTblName), append(encodeBigEndian(step), prunedKey...))
// SaveExecV3PruneProgress saves the latest pruned key of the given table to the database.
// A nil key is also allowed and means that the latest pruning run has finished.
func SaveExecV3PruneProgress(db kv.Putter, prunedTblName string, prunedKey []byte) error {
empty := make([]byte, 1)
if prunedKey != nil {
empty[0] = 1
}
return db.Put(kv.TblPruningProgress, []byte(prunedTblName), append(empty, prunedKey...))
}

// GetExecV3PruneProgress retrieves saved progress of given table pruning from the database
// ts==0 && prunedKey==nil means that pruning is finished, next prune could start
// For domains make more sense to store inverted step to have 0 as empty value meaning no progress saved
func GetExecV3PruneProgress(db kv.Getter, prunedTblName string) (ts uint64, pruned []byte, err error) {
// GetExecV3PruneProgress retrieves the saved pruning progress for the given table from the database.
// For now this is the latest pruned key in prunedTblName.
func GetExecV3PruneProgress(db kv.Getter, prunedTblName string) (pruned []byte, err error) {
v, err := db.GetOne(kv.TblPruningProgress, []byte(prunedTblName))
if err != nil {
return 0, nil, err
return nil, err
}
return unmarshalData(v)
}

func unmarshalData(data []byte) (uint64, []byte, error) {
switch {
case len(data) < 8 && len(data) > 0:
return 0, nil, fmt.Errorf("value must be at least 8 bytes, got %d", len(data))
case len(data) == 8:
// we want to preserve guarantee that if step==0 && prunedKey==nil then pruning is finished
// If return data[8:] - result will be empty array which is a valid key to prune and does not
// mean that pruning is finished.
return binary.BigEndian.Uint64(data[:8]), nil, nil
case len(data) > 8:
return binary.BigEndian.Uint64(data[:8]), data[8:], nil
switch len(v) {
case 0:
return nil, nil
case 1:
if v[0] == 1 {
return []byte{}, nil
}
// marker byte is 0: a nil key was saved, i.e. the previous pruning run finished
// (without the marker, a saved nil would come back as an empty key, which is itself a valid key)
return nil, nil
default:
return 0, nil, nil
return v[1:], nil
}
}

func encodeBigEndian(n uint64) []byte {
var v [8]byte
binary.BigEndian.PutUint64(v[:], n)
return v[:]
}
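
An illustrative round trip (not part of this diff) of the new progress encoding: a single marker byte keeps "nothing saved / previous run finished" (nil) distinct from a real, possibly empty, resume key. The helper name and the "accounts" table-name string are assumptions:

// pruneProgressRoundTrip is an illustrative sketch, not code from this PR.
func pruneProgressRoundTrip(tx kv.RwTx) error {
	// Save a resume key: stored as 0x01 followed by the key bytes.
	if err := SaveExecV3PruneProgress(tx, "accounts", []byte("k1")); err != nil {
		return err
	}
	if k, err := GetExecV3PruneProgress(tx, "accounts"); err != nil {
		return err
	} else if k == nil {
		return fmt.Errorf("expected resume key, got nil")
	}

	// Mark the run as finished: stored as a single zero byte.
	if err := SaveExecV3PruneProgress(tx, "accounts", nil); err != nil {
		return err
	}
	if k, err := GetExecV3PruneProgress(tx, "accounts"); err != nil {
		return err
	} else if k != nil {
		return fmt.Errorf("expected nil (finished), got %x", k)
	}
	return nil
}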