Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OCC] add async scheduler to seiv2 (unused) #462

Open
wants to merge 39 commits into
base: seiv2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2e2d463
Expose parent on CacheKV (#352)
codchen Nov 17, 2023
f7a3c0e
Set TxIndex before generating dependencies (#358)
codchen Nov 22, 2023
cf98904
change DeliverTx to take typed tx (#360)
codchen Dec 4, 2023
0a74f75
Integrate with pending txs in mempool (#381)
codchen Dec 16, 2023
e46f9a1
Add checkTx cb (#382)
codchen Dec 20, 2023
9401eb9
Add wei support in bank keeper (#386)
codchen Dec 28, 2023
a93cbc0
Add ACL constants for Bank Wei prefix (#387)
codchen Dec 28, 2023
a85e9b4
[EVM] Add pending nonce support (#390)
stevenlanders Jan 4, 2024
b809302
rebase
codchen Jan 11, 2024
260d226
[EVM] Allow multiple txs from same account in a block (#397)
stevenlanders Jan 19, 2024
eeeba13
Expose VersionExists (#400)
codchen Jan 23, 2024
e2248dd
Add `DeleteAll` method to store type (#402)
codchen Jan 24, 2024
46ec3fb
Allow balance to go negative in SendCoinsAndWei (#417)
codchen Feb 1, 2024
d952e2a
Expose add/sub balance methods on bank keeper (#432)
codchen Feb 13, 2024
0d9eda0
Remove escrow account in wei logic (#434)
codchen Feb 14, 2024
c859681
Add evm address to keys output (#442)
codchen Feb 27, 2024
11cfa1c
Amplify cosmos tx priority (#443)
codchen Feb 27, 2024
b2e7486
occ-evm compatibility changes
udpatil Jan 8, 2024
23a17a2
update generate estimated writeset for evm compatibility
udpatil Jan 9, 2024
c54c4f0
use absoluteIndex instead of relative Index for txIndex
udpatil Jan 11, 2024
7f70613
Fix mvkv to adhere to updated KVStore interface
udpatil Jan 25, 2024
ed2ea7d
update schduler and tests
udpatil Jan 26, 2024
4322be8
update scheduler test call pattern
udpatil Feb 28, 2024
7418b51
Integrate with tendermint EVM tx replacement logic (#446)
codchen Mar 7, 2024
ae6f0c5
Fix indexesValidated and PrefillEstimates to operate on absolute idx …
udpatil Mar 11, 2024
edf3698
Modify max incarnation fallback (#460)
udpatil Mar 14, 2024
d9a0f35
add async scheduler
stevenlanders Mar 15, 2024
73e95cb
fix datarace
stevenlanders Mar 15, 2024
1823afc
fix stale tests
stevenlanders Mar 15, 2024
d78d934
fix other potential race
stevenlanders Mar 15, 2024
7d77cb8
add sequential fallback
stevenlanders Mar 15, 2024
b1599f5
cleanup
stevenlanders Mar 15, 2024
7c6493a
add occ-async flag
stevenlanders Mar 18, 2024
76de921
cleanup on config
stevenlanders Mar 18, 2024
5c11361
cleanup import format
stevenlanders Mar 18, 2024
150e7f1
s p a c i n g
stevenlanders Mar 18, 2024
16136b1
add debug line
stevenlanders Mar 18, 2024
0edfa5e
invalidate before executing a possibly-bad tx
stevenlanders Mar 18, 2024
0cc2a9b
add debug
stevenlanders Mar 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Create a file with all core Cosmos SDK pkgs
run: go list ./... > pkgs.txt
- name: Split pkgs into 10 files
Expand Down Expand Up @@ -83,7 +83,7 @@ jobs:
- uses: actions/setup-python@v3
- uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- uses: technote-space/[email protected]
with:
PATTERNS: |
Expand Down Expand Up @@ -118,7 +118,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21

# Download all coverage reports from the 'tests' job
- name: Download coverage reports
Expand Down
77 changes: 59 additions & 18 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/tasks"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
Expand All @@ -23,6 +21,8 @@ import (

"github.com/cosmos/cosmos-sdk/codec"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/tasks"
"github.com/cosmos/cosmos-sdk/tasksv2"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -203,7 +203,7 @@ func (app *BaseApp) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) (res abc
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain releveant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

var mode runTxMode
Expand All @@ -220,28 +220,50 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}

sdkCtx := app.getContextForTx(mode, req.Tx)
gInfo, result, _, priority, err := app.runTx(sdkCtx, mode, req.Tx)
tx, err := app.txDecoder(req.Tx)
if err != nil {
res := sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
return &abci.ResponseCheckTxV2{ResponseCheckTx: &res}, err
}
gInfo, result, _, priority, pendingTxChecker, expireTxHandler, txCtx, err := app.runTx(sdkCtx, mode, tx, sha256.Sum256(req.Tx))
if err != nil {
res := sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
return &res, err
return &abci.ResponseCheckTxV2{ResponseCheckTx: &res}, err
}

return &abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
Data: result.Data,
Priority: priority,
}, nil
res := &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
Data: result.Data,
Priority: priority,
},
ExpireTxHandler: expireTxHandler,
EVMNonce: txCtx.EVMNonce(),
EVMSenderAddress: txCtx.EVMSenderAddress(),
IsEVM: txCtx.IsEVM(),
}
if pendingTxChecker != nil {
res.IsPendingTransaction = true
res.Checker = pendingTxChecker
}

return res, nil
}

// DeliverTxBatch executes multiple txs
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
// This will basically no-op the actual prefill if the metadata for the txs is empty
var scheduler tasks.Scheduler
if app.occAsync {
scheduler = tasksv2.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
} else {
scheduler = tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
}

// process all txs, this will also initializes the MVS if prefill estimates was disabled
txRes, err := scheduler.ProcessAll(ctx, req.TxEntries)
if err != nil {
// TODO: handle error
app.logger.Error("DeliverTxBatch failed", "err", err)
panic(err)
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
Expand All @@ -256,9 +278,7 @@ func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchReques
// Otherwise, the ResponseDeliverTx will contain relevant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution tasks that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
for _, streamingListener := range app.abciListeners {
Expand All @@ -278,7 +298,7 @@ func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res a
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, _, err := app.runTx(ctx.WithTxBytes(req.Tx).WithVoteInfos(app.voteInfos), runTxModeDeliver, req.Tx)
gInfo, result, anteEvents, _, _, _, resCtx, err := app.runTx(ctx.WithTxBytes(req.Tx).WithVoteInfos(app.voteInfos), runTxModeDeliver, tx, checksum)
if err != nil {
resultStr = "failed"
// if we have a result, use those events instead of just the anteEvents
Expand All @@ -288,13 +308,20 @@ func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res a
return sdkerrors.ResponseDeliverTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, sdk.MarkEventsToIndex(anteEvents, app.indexEvents), app.trace)
}

return abci.ResponseDeliverTx{
res = abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}
if resCtx.IsEVM() {
res.EvmTxInfo = &abci.EvmTxInfo{
SenderAddress: resCtx.EVMSenderAddress(),
Nonce: resCtx.EVMNonce(),
}
}
return
}

func (app *BaseApp) WriteStateToCommitAndGetWorkingHash() []byte {
Expand Down Expand Up @@ -1048,6 +1075,20 @@ func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProces
}
}()

defer func() {
if err := recover(); err != nil {
app.logger.Error(
"panic recovered in ProcessProposal",
"height", req.Height,
"time", req.Time,
"hash", fmt.Sprintf("%X", req.Hash),
"panic", err,
)

resp = &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
}()

if app.processProposalHandler != nil {
resp, err = app.processProposalHandler(app.processProposalState.ctx, req)
if err != nil {
Expand Down
64 changes: 44 additions & 20 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package baseapp

import (
"context"
"crypto/sha256"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -61,6 +60,7 @@ const (
FlagChainID = "chain-id"
FlagConcurrencyWorkers = "concurrency-workers"
FlagOccEnabled = "occ-enabled"
FlagOccAsync = "occ-async"
)

var (
Expand Down Expand Up @@ -158,8 +158,9 @@ type BaseApp struct { //nolint: maligned

ChainID string

votesInfoLock sync.RWMutex
commitLock *sync.Mutex
votesInfoLock sync.RWMutex
commitLock *sync.Mutex
checkTxStateLock *sync.RWMutex

compactionInterval uint64

Expand All @@ -170,6 +171,7 @@ type BaseApp struct { //nolint: maligned

concurrencyWorkers int
occEnabled bool
occAsync bool
}

type appStore struct {
Expand Down Expand Up @@ -275,10 +277,12 @@ func NewBaseApp(
TracingInfo: &tracing.Info{
Tracer: &tr,
},
commitLock: &sync.Mutex{},
commitLock: &sync.Mutex{},
checkTxStateLock: &sync.RWMutex{},
}

app.TracingInfo.SetContext(context.Background())
app.occAsync = cast.ToBool(appOpts.Get(FlagOccAsync))

for _, option := range options {
option(app)
Expand All @@ -305,6 +309,11 @@ func NewBaseApp(
app.concurrencyWorkers = config.DefaultConcurrencyWorkers
}

//TODO: remove occAsync feature flag after it's the default version
if app.occAsync {
app.logger.Info("asynchronous occ scheduler is enabled")
}

return app
}

Expand Down Expand Up @@ -530,6 +539,8 @@ func (app *BaseApp) IsSealed() bool { return app.sealed }
func (app *BaseApp) setCheckState(header tmproto.Header) {
ms := app.cms.CacheMultiStore()
ctx := sdk.NewContext(ms, header, true, app.logger).WithMinGasPrices(app.minGasPrices)
app.checkTxStateLock.Lock()
defer app.checkTxStateLock.Unlock()
if app.checkState == nil {
app.checkState = &state{
ms: ms,
Expand Down Expand Up @@ -802,16 +813,15 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
// TODO: (occ) This is an example of where we wrap the multistore with a cache multistore, and then return a modified context using that multistore
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
func (app *BaseApp) cacheTxContext(ctx sdk.Context, checksum [32]byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
sdk.TraceContext(
map[string]interface{}{
"txHash": fmt.Sprintf("%X", sha256.Sum256(txBytes)),
"txHash": fmt.Sprintf("%X", checksum),
},
),
).(sdk.CacheMultiStore)
Expand All @@ -827,8 +837,16 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// Note, gas execution info is always returned. A reference to a Result is
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {

func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, tx sdk.Tx, checksum [32]byte) (
gInfo sdk.GasInfo,
result *sdk.Result,
anteEvents []abci.Event,
priority int64,
pendingTxChecker abci.PendingTxChecker,
expireHandler abci.ExpireTxHandler,
txCtx sdk.Context,
err error,
) {
defer telemetry.MeasureThroughputSinceWithLabels(
telemetry.TxCount,
[]metrics.Label{
Expand All @@ -852,7 +870,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
spanCtx, span := app.TracingInfo.StartWithContext("RunTx", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(txBytes))))
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", checksum)))
}

// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
Expand All @@ -875,15 +893,14 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
gInfo = sdk.GasInfo{GasWanted: gasWanted, GasUsed: ctx.GasMeter().GasConsumed()}
}()

tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, 0, err
if tx == nil {
return sdk.GasInfo{}, nil, nil, 0, nil, nil, ctx, sdkerrors.Wrap(sdkerrors.ErrTxDecode, "tx decode error")
}

msgs := tx.GetMsgs()

if err := validateBasicTxMsgs(msgs); err != nil {
return sdk.GasInfo{}, nil, nil, 0, err
return sdk.GasInfo{}, nil, nil, 0, nil, nil, ctx, err
}

if app.anteHandler != nil {
Expand All @@ -904,7 +921,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx, msCache = app.cacheTxContext(ctx, checksum)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

Expand All @@ -928,7 +945,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// GasMeter expected to be set in AnteHandler
gasWanted = ctx.GasMeter().Limit()
if err != nil {
return gInfo, nil, nil, 0, err
return gInfo, nil, nil, 0, nil, nil, ctx, err
}

// Dont need to validate in checkTx mode
Expand All @@ -944,11 +961,13 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
op.EmitValidationFailMetrics()
}
errMessage := fmt.Sprintf("Invalid Concurrent Execution antehandler missing %d access operations", len(missingAccessOps))
return gInfo, nil, nil, 0, sdkerrors.Wrap(sdkerrors.ErrInvalidConcurrencyExecution, errMessage)
return gInfo, nil, nil, 0, nil, nil, ctx, sdkerrors.Wrap(sdkerrors.ErrInvalidConcurrencyExecution, errMessage)
}
}

priority = ctx.Priority()
pendingTxChecker = ctx.PendingTxChecker()
expireHandler = ctx.ExpireTxHandler()
msCache.Write()
anteEvents = events.ToABCIEvents()
if app.TracingEnabled {
Expand All @@ -959,7 +978,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
runMsgCtx, msCache := app.cacheTxContext(ctx, checksum)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
Expand All @@ -974,7 +993,10 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// append the events in the order of occurrence
result.Events = append(anteEvents, result.Events...)
}
return gInfo, result, anteEvents, priority, err
if ctx.CheckTxCallback() != nil {
ctx.CheckTxCallback()(ctx, err)
}
return gInfo, result, anteEvents, priority, pendingTxChecker, expireHandler, ctx, err
}

// runMsgs iterates through a list of messages and executes them with the provided
Expand Down Expand Up @@ -1021,7 +1043,7 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
err error
)

msgCtx, msgMsCache := app.cacheTxContext(ctx, []byte{})
msgCtx, msgMsCache := app.cacheTxContext(ctx, [32]byte{})
msgCtx = msgCtx.WithMessageIndex(i)

startTime := time.Now()
Expand Down Expand Up @@ -1164,5 +1186,7 @@ func (app *BaseApp) ReloadDB() error {
}

func (app *BaseApp) GetCheckCtx() sdk.Context {
app.checkTxStateLock.RLock()
defer app.checkTxStateLock.RUnlock()
return app.checkState.ctx
}
Loading
Loading