Skip to content

Commit

Permalink
rpc bottleneck: block files mutex (e2) (erigontech#11155) (#440)
Browse files Browse the repository at this point in the history
for erigontech#11090

thank you [tholcman](https://github.com/tholcman) for finding

Co-authored-by: Alex Sharov <[email protected]>
  • Loading branch information
tholcman and AskAlexSharov authored Jul 19, 2024
1 parent addf25a commit 0f0cc41
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 70 deletions.
141 changes: 75 additions & 66 deletions turbo/snapshotsync/freezeblocks/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"time"

borsnaptype "github.com/ledgerwatch/erigon/polygon/bor/snaptype"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/common/hexutility"
Expand Down Expand Up @@ -395,12 +396,11 @@ func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHei
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand All @@ -419,9 +419,8 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo
return h, nil
}

view := r.sn.View()
defer view.Close()
segments := view.Headers()
segments, release := r.sn.ViewType(coresnaptype.Headers)
defer release()

buf := make([]byte, 128)
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -451,12 +450,11 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig
return h, nil
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

header, _, err := r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand Down Expand Up @@ -486,12 +484,12 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return h, err
Expand Down Expand Up @@ -526,40 +524,46 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha
}
}

view := r.sn.View()
defer view.Close()

var baseTxnID uint64
var txsAmount uint32
var buf []byte
seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return nil, nil
}
body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
defer release()

var baseTxnID uint64
var txCount uint32
var buf []byte
body, baseTxnID, txCount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, err
}
release()

if body == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
}
return nil, nil
}
txnSeg, ok := view.TxsSegment(blockHeight)

txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load())
}
return nil, nil
}
txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
defer release()

txs, senders, err := r.txsFromSnapshot(baseTxnID, txCount, txnSeg, buf)
if err != nil {
return nil, err
}
release()

if txs == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil txs from file")
Expand Down Expand Up @@ -606,13 +610,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash,
body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight)
return body, txAmount, nil
}
view := r.sn.View()
defer view.Close()

seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
return
}
defer release()

body, _, txAmount, _, err = r.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -676,15 +680,14 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no header files for this block num")
}
return
}
defer release()

var buf []byte
h, buf, err := r.headerFromSnapshot(blockHeight, seg, buf)
Expand All @@ -697,21 +700,26 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
}
return
}
release()

var b *types.Body
var baseTxnId uint64
var txsAmount uint32
bodySeg, ok := view.BodiesSegment(blockHeight)
bodySeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return
}
defer release()

b, baseTxnId, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, bodySeg, buf)
if err != nil {
return nil, nil, err
}
release()

if b == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
Expand All @@ -730,18 +738,21 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return block, senders, nil
}

txnSeg, ok := view.TxsSegment(blockHeight)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load())
}
return
}
defer release()
var txs []types.Transaction
txs, senders, err = r.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf)
if err != nil {
return nil, nil, err
}
release()

block = types.NewBlockFromStorage(hash, h, txs, b.Uncles, b.Withdrawals)
if len(senders) != block.Transactions().Len() {
if dbgLogs {
Expand Down Expand Up @@ -997,18 +1008,18 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return rawdb.TxnByIdxInBlock(tx, canonicalHash, blockNum, txIdxInBlock)
}

view := r.sn.View()
defer view.Close()
seg, ok := view.BodiesSegment(blockNum)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockNum)
if !ok {
return
}
defer release()

var b *types.BodyForStorage
b, _, err = r.bodyForStorageFromSnapshot(blockNum, seg, nil)
if err != nil {
return nil, err
}
release()
if b == nil {
return
}
Expand All @@ -1018,10 +1029,12 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return nil, nil
}

txnSeg, ok := view.TxsSegment(blockNum)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockNum)
if !ok {
return
}
defer release()

// +1 because block has system-txn in the beginning of block
return r.txnByID(b.BaseTxId+1+uint64(txIdxInBlock), txnSeg, nil)
}
Expand All @@ -1036,10 +1049,9 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
return *n, true, nil
}

view := r.sn.View()
defer view.Close()

_, blockNum, ok, err := r.txnByHash(txnHash, view.Txs(), nil)
txns, release := r.sn.ViewType(coresnaptype.Transactions)
defer release()
_, blockNum, ok, err := r.txnByHash(txnHash, txns, nil)
if err != nil {
return 0, false, err
}
Expand All @@ -1048,13 +1060,11 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
}

func (r *BlockReader) FirstTxnNumNotInSnapshots() uint64 {
view := r.sn.View()
defer view.Close()

sn, ok := view.TxsSegment(r.sn.BlocksAvailable())
sn, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable())
if !ok {
return 0
}
defer release()

lastTxnID := sn.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.Count())
return lastTxnID
Expand Down Expand Up @@ -1268,10 +1278,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common
}

borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()

segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.from > blockHeight {
Expand Down Expand Up @@ -1349,9 +1359,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H
return result, nil
}
borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()
segments := view.Events()

segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

var buf []byte
result := []rlp.RawValue{}
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -1389,10 +1400,9 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H

// EventsByIdFromSnapshot returns the list of records limited by time, or the number of records along with a bool value to signify if the records were limited by time
func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) {
view := r.borSn.View()
defer view.Close()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

segments := view.Events()
var buf []byte
var result []*heimdall.EventRecordWithTime
stateContract := bor.GenesisContractStateReceiverABI()
Expand Down Expand Up @@ -1469,9 +1479,9 @@ func (r *BlockReader) LastFrozenEventId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1526,9 +1536,9 @@ func (r *BlockReader) LastFrozenSpanId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1570,9 +1580,9 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([]
}
return common.Copy(v), nil
}
view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
idx := sn.Index()
Expand Down Expand Up @@ -1674,9 +1684,9 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId
return common.Copy(v), nil
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
index := sn.Index()
Expand All @@ -1700,9 +1710,8 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()
if len(segments) == 0 {
return 0
}
Expand Down
Loading

0 comments on commit 0f0cc41

Please sign in to comment.