Skip to content

Commit

Permalink
fix: dev: rewrite reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolly23 committed Nov 20, 2023
1 parent 4a3c34e commit 420bb62
Show file tree
Hide file tree
Showing 17 changed files with 521 additions and 499 deletions.
245 changes: 83 additions & 162 deletions core/puissant.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package core

import (
"bytes"
"context"
"errors"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"math/big"
"strings"
)

type puissantRuntime struct {
Expand All @@ -19,15 +18,15 @@ type puissantRuntime struct {
chainConfig *params.ChainConfig
bloomProcessor *ReceiptBloomGenerator

pTxsStatuses map[types.PuissantID][]*ptxCommitStatus
pSeqID2PIDBook map[int]types.PuissantID
succeedGroup mapset.Set[types.PuissantID]
failedGroup mapset.Set[types.PuissantID]
failedTx mapset.Set[common.Hash]
bundles types.PuissantBundles

resultOK mapset.Set[types.PuissantID]
resultFailed mapset.Set[types.PuissantID]
}

func RunPuissantCommitter(
ctx context.Context,
round int,
interruptCh chan int32,
signalToErr func(int32) error,

Expand All @@ -37,148 +36,88 @@ func RunPuissantCommitter(
chain *BlockChain,
chainConfig *params.ChainConfig,
poolRemoveFn func(mapset.Set[types.PuissantID]),
) ([]*CommitterReport, bool, error) {
) (bool, error) {

var committer = &puissantRuntime{
env: workerEnv,
snapshots: newSnapshotSet(),
chain: chain,
chainConfig: chainConfig,
bloomProcessor: NewReceiptBloomGenerator(),
pTxsStatuses: make(map[types.PuissantID][]*ptxCommitStatus),
pSeqID2PIDBook: make(map[int]types.PuissantID),
succeedGroup: mapset.NewThreadUnsafeSet[types.PuissantID](),
failedGroup: mapset.NewThreadUnsafeSet[types.PuissantID](),
failedTx: mapset.NewThreadUnsafeSet[common.Hash](),
}
defer poolRemoveFn(committer.failedGroup)

for pSeq, pGroup := range puissantPool {
committer.pTxsStatuses[pGroup.ID()] = make([]*ptxCommitStatus, pGroup.TxCount())
committer.pSeqID2PIDBook[pSeq] = pGroup.ID()
for index, tx := range pGroup.Txs() {
committer.pTxsStatuses[pGroup.ID()][index] = initPTxCommitStatus(tx, workerEnv.Signer)
}
bundles: puissantPool,
resultOK: mapset.NewThreadUnsafeSet[types.PuissantID](),
resultFailed: mapset.NewThreadUnsafeSet[types.PuissantID](),
}
defer poolRemoveFn(committer.resultFailed)

puissantPool.PreparePacking(workerEnv.Header.Number.Uint64(), round)

workerEnv.PuissantTxQueue = types.NewTransactionsPuissant(workerEnv.Signer, pendingPool, puissantPool)
workerEnv.PuissantTxQueue.LogPuissantTxs()
committer.resetRunningGroup()
committer.resetPackingBundles(round)

for {
select {
case <-ctx.Done():
// timeout
return committer.finalStatistics(true)
return false, committer.finalStatistics(round, !puissantPool.IsEmpty())

case signal := <-interruptCh:
log.Info(" ⚠️ abort due to interruption", "when", "packingRunTime", "reason", signalToErr(signal))
return nil, true, signalToErr(signal)
log.Info("packing abort due to interruption", "when", "packingRunTime", "reason", signalToErr(signal))
return true, signalToErr(signal)

default:
}

if gasLeft := workerEnv.GasPool.Gas(); gasLeft < params.TxGas {
log.Warn(" 🐶 not enough gas for further transactions, commit directly", "packed-count", workerEnv.TxCount, "have", gasLeft, "want", params.TxGas)
return committer.finalStatistics(true)
return false, committer.finalStatistics(round, !puissantPool.IsEmpty())
}

tx := workerEnv.PuissantTxQueue.Peek()
if tx == nil {
return committer.finalStatistics(false)
return false, committer.finalStatistics(round, false)
}

committer.commitTransaction(tx)
committer.commitTransaction(round, tx)
}
}

type CommitterReport struct {
PuissantID types.PuissantID
Status puissantStatusCode
Info puissantInfoCode
Txs []*ptxCommitStatus
Income *big.Int
EvmRun bool
}
func (p *puissantRuntime) finalStatistics(round int, integrityCheck bool) error {
var unfinished = mapset.NewThreadUnsafeSet[types.PuissantID]()

func (p *puissantRuntime) finalStatistics(integrityCheck bool) ([]*CommitterReport, bool, error) {
if integrityCheck {
var unfinished = mapset.NewThreadUnsafeSet[types.PuissantID]()
for pid := range p.pTxsStatuses {
if !p.succeedGroup.Contains(pid) {
unfinished.Add(pid)
}
}
for _, tx := range p.env.PackedTxs {
if unfinished.Contains(tx.PuissantID()) {
return nil, false, errors.New("unfinished puissant included")
}
for _, bundle := range p.bundles {
if _, finish := bundle.FinalizeRound(round); !finish {
unfinished.Add(bundle.ID())
}
}

var report = make([]*CommitterReport, len(p.pSeqID2PIDBook))

for bSeq := 0; bSeq < len(p.pSeqID2PIDBook); bSeq++ {
var (
income = big.NewInt(0)
pid = p.pSeqID2PIDBook[bSeq]
pRpt = &CommitterReport{
PuissantID: pid,
Status: PuissantStatusWellDone,
Info: PuissantInfoCodeOk,
Txs: make([]*ptxCommitStatus, len(p.pTxsStatuses[pid])),
EvmRun: p.failedGroup.Contains(pid) || p.succeedGroup.Contains(pid),
}
)

for txSeq, txRpt := range p.pTxsStatuses[pid] {
if txRpt.error != nil {

switch {
case errors.Is(txRpt.error, types.PuiErrTxConflict):
txRpt.status = PuissantTransactionStatusConflictedBeaten
if pRpt.Info == PuissantInfoCodeOk {
pRpt.Info = PuissantInfoCodeBeaten
}

case errors.Is(txRpt.error, types.PuiErrTxConflict):
txRpt.status = PuissantTransactionStatusNoRun

default:
txRpt.status = PuissantTransactionStatusRevert
pRpt.Info = PuissantInfoCodeRevert
}
pRpt.Status = PuissantStatusDropped
txRpt.revertMsg = txRpt.error.Error()
}
if txSeq == 0 {
income = new(big.Int).Mul(txRpt.gasPrice, new(big.Int).SetUint64(txRpt.gasUsed))
if integrityCheck {
for _, tx := range p.env.PackedTxs {
if bundle, _ := tx.Bundle(); bundle != nil && unfinished.Contains(bundle.ID()) {
return errors.New("unfinished puissant included")
}
pRpt.Txs[txSeq] = txRpt
}
if pRpt.Status == PuissantStatusWellDone {
pRpt.Income = income
}
report[bSeq] = pRpt
}

return report, false, nil
return nil
}

func (p *puissantRuntime) commitTransaction(tx *types.Transaction) {
func (p *puissantRuntime) commitTransaction(round int, tx *types.Transaction) {
var (
pid, pSeq, pTxSeq = tx.PuissantInfo()
isBundleTx = pid.IsPuissant()
theBundle, bundleTxSeq = tx.Bundle()
isBundleTx = theBundle != nil
)

if isBundleTx {
p.snapshots.save(pid, p.env)
p.snapshots.save(theBundle.ID(), p.env)
}

p.env.State.SetTxContext(tx.Hash(), p.env.TxCount)

txReceipt, err := commitTransaction(tx, p.chain, p.chainConfig, p.env.Header.Coinbase,
p.env.State, p.env.GasPool, p.env.Header,
isBundleTx && !tx.AcceptsReverting(), isBundleTx && pTxSeq == 0, p.bloomProcessor)
isBundleTx && !theBundle.Revertible(tx.Hash()), isBundleTx && bundleTxSeq == 0, p.bloomProcessor)

if err == nil {
p.env.PackTx(tx, txReceipt)
Expand All @@ -197,100 +136,82 @@ func (p *puissantRuntime) commitTransaction(tx *types.Transaction) {
return
}

txStatus := p.pTxsStatuses[pid][pTxSeq]
txStatus.error = err
txStatus.hash = tx.Hash() // update hash again for auto burning tx

if err != nil {
log.Warn(" 🐶 puissant-tx ❌", "seq", pSeq, "tx", pTxSeq, "err", err)
txStatus.status = PuissantTransactionStatusRevert
//log.Warn("puissant-tx-failed", "seq", pSeq, "tx", bundleTxSeq, "err", err)

p.failedGroup.Add(pid)
theBundle.UpdateTransactionStatus(round, tx.Hash(), 0, types.PuiTransactionStatusRevert, err)

if !strings.HasPrefix(err.Error(), "execution reverted") {
p.failedTx.Add(tx.Hash())
} else {
var hasPreAffairs = false
for _txSeq := 0; _txSeq < pTxSeq; _txSeq++ {
if txStatus.gasUsed > 25000 && txStatus.hasData {
hasPreAffairs = true
break
}
}
if !hasPreAffairs {
p.failedTx.Add(tx.Hash())
}
}
p.snapshots.revert(pid, p.env)
p.resultFailed.Add(theBundle.ID())

p.snapshots.revert(theBundle.ID(), p.env)
// revert first, then reset
p.resetRunningGroup()
p.resetPackingBundles(round)

} else {
log.Info(" 🐶 puissant-tx ✅", "seq", pSeq, "tx", pTxSeq)
txStatus.gasUsed = txReceipt.GasUsed
txStatus.status = PuissantTransactionStatusOk
if p.pTxsStatuses[pid][len(p.pTxsStatuses[pid])-1].error == nil {
p.succeedGroup.Add(pid)
//log.Info("puissant-tx-pass", "seq", pSeq, "tx", bundleTxSeq)

theBundle.UpdateTransactionStatus(round, tx.Hash(), txReceipt.GasUsed, types.PuiTransactionStatusOk, nil)

if theBundle.IsPosted(round) {
p.resultOK.Add(theBundle.ID())
}
p.env.PuissantTxQueue.Pop()
}
}

func (p *puissantRuntime) resetRunningGroup() {
func (p *puissantRuntime) resetPackingBundles(round int) {
var (
included = mapset.NewThreadUnsafeSet[string]()
enabled []types.PuissantID
enabledSeq []int
included = mapset.NewThreadUnsafeSet[string]()
enabled []types.PuissantID
)

for pSeq := 0; pSeq < len(p.pSeqID2PIDBook); pSeq++ {
var (
conflict = false
failedHistory = -1
pid = p.pSeqID2PIDBook[pSeq]
txsStatus = p.pTxsStatuses[pid]
)
for _, bundle := range p.bundles {
var conflict bool

if p.failedGroup.Contains(pid) {
if p.resultFailed.Contains(bundle.ID()) {
continue
}

for _txSeq, tx := range txsStatus {
if included.Contains(tx.uniqueID) {
for _, tx := range bundle.Txs() {
if included.Contains(txUniqueID(p.env.Signer, tx)) {
conflict = true
tx.error = types.PuiErrTxConflict
tx.status = PuissantTransactionStatusConflictedBeaten
bundle.UpdateTransactionStatus(
round,
tx.Hash(),
0,
types.PuiTransactionStatusConflicted,
types.PuiTransactionStatusConflicted.ToError(),
)
break
}
if p.failedTx.Contains(tx.hash) && failedHistory == -1 {
failedHistory = _txSeq
}
}
if !conflict {
if failedHistory != -1 {
var couldRecover = false
for _txSeq := 0; _txSeq < failedHistory; _txSeq++ {
if txsStatus[_txSeq].hasData && txsStatus[_txSeq].gasLimit > 25000 {
couldRecover = true
}
}
if !couldRecover {
continue
}
for _, tx := range bundle.Txs() {
included.Add(txUniqueID(p.env.Signer, tx))
}
for _, tx := range txsStatus {
included.Add(tx.uniqueID)
}
enabled = append(enabled, pid)
enabledSeq = append(enabledSeq, pSeq)
enabled = append(enabled, bundle.ID())
}
}
if len(enabled) > 0 {
log.Info(" 🐶 running-group-reset 🔧", "len", len(enabled), "new-set", enabledSeq)
}
p.env.PuissantTxQueue.ResetEnable(enabled)
}

func txUniqueID(signer types.Signer, tx *types.Transaction) string {
var (
id bytes.Buffer
nonce = tx.Nonce()
sender, _ = types.Sender(signer, tx)
)

id.Grow(common.AddressLength + 4)
id.Write(sender.Bytes())
id.WriteByte(byte(nonce))
id.WriteByte(byte(nonce >> 8))
id.WriteByte(byte(nonce >> 16))
id.WriteByte(byte(nonce >> 24))

return id.String()
}

type envData struct {
storeEnv *MinerEnvironment
snapshotID int
Expand Down
Loading

0 comments on commit 420bb62

Please sign in to comment.