diff --git a/pkg/user/mocks/tx.pb.go b/pkg/user/mocks/tx.pb.go new file mode 100644 index 0000000000..8aee2647fe --- /dev/null +++ b/pkg/user/mocks/tx.pb.go @@ -0,0 +1,97 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: app/grpc/tx/tx.pb.go + +// Package mock_tx is a generated GoMock package. +package mock_tx + +import ( + context "context" + "fmt" + reflect "reflect" + + tx "github.com/celestiaorg/celestia-app/v3/app/grpc/tx" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockTxClient is a mock of TxClient interface. +type MockTxClient struct { + ctrl *gomock.Controller + recorder *MockTxClientMockRecorder +} + +// MockTxClientMockRecorder is the mock recorder for MockTxClient. +type MockTxClientMockRecorder struct { + mock *MockTxClient +} + +// NewMockTxClient creates a new mock instance. +func NewMockTxClient(ctrl *gomock.Controller) *MockTxClient { + mock := &MockTxClient{ctrl: ctrl} + mock.recorder = &MockTxClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTxClient) EXPECT() *MockTxClientMockRecorder { + return m.recorder +} + +// TxStatus mocks base method. +func (m *MockTxClient) TxStatus(ctx context.Context, in *tx.TxStatusRequest, opts ...grpc.CallOption) (*tx.TxStatusResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TxStatus", varargs...) + ret0, _ := ret[0].(*tx.TxStatusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TxStatus indicates an expected call of TxStatus. +func (mr *MockTxClientMockRecorder) TxStatus(ctx, in interface{}, opts ...interface{}) *gomock.Call { + fmt.Println("CALLING MOCK RECORDER") + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TxStatus", reflect.TypeOf((*MockTxClient)(nil).TxStatus), varargs...) +} + +// MockTxServer is a mock of TxServer interface. +type MockTxServer struct { + ctrl *gomock.Controller + recorder *MockTxServerMockRecorder +} + +// MockTxServerMockRecorder is the mock recorder for MockTxServer. +type MockTxServerMockRecorder struct { + mock *MockTxServer +} + +// NewMockTxServer creates a new mock instance. +func NewMockTxServer(ctrl *gomock.Controller) *MockTxServer { + mock := &MockTxServer{ctrl: ctrl} + mock.recorder = &MockTxServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTxServer) EXPECT() *MockTxServerMockRecorder { + return m.recorder +} + +// TxStatus mocks base method. +func (m *MockTxServer) TxStatus(arg0 context.Context, arg1 *tx.TxStatusRequest) (*tx.TxStatusResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TxStatus", arg0, arg1) + ret0, _ := ret[0].(*tx.TxStatusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TxStatus indicates an expected call of TxStatus. +func (mr *MockTxServerMockRecorder) TxStatus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TxStatus", reflect.TypeOf((*MockTxServer)(nil).TxStatus), arg0, arg1) +} diff --git a/pkg/user/tx_client.go b/pkg/user/tx_client.go index b90005f090..eda02745fa 100644 --- a/pkg/user/tx_client.go +++ b/pkg/user/tx_client.go @@ -12,7 +12,6 @@ import ( "time" "github.com/celestiaorg/go-square/v2/share" - blobtx "github.com/celestiaorg/go-square/v2/tx" "github.com/cosmos/cosmos-sdk/client" nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" @@ -27,7 +26,6 @@ import ( "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/app/encoding" - apperrors "github.com/celestiaorg/celestia-app/v3/app/errors" "github.com/celestiaorg/celestia-app/v3/app/grpc/tx" "github.com/celestiaorg/celestia-app/v3/pkg/appconsts" "github.com/celestiaorg/celestia-app/v3/x/blob/types" @@ -41,10 +39,9 @@ const ( type Option func(client *TxClient) -type TxPoolTx struct { - Timeout int64 - Nonce uint64 - Signer string +type PoolTxInfo struct { + Nonce uint64 + Signer string } // TxResponse is a response from the chain after @@ -126,6 +123,12 @@ func WithDefaultAccount(name string) Option { } } +func WithConsensusNode(node tx.TxClient) Option { + return func(c *TxClient) { + c.txClient = node + } +} + // TxClient is an abstraction for building, signing, and broadcasting Celestia transactions // It supports multiple accounts. If none is specified, it will // try use the default account. @@ -143,7 +146,9 @@ type TxClient struct { defaultGasPrice float64 defaultAccount string defaultAddress sdktypes.AccAddress - txPool map[string]TxPoolTx + txPool map[string]PoolTxInfo + // txClient is the client API for Tx service. + txClient tx.TxClient } // NewTxClient returns a new signer using the provided keyring @@ -176,6 +181,8 @@ func NewTxClient( defaultGasPrice: appconsts.DefaultMinGasPrice, defaultAccount: records[0].Name, defaultAddress: addr, + txPool: make(map[string]PoolTxInfo), + txClient: tx.NewTxClient(conn), } for _, opt := range options { @@ -380,40 +387,23 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer return nil, err } if resp.TxResponse.Code != abci.CodeTypeOK { - if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) { - _, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address) - // query the account to update the sequence number on-chain for the account - if err != nil { - return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog) - } - if err := client.signer.SetSequence(signer, seqNum); err != nil { - return nil, fmt.Errorf("setting sequence: %w", err) - } - return client.retryBroadcastingTx(ctx, txBytes) - } broadcastTxErr := &BroadcastTxError{ TxHash: resp.TxResponse.TxHash, Code: resp.TxResponse.Code, ErrorLog: resp.TxResponse.RawLog, } - // transaction failed - // check if the signer has other txs in the pool - for txHash, tx := range client.txPool { - if tx.Signer == signer { - // set the time for when the tx failed + 1 minute (they can be resubmitted after this time) - tx.Timeout = time.Now().Add(time.Minute).Unix() - // update the nonce of other txs in the pool - tx.Nonce = seqNum - client.txPool[txHash] = tx - - } - } - // if yes, we need to adjust the nonce of the txs in the pool - // wait for them to be invalidated by their max heights - // and then resubmit the tx return nil, broadcastTxErr } + // add the broadcasted transaction to the local pool + fmt.Println(resp.TxResponse.TxHash, "TX HASH we are saving") + client.txPool[resp.TxResponse.TxHash] = PoolTxInfo{ + Nonce: client.signer.accounts[signer].Sequence(), + Signer: signer, + } + + fmt.Println("queried tx hash", client.txPool[resp.TxResponse.TxHash]) + // after the transaction has been submitted, we can increment the // sequence of the signer if err := client.signer.IncrementSequence(signer); err != nil { @@ -422,89 +412,15 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer return resp.TxResponse, nil } -func (client *TxClient) updateNonces(signer string, newSeqNum uint64) { - client.mtx.Lock() - defer client.mtx.Unlock() - - latestNonce := newSeqNum - for txHash, tx := range client.txPool { - if tx.Signer == signer { - tx.Nonce = latestNonce - client.txPool[txHash] = tx - latestNonce++ - } - } -} - -// retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the -// new sequence number. It then calls `broadcastTx` and attempts to submit the transaction -func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) { - blobTx, isBlobTx, err := blobtx.UnmarshalBlobTx(txBytes) - if isBlobTx { - // only check the error if the bytes are supposed to be of type blob tx - if err != nil { - return nil, err - } - txBytes = blobTx.Tx - } - tx, err := client.signer.DecodeTx(txBytes) - if err != nil { - return nil, err - } - - opts := make([]TxOption, 0) - if granter := tx.FeeGranter(); granter != nil { - opts = append(opts, SetFeeGranter(granter)) - } - if payer := tx.FeePayer(); payer != nil { - opts = append(opts, SetFeePayer(payer)) - } - if memo := tx.GetMemo(); memo != "" { - opts = append(opts, SetMemo(memo)) - } - if fee := tx.GetFee(); fee != nil { - opts = append(opts, SetFee(fee.AmountOf(appconsts.BondDenom).Uint64())) - } - if gas := tx.GetGas(); gas > 0 { - opts = append(opts, SetGasLimit(gas)) - } - - txBuilder, err := client.signer.txBuilder(tx.GetMsgs(), opts...) - if err != nil { - return nil, err - } - signer, _, err := client.signer.signTransaction(txBuilder) - if err != nil { - return nil, fmt.Errorf("resigning transaction: %w", err) - } - - newTxBytes, err := client.signer.EncodeTx(txBuilder.GetTx()) - if err != nil { - return nil, err - } - - // rewrap the blob tx if it was originally a blob tx - if isBlobTx { - newTxBytes, err = blobtx.MarshalBlobTx(newTxBytes, blobTx.Blobs...) - if err != nil { - return nil, err - } - } - - return client.broadcastTx(ctx, newTxBytes, signer) -} - // ConfirmTx periodically pings the provided node for the commitment of a transaction by its // hash. It will continually loop until the context is cancelled, the tx is found or an error // is encountered. func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxResponse, error) { - txClient := tx.NewTxClient(client.grpc) - pollTicker := time.NewTicker(client.pollTime) defer pollTicker.Stop() for { - resp, err := txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash}) + resp, err := client.txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash}) if err != nil { return nil, err } @@ -515,6 +431,7 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon // Continue polling if the transaction is still pending select { case <-ctx.Done(): + delete(client.txPool, txHash) return nil, ctx.Err() case <-pollTicker.C: continue @@ -531,13 +448,29 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon Code: resp.ExecutionCode, ErrorLog: resp.Error, } + delete(client.txPool, txHash) return nil, executionErr } + // remove it form the local pool + delete(client.txPool, txHash) return txResponse, nil case core.TxStatusEvicted: + fmt.Println("tx was evicted from the mempool") + // get transaction from the local pool + txPoolTx, exists := client.txPool[txHash] + fmt.Println("txPoolTx", txPoolTx) + if !exists { + return nil, fmt.Errorf("tx not found in tx client local pool: %s", txHash) + } + // set the signers sequence to the nonce of the tx + if err := client.signer.SetSequence(txPoolTx.Signer, txPoolTx.Nonce); err != nil { + return nil, fmt.Errorf("setting sequence: %w", err) + } + delete(client.txPool, txHash) return nil, fmt.Errorf("tx was evicted from the mempool") default: - return nil, fmt.Errorf("unknown tx: %s", txHash) + delete(client.txPool, txHash) + return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash) } } } @@ -617,6 +550,7 @@ func (client *TxClient) checkAccountLoaded(ctx context.Context, account string) if err != nil { return fmt.Errorf("retrieving address from keyring: %w", err) } + // FIXME: have a less trusting way of getting the account number and sequence accNum, sequence, err := QueryAccount(ctx, client.grpc, client.registry, addr) if err != nil { return fmt.Errorf("querying account %s: %w", account, err) @@ -645,6 +579,12 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err return record.Name, nil } +// Method to get transaction info by hash +func (client *TxClient) GetTxInfo(hash string) (PoolTxInfo, bool) { + txInfo, exists := client.txPool[hash] + return txInfo, exists +} + // Signer exposes the tx clients underlying signer func (client *TxClient) Signer() *Signer { return client.signer diff --git a/pkg/user/tx_client_test.go b/pkg/user/tx_client_test.go index 1588c3a060..79f0890d31 100644 --- a/pkg/user/tx_client_test.go +++ b/pkg/user/tx_client_test.go @@ -5,10 +5,12 @@ import ( "testing" "time" + // "github.com/cosmos/cosmos-sdk/tests/mocks" sdk "github.com/cosmos/cosmos-sdk/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/x/authz" bank "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" abci "github.com/tendermint/tendermint/abci/types" @@ -16,8 +18,10 @@ import ( "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/app/encoding" + "github.com/celestiaorg/celestia-app/v3/app/grpc/tx" "github.com/celestiaorg/celestia-app/v3/pkg/appconsts" "github.com/celestiaorg/celestia-app/v3/pkg/user" + mock "github.com/celestiaorg/celestia-app/v3/pkg/user/mocks" "github.com/celestiaorg/celestia-app/v3/test/util/blobfactory" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" ) @@ -166,16 +170,29 @@ func (suite *TxClientTestSuite) TestConfirmTx() { resp, err := suite.txClient.BroadcastTx(ctx, []sdk.Msg{msg}) require.NoError(t, err) + // check that transaction was indexed + txInfo, exists := suite.txClient.GetTxInfo(resp.TxHash) + require.True(t, exists) + require.Equal(t, suite.txClient.DefaultAccountName(), txInfo.Signer) + seq := suite.txClient.Signer().Account(suite.txClient.DefaultAccountName()).Sequence() + // Successfully broadcasted transaction increases the nonce + require.Equal(t, seq-1, txInfo.Nonce) + _, err = suite.txClient.ConfirmTx(ctx, resp.TxHash) require.Error(t, err) require.Contains(t, err.Error(), context.DeadlineExceeded.Error()) + + txInfo, exists = suite.txClient.GetTxInfo(resp.TxHash) + require.False(t, exists) + require.Zero(t, txInfo) }) t.Run("should error when tx is not found", func(t *testing.T) { ctx, cancel := context.WithTimeout(suite.ctx.GoContext(), 5*time.Second) defer cancel() - _, err := suite.txClient.ConfirmTx(ctx, "E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728") - require.Contains(t, err.Error(), "unknown tx: E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728") + resp, err := suite.txClient.ConfirmTx(ctx, "E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728") + require.Contains(t, err.Error(), "transaction with hash E32BD15CAF57AF15D17B0D63CF4E63A9835DD1CEBB059C335C79586BC3013728 not found; it was likely rejected") + require.Nil(t, resp) }) t.Run("should return error log when execution fails", func(t *testing.T) { @@ -183,9 +200,21 @@ func (suite *TxClientTestSuite) TestConfirmTx() { msg := authz.NewMsgExec(suite.txClient.DefaultAddress(), []sdk.Msg{innerMsg}) resp, err := suite.txClient.BroadcastTx(suite.ctx.GoContext(), []sdk.Msg{&msg}, fee, gas) require.NoError(t, err) - _, err = suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash) + + txInfo, exists := suite.txClient.GetTxInfo(resp.TxHash) + require.True(t, exists) + require.Equal(t, suite.txClient.DefaultAccountName(), txInfo.Signer) + seq := suite.txClient.Signer().Account(suite.txClient.DefaultAccountName()).Sequence() + // Successfully broadcasted transaction increases the nonce + require.Equal(t, seq-1, txInfo.Nonce) + + confirmTxResp, err := suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash) require.Error(t, err) require.Contains(t, err.Error(), "authorization not found") + require.Nil(t, confirmTxResp) + txInfo, exists = suite.txClient.GetTxInfo(resp.TxHash) + require.False(t, exists) + require.Zero(t, txInfo) }) t.Run("should success when tx is found immediately", func(t *testing.T) { @@ -194,11 +223,22 @@ func (suite *TxClientTestSuite) TestConfirmTx() { resp, err := suite.txClient.BroadcastTx(suite.ctx.GoContext(), []sdk.Msg{msg}, fee, gas) require.NoError(t, err) require.NotNil(t, resp) + + txInfo, exists := suite.txClient.GetTxInfo(resp.TxHash) + require.True(t, exists) + require.Equal(t, suite.txClient.DefaultAccountName(), txInfo.Signer) + seq := suite.txClient.Signer().Account(suite.txClient.DefaultAccountName()).Sequence() + // Successfully broadcasted transaction increases the nonce + require.Equal(t, seq-1, txInfo.Nonce) + ctx, cancel := context.WithTimeout(suite.ctx.GoContext(), 30*time.Second) defer cancel() confirmTxResp, err := suite.txClient.ConfirmTx(ctx, resp.TxHash) require.NoError(t, err) require.Equal(t, abci.CodeTypeOK, confirmTxResp.Code) + txInfo, exists = suite.txClient.GetTxInfo(resp.TxHash) + require.False(t, exists) + require.Zero(t, txInfo) }) t.Run("should error when tx is found with a non-zero error code", func(t *testing.T) { @@ -209,11 +249,60 @@ func (suite *TxClientTestSuite) TestConfirmTx() { resp, err := suite.txClient.BroadcastTx(suite.ctx.GoContext(), []sdk.Msg{msg}, fee, gas) require.NoError(t, err) require.NotNil(t, resp) - _, err = suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash) + + txInfo, exists := suite.txClient.GetTxInfo(resp.TxHash) + require.True(t, exists) + require.Equal(t, suite.txClient.DefaultAccountName(), txInfo.Signer) + seq := suite.txClient.Signer().Account(suite.txClient.DefaultAccountName()).Sequence() + // Successfully broadcasted transaction increases the nonce + require.Equal(t, seq-1, txInfo.Nonce) + + confirmTxResp, err := suite.txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash) require.Error(t, err) + require.Nil(t, confirmTxResp) code := err.(*user.ExecutionError).Code require.NotEqual(t, abci.CodeTypeOK, code) + txInfo, exists = suite.txClient.GetTxInfo(resp.TxHash) + require.False(t, exists) + require.Zero(t, txInfo) }) + + t.Run("should adjust nonce when evicted", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + // Create a mock client that always returns EVICTED status + mockTxClient := mock.NewMockTxClient(mockCtrl) + txClient, err := user.SetupTxClient(suite.ctx.GoContext(), suite.ctx.Keyring, suite.ctx.GRPCClient, suite.encCfg, user.WithGasMultiplier(1.2), user.WithConsensusNode(mockTxClient)) + require.NoError(t, err) + mockTxClient.EXPECT().TxStatus(gomock.Any(), gomock.Any()).Return(&tx.TxStatusResponse{ + Status: "EVICTED", + }, nil) + + // Signer sequence before + sequenceBeforeTx := txClient.Signer().Account(txClient.DefaultAccountName()).Sequence() + + // Broadcast a tx + addr := txClient.DefaultAddress() + msg := bank.NewMsgSend(addr, testnode.RandomAddress().(sdk.AccAddress), sdk.NewCoins(sdk.NewInt64Coin(app.BondDenom, 10))) + resp, err := txClient.BroadcastTx(suite.ctx.GoContext(), []sdk.Msg{msg}, fee, gas) + require.NoError(t, err) + require.NotNil(t, resp) + + // Tx should be evicted + _, err = txClient.ConfirmTx(suite.ctx.GoContext(), resp.TxHash) + require.Error(t, err) + require.Contains(t, err.Error(), "tx was evicted from the mempool") + // Should be removed from the txInfo map + txInfo, exists := txClient.GetTxInfo(resp.TxHash) + require.False(t, exists) + require.Zero(t, txInfo) + // Signer sequence should remain the same + seq := txClient.Signer().Account(txClient.DefaultAccountName()).Sequence() + require.Equal(t, seq, sequenceBeforeTx) + + }) + } func (suite *TxClientTestSuite) TestGasEstimation() {