Skip to content

Commit

Permalink
Merge pull request #50 from binance-chain/issue49
Browse files Browse the repository at this point in the history
#46 async_local_client lock may not release in case of panic
  • Loading branch information
ackratos authored Jan 25, 2019
2 parents 8799753 + 364c003 commit 0586f7b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 63 deletions.
6 changes: 5 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func (app *BaseApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {
tx, ok := app.GetTxFromCache(txBytes)
if ok {
txHash := cmn.HexBytes(tmhash.Sum(txBytes)).String()
app.Logger.Debug("Handle CheckTx", "Tx", txHash)
result = app.RunTx(sdk.RunTxModeCheckAfterPre, txBytes, tx, txHash)
} else {
tx, err := app.TxDecoder(txBytes)
Expand All @@ -551,6 +552,7 @@ func (app *BaseApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {
} else {
app.txMsgCache.Add(string(txBytes), tx) // for recheck
txHash := cmn.HexBytes(tmhash.Sum(txBytes)).String()
app.Logger.Debug("Handle CheckTx", "Tx", txHash)
result = app.RunTx(sdk.RunTxModeCheck, txBytes, tx, txHash)
}
}
Expand All @@ -569,7 +571,7 @@ func (app *BaseApp) CheckTx(txBytes []byte) (res abci.ResponseCheckTx) {

func (app *BaseApp) preCheck(txBytes []byte, mode sdk.RunTxMode) sdk.Result {
var res sdk.Result
if app.preChecker != nil {
if app.preChecker != nil && !app.txMsgCache.Contains(string(txBytes)) {
var tx, err = app.TxDecoder(txBytes)
if err != nil {
res = err.Result()
Expand Down Expand Up @@ -630,13 +632,15 @@ func (app *BaseApp) DeliverTx(txBytes []byte) (res abci.ResponseDeliverTx) {
// here means either the tx has passed PreDeliverTx or CheckTx,
// no need to verify signature
txHash := cmn.HexBytes(tmhash.Sum(txBytes)).String()
app.Logger.Debug("Handle DeliverTx", "Tx", txHash)
result = app.RunTx(sdk.RunTxModeDeliverAfterPre, txBytes, tx, txHash)
} else {
var tx, err = app.TxDecoder(txBytes)
if err != nil {
result = err.Result()
} else {
txHash := cmn.HexBytes(tmhash.Sum(txBytes)).String()
app.Logger.Debug("Handle DeliverTx", "Tx", txHash)
result = app.RunTx(sdk.RunTxModeDeliver, txBytes, tx, txHash)
}
}
Expand Down
123 changes: 64 additions & 59 deletions server/concurrent/async_local_client.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package concurrent

import (
"encoding/hex"
"sync"

"github.com/cosmos/cosmos-sdk/server/concurrent/pool"
abcicli "github.com/tendermint/tendermint/abci/client"

"github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
Expand Down Expand Up @@ -111,41 +112,43 @@ func (app *asyncLocalClient) checkTxWorker() {
for i := range app.checkTxQueue {
i.mtx.Lock() // wait the PreCheckTx finish
i.mtx.Unlock()
app.rwLock.Lock() // make sure not other non-CheckTx/non-DeliverTx ABCI is called
if i.reqRes.Response == nil {
tx := i.reqRes.Request.GetCheckTx().GetTx()
app.log.Debug("Handle Checktx", "Tx")
res := app.Application.CheckTx(tx)
i.reqRes.Response = types.ToResponseCheckTx(res) // Set response
}
i.reqRes.Done()
app.wgCommit.Done() // enable Commit to start
if cb := i.reqRes.GetCallback(); cb != nil {
cb(i.reqRes.Response)
}
app.Callback(i.reqRes.Request, i.reqRes.Response)
app.rwLock.Unlock() // this unlock is put after wgCommit.Done() to give commit priority
func() {
app.rwLock.Lock() // make sure not other non-CheckTx/non-DeliverTx ABCI is called
defer app.rwLock.Unlock() // this unlock is put after wgCommit.Done() to give commit priority
if i.reqRes.Response == nil {
tx := i.reqRes.Request.GetCheckTx().GetTx()
res := app.Application.CheckTx(tx)
i.reqRes.Response = types.ToResponseCheckTx(res) // Set response
}
i.reqRes.Done()
app.wgCommit.Done() // enable Commit to start
if cb := i.reqRes.GetCallback(); cb != nil {
cb(i.reqRes.Response)
}
app.Callback(i.reqRes.Request, i.reqRes.Response)
}()
}
}

func (app *asyncLocalClient) deliverTxWorker() {
for i := range app.deliverTxQueue {
i.mtx.Lock() // wait the PreCheckTx finish
i.mtx.Lock() // wait the PreDeliverTx finish
i.mtx.Unlock()
app.rwLock.Lock() // make sure not other non-CheckTx/non-DeliverTx ABCI is called
if i.reqRes.Response == nil {
tx := i.reqRes.Request.GetDeliverTx().GetTx()
app.log.Debug("Handle DeliverTx", "Tx", hex.EncodeToString(tx[:7]))
res := app.Application.DeliverTx(tx)
i.reqRes.Response = types.ToResponseDeliverTx(res) // Set response
}
i.reqRes.Done()
app.wgCommit.Done() // enable Commit to start
if cb := i.reqRes.GetCallback(); cb != nil {
cb(i.reqRes.Response)
}
app.Callback(i.reqRes.Request, i.reqRes.Response)
app.rwLock.Unlock() // this unlock is put after wgCommit.Done() to give commit priority
func() {
app.rwLock.Lock() // make sure not other non-CheckTx/non-DeliverTx ABCI is called
defer app.rwLock.Unlock() // this unlock is put after wgCommit.Done() to give commit priority
if i.reqRes.Response == nil {
tx := i.reqRes.Request.GetDeliverTx().GetTx()
res := app.Application.DeliverTx(tx)
i.reqRes.Response = types.ToResponseDeliverTx(res) // Set response
}
i.reqRes.Done()
app.wgCommit.Done() // enable Commit to start
if cb := i.reqRes.GetCallback(); cb != nil {
cb(i.reqRes.Response)
}
app.Callback(i.reqRes.Request, i.reqRes.Response)
}()
}
}

Expand Down Expand Up @@ -178,8 +181,8 @@ func (app *asyncLocalClient) InfoAsync(req types.RequestInfo) *abcicli.ReqRes {

func (app *asyncLocalClient) SetOptionAsync(req types.RequestSetOption) *abcicli.ReqRes {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.SetOption(req)
app.rwLock.Unlock()
return app.callback(
types.ToRequestSetOption(req),
types.ToResponseSetOption(res),
Expand All @@ -192,18 +195,19 @@ func (app *asyncLocalClient) DeliverTxAsync(tx []byte) *abcicli.ReqRes {
reqres := abcicli.NewReqRes(reqp)
mtx := new(sync.Mutex)
mtx.Lock()
txHash := cmn.HexBytes(tmhash.Sum(tx)).String()
app.deliverTxQueue <- WorkItem{reqRes: reqres, mtx: mtx}
app.log.Debug("Enqueue DeliverTxAsync", "Tx", hex.EncodeToString(tx[:7]))
app.log.Debug("Enqueue DeliverTxAsync", "Tx", txHash)
//no need to lock commitLock because Commit and DeliverTx will not be called concurrently
app.wgCommit.Add(1)
app.deliverTxPool.Schedule(func() {
app.log.Debug("Start PreDeliverTx", "Tx", hex.EncodeToString(tx[:7]))
defer mtx.Unlock()
app.log.Debug("Start PreDeliverTx", "Tx", txHash)
res := app.Application.PreDeliverTx(tx)
if !res.IsOK() { // no need to call the real DeliverTx
reqres.Response = types.ToResponseDeliverTx(res)
}
app.log.Debug("Finish PreDeliverTx", "Tx", hex.EncodeToString(tx[:7]))
mtx.Unlock()
app.log.Debug("Finish PreDeliverTx", "Tx", txHash)
})

return reqres
Expand All @@ -219,30 +223,32 @@ func (app *asyncLocalClient) CheckTxAsync(tx []byte) *abcicli.ReqRes {
app.checkTxMidLock.Lock()
app.commitLock.Lock() // here would block further queue if commit is ready to go
app.checkTxMidLock.Unlock()
txHash := cmn.HexBytes(tmhash.Sum(tx)).String()
app.checkTxQueue <- WorkItem{reqRes: reqres, mtx: mtx}
app.log.Debug("Enqueue CheckTxAsync", "Tx", hex.EncodeToString(tx[:7]))
app.log.Debug("Enqueue CheckTxAsync", "Tx", txHash)
app.wgCommit.Add(1)
app.commitLock.Unlock()
app.checkTxLowLock.Unlock()
app.checkTxPool.Schedule(func() {
app.log.Debug("Start PreCheckTx", "Tx", hex.EncodeToString(tx[:7]))
defer mtx.Unlock()
app.log.Debug("Start PreCheckTx", "Tx", txHash)
res := app.Application.PreCheckTx(tx)
if !res.IsOK() { // no need to call the real CheckTx
reqres.Response = types.ToResponseCheckTx(res)
}
app.log.Debug("Finish PreCheckTx", "Tx", hex.EncodeToString(tx[:7]))
mtx.Unlock()
app.log.Debug("Finish PreCheckTx", "Tx", txHash)
})
return reqres
}

//ReCheckTxAsync here still runs synchronously
func (app *asyncLocalClient) ReCheckTxAsync(tx []byte) *abcicli.ReqRes {
app.rwLock.Lock() // wont
app.log.Debug("Start ReCheckAsync", "Tx", hex.EncodeToString(tx[:7]))
defer app.rwLock.Unlock()
txHash := cmn.HexBytes(tmhash.Sum(tx)).String()
app.log.Debug("Start ReCheckAsync", "Tx", txHash)
res := app.Application.ReCheckTx(tx)
app.log.Debug("Finish ReCheckAsync", "Tx", hex.EncodeToString(tx[:7]))
app.rwLock.Unlock()
app.log.Debug("Finish ReCheckAsync", "Tx", txHash)
return app.callback(
types.ToRequestCheckTx(tx),
types.ToResponseCheckTx(res),
Expand All @@ -264,16 +270,16 @@ func (app *asyncLocalClient) CommitAsync() *abcicli.ReqRes {
app.log.Debug("Trying to get CommitAsync lock")
app.checkTxMidLock.Lock()
app.commitLock.Lock() // this must come before the wgCommit.Wait()
defer app.commitLock.Unlock()
app.checkTxMidLock.Unlock()
app.wgCommit.Wait() // wait for all the submitted CheckTx/DeliverTx/Query finish
app.rwLock.Lock()
defer app.rwLock.Unlock()
// only checkTxLock is locked here
// because we trust deliver and commit will not call concurrently
app.log.Debug("Start CommitAsync")
res := app.Application.Commit()
app.log.Debug("Finish CommitAsync")
app.rwLock.Unlock()
app.commitLock.Unlock()
return app.callback(
types.ToRequestCommit(),
types.ToResponseCommit(res),
Expand All @@ -282,19 +288,19 @@ func (app *asyncLocalClient) CommitAsync() *abcicli.ReqRes {

func (app *asyncLocalClient) InitChainAsync(req types.RequestInitChain) *abcicli.ReqRes {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.InitChain(req)
reqRes := app.callback(
types.ToRequestInitChain(req),
types.ToResponseInitChain(res),
)
app.rwLock.Unlock()
return reqRes
}

func (app *asyncLocalClient) BeginBlockAsync(req types.RequestBeginBlock) *abcicli.ReqRes {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.BeginBlock(req)
app.rwLock.Unlock()
return app.callback(
types.ToRequestBeginBlock(req),
types.ToResponseBeginBlock(res),
Expand All @@ -305,16 +311,16 @@ func (app *asyncLocalClient) EndBlockAsync(req types.RequestEndBlock) *abcicli.R
app.log.Debug("Trying to get EndBlockAsync lock")
app.checkTxMidLock.Lock()
app.commitLock.Lock() // this must come before the wgCommit.Wait()
defer app.commitLock.Unlock()
app.checkTxMidLock.Unlock()
app.wgCommit.Wait() // wait for all the submitted CheckTx/DeliverTx/Query finish
app.rwLock.Lock()
defer app.rwLock.Unlock()
// only checkTxLock is locked here
// because we trust deliver and commit will not call concurrently
app.log.Debug("Starting EndBlockAsync")
res := app.Application.EndBlock(req)
app.log.Debug("Finish EndBlockAsync")
app.rwLock.Unlock()
app.commitLock.Unlock()
return app.callback(
types.ToRequestEndBlock(req),
types.ToResponseEndBlock(res),
Expand All @@ -340,24 +346,24 @@ func (app *asyncLocalClient) InfoSync(req types.RequestInfo) (*types.ResponseInf

func (app *asyncLocalClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.SetOption(req)
app.rwLock.Unlock()
return &res, nil
}

func (app *asyncLocalClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) {
app.rwLock.Lock()
defer app.rwLock.Unlock()
app.log.Debug("Start DeliverTxSync")
res := app.Application.DeliverTx(tx)
app.rwLock.Unlock()
return &res, nil
}

func (app *asyncLocalClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) {
app.rwLock.Lock()
defer app.rwLock.Unlock()
app.log.Debug("Start CheckTxSync")
res := app.Application.CheckTx(tx)
app.rwLock.Unlock()
return &res, nil
}

Expand All @@ -372,47 +378,46 @@ func (app *asyncLocalClient) CommitSync() (*types.ResponseCommit, error) {
app.log.Debug("Trying to get CommitSync Lock")
app.checkTxMidLock.Lock()
app.commitLock.Lock() // this must come before the wgCommit.Wait()
defer app.commitLock.Unlock()
app.checkTxMidLock.Unlock()
app.wgCommit.Wait() // wait for all the submitted CheckTx/DeliverTx/Query finish
app.rwLock.Lock()
defer app.rwLock.Unlock()
// only checkTxLock is locked here
// because we trust deliver and commit will not call concurrently
app.log.Debug("Start CommitSync")
res := app.Application.Commit()
app.log.Debug("Finish CommitSync")
app.rwLock.Unlock()
app.commitLock.Unlock()
return &res, nil
}

func (app *asyncLocalClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.InitChain(req)
app.rwLock.Unlock()
return &res, nil
}

func (app *asyncLocalClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
app.rwLock.Lock()
defer app.rwLock.Unlock()
res := app.Application.BeginBlock(req)
app.rwLock.Unlock()
return &res, nil
}

func (app *asyncLocalClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
app.log.Debug("Trying to get EndBlockSync lock")
app.checkTxMidLock.Lock()
app.commitLock.Lock() // this must come before the wgCommit.Wait()
defer app.commitLock.Unlock()
app.checkTxMidLock.Unlock()
app.wgCommit.Wait() // wait for all the submitted CheckTx/DeliverTx/Query finish
app.rwLock.Lock()
defer app.rwLock.Unlock()
app.log.Debug("Start EndBlockSync")
// only checkTxLock is locked here
// because we trust deliver and commit will not call concurrently
res := app.Application.EndBlock(req)
app.log.Debug("Finish EndBlockSync")
app.rwLock.Unlock()
app.commitLock.Unlock()
return &res, nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/concurrent/async_local_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestNewAsyncLocalClient(t *testing.T) {
cli.SetResponseCallback(func(*types.Request, *types.Response) {})
assert.NotNil(cli, "Failed to create AsyncLocalClient")
tx := make([]byte, 8)
// if all are sequential, it needs 800ms
// if all are sequential, it needs 300ms
expectStop := time.Now().Add(time.Millisecond * 300)
nonExpectShort := time.Now().Add(time.Millisecond * 25)
for i := 0; i < 2; i++ {
Expand Down
4 changes: 2 additions & 2 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/cosmos/cosmos-sdk/server/concurrent"

"github.com/tendermint/tendermint/abci/server"
tcmd "github.com/tendermint/tendermint/cmd/tendermint/commands"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/p2p"
pvm "github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"

"github.com/cosmos/cosmos-sdk/server/concurrent"
)

const (
Expand Down

0 comments on commit 0586f7b

Please sign in to comment.