From 0702cf00804938b7c7cc0e3f5053060be1618dfd Mon Sep 17 00:00:00 2001 From: Roshan <48975233+Pythonberg1997@users.noreply.github.com> Date: Mon, 8 Apr 2024 10:51:49 +0800 Subject: [PATCH] feat: add `ExecutorApp` (#595) * feat: add `ExecutorApp` * fix lint issue * fix review comments * fix abi decode methods * add unit test * fix review comments * fix review comments * add `registerErdosUpgradeHandler` * remove unused code in `registerErdosUpgradeHandler` --- app/app.go | 2 +- app/upgrade.go | 24 ++ x/storage/keeper/cross_app_executor.go | 304 ++++++++++++++++++++ x/storage/keeper/cross_app_executor_test.go | 50 ++++ x/storage/types/crosschain.go | 6 +- x/storage/types/expected_keepers.go | 20 ++ 6 files changed, 403 insertions(+), 3 deletions(-) create mode 100644 x/storage/keeper/cross_app_executor.go create mode 100644 x/storage/keeper/cross_app_executor_test.go diff --git a/app/app.go b/app/app.go index c467d47bf..7cb184c3d 100644 --- a/app/app.go +++ b/app/app.go @@ -704,7 +704,7 @@ func New( } if app.IsIavlStore() { - //enable diff for reconciliation + // enable diff for reconciliation bankIavl, ok := ms.GetCommitStore(keys[banktypes.StoreKey]).(*iavl.Store) if !ok { tmos.Exit("cannot convert bank store to ival store") diff --git a/app/upgrade.go b/app/upgrade.go index 7d4e34531..0e663f5c0 100644 --- a/app/upgrade.go +++ b/app/upgrade.go @@ -9,7 +9,9 @@ import ( bridgemoduletypes "github.com/bnb-chain/greenfield/x/bridge/types" paymentmodule "github.com/bnb-chain/greenfield/x/payment" + paymentmodulekeeper "github.com/bnb-chain/greenfield/x/payment/keeper" paymenttypes "github.com/bnb-chain/greenfield/x/payment/types" + storagemodulekeeper "github.com/bnb-chain/greenfield/x/storage/keeper" storagemoduletypes "github.com/bnb-chain/greenfield/x/storage/types" virtualgroupmodule "github.com/bnb-chain/greenfield/x/virtualgroup" virtualgrouptypes "github.com/bnb-chain/greenfield/x/virtualgroup/types" @@ -31,6 +33,7 @@ func (app *App) RegisterUpgradeHandlers(chainID string, serverCfg *serverconfig. app.registerUralUpgradeHandler() app.registerPawneeUpgradeHandler() app.registerSerengetiUpgradeHandler() + app.registerErdosUpgradeHandler() // app.register...() // ... return nil @@ -238,3 +241,24 @@ func (app *App) registerSerengetiUpgradeHandler() { return nil }) } + +func (app *App) registerErdosUpgradeHandler() { + // Register the upgrade handler + app.UpgradeKeeper.SetUpgradeHandler(upgradetypes.Erdos, + func(ctx sdk.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) { + app.Logger().Info("upgrade to ", plan.Name) + return app.mm.RunMigrations(ctx, app.configurator, fromVM) + }) + + // Register the upgrade initializer + app.UpgradeKeeper.SetUpgradeInitializer(upgradetypes.Erdos, + func() error { + app.Logger().Info("Init Erdos upgrade") + executorApp := storagemodulekeeper.NewExecutorApp(app.StorageKeeper, storagemodulekeeper.NewMsgServerImpl(app.StorageKeeper), paymentmodulekeeper.NewMsgServerImpl(app.PaymentKeeper)) + err := app.CrossChainKeeper.RegisterChannel(storagemoduletypes.ExecutorChannel, storagemoduletypes.ExecutorChannelId, executorApp) + if err != nil { + panic(err) + } + return nil + }) +} diff --git a/x/storage/keeper/cross_app_executor.go b/x/storage/keeper/cross_app_executor.go new file mode 100644 index 000000000..982fa2af1 --- /dev/null +++ b/x/storage/keeper/cross_app_executor.go @@ -0,0 +1,304 @@ +package keeper + +import ( + "encoding/hex" + "fmt" + "runtime/debug" + "strings" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + + paymentmoduletypes "github.com/bnb-chain/greenfield/x/payment/types" + "github.com/bnb-chain/greenfield/x/storage/types" +) + +var _ sdk.CrossChainApplication = &ExecutorApp{} + +type ExecutorApp struct { + sKeeper types.StorageKeeper + sMsgServer types.StorageMsgServer + pMsgServer types.PaymentMsgServer +} + +func NewExecutorApp(storageKeeper types.StorageKeeper, storageMsgServer types.StorageMsgServer, paymentMsgServer types.PaymentMsgServer) *ExecutorApp { + return &ExecutorApp{ + sKeeper: storageKeeper, + sMsgServer: storageMsgServer, + pMsgServer: paymentMsgServer, + } +} + +func (app *ExecutorApp) ExecuteSynPackage(ctx sdk.Context, appCtx *sdk.CrossChainAppContext, payload []byte) (result sdk.ExecuteResult) { + // This app will not have any ack/failAck package, so we should not panic. + defer func() { + if r := recover(); r != nil { + log := fmt.Sprintf("recovered: %v\nstack:\n%v", r, string(debug.Stack())) + app.sKeeper.Logger(ctx).Error("execute executor syn package panic", "error", log) + result.Err = fmt.Errorf("execute executor syn package panic: %v", r) + } + }() + + pack, err := DeserializeSynPackage(payload) + if err != nil { + app.sKeeper.Logger(ctx).Error("deserialize executor syn package error", "payload", hex.EncodeToString(payload), "error", err.Error()) + return sdk.ExecuteResult{ + Err: fmt.Errorf("deserialize executor syn package error: %v", err), + } + } + + for i, msgBz := range pack { + msg, err := DeserializeExecutorMsg(msgBz) + if err != nil { + app.sKeeper.Logger(ctx).Error("deserialize executor msg error", "msg bytes", hex.EncodeToString(msgBz), "error", err.Error()) + return sdk.ExecuteResult{ + Err: fmt.Errorf("deserialize executor msg error: %v", err), + } + } + + err = app.msgHandler(ctx, msg) + if err != nil { + app.sKeeper.Logger(ctx).Error("execute executor msg error", "index", i, "data", msgBz, "error", err.Error()) + return sdk.ExecuteResult{ + Err: fmt.Errorf("execute executor msg error: %v", err), + } + } + } + + return result +} + +func (app *ExecutorApp) ExecuteAckPackage(ctx sdk.Context, appCtx *sdk.CrossChainAppContext, payload []byte) sdk.ExecuteResult { + app.sKeeper.Logger(ctx).Error("received execute ack package ") + return sdk.ExecuteResult{} +} + +func (app *ExecutorApp) ExecuteFailAckPackage(ctx sdk.Context, appCtx *sdk.CrossChainAppContext, payload []byte) sdk.ExecuteResult { + app.sKeeper.Logger(ctx).Error("received execute fail ack package ") + return sdk.ExecuteResult{} +} + +type MsgType uint8 + +const ( + MsgTypeCreatePaymentAccount MsgType = 1 + MsgTypeDeposit MsgType = 2 + MsgTypeDisableRefund MsgType = 3 + MsgWithdraw MsgType = 4 + MsgMigrateBucket MsgType = 5 + MsgCancelMigrateBucket MsgType = 6 + MsgUpdateBucketInfo MsgType = 7 + MsgToggleSPAsDelegatedAgent MsgType = 8 + MsgSetBucketFlowRateLimit MsgType = 9 + MsgCopyObject MsgType = 10 + MsgUpdateObjectInfo MsgType = 11 + MsgUpdateGroupExtra MsgType = 12 + MsgSetTag MsgType = 13 +) + +func (app *ExecutorApp) msgHandler(ctx sdk.Context, msg ExecutorMsg) error { + msgSender, err := sdk.AccAddressFromHexUnsafe(msg.Sender.String()) + if err != nil { + return err + } + + switch MsgType(msg.Type) { + case MsgTypeCreatePaymentAccount: + var gnfdMsg paymentmoduletypes.MsgCreatePaymentAccount + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.pMsgServer.CreatePaymentAccount(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgTypeDeposit: + var gnfdMsg paymentmoduletypes.MsgDeposit + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.pMsgServer.Deposit(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgTypeDisableRefund: + var gnfdMsg paymentmoduletypes.MsgDisableRefund + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.pMsgServer.DisableRefund(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgWithdraw: + var gnfdMsg paymentmoduletypes.MsgWithdraw + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.pMsgServer.Withdraw(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgMigrateBucket: + var gnfdMsg types.MsgMigrateBucket + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.MigrateBucket(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgCancelMigrateBucket: + var gnfdMsg types.MsgCancelMigrateBucket + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.CancelMigrateBucket(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgUpdateBucketInfo: + var gnfdMsg types.MsgUpdateBucketInfo + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.UpdateBucketInfo(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgToggleSPAsDelegatedAgent: + var gnfdMsg types.MsgToggleSPAsDelegatedAgent + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.ToggleSPAsDelegatedAgent(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgSetBucketFlowRateLimit: + var gnfdMsg types.MsgSetBucketFlowRateLimit + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.SetBucketFlowRateLimit(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgCopyObject: + var gnfdMsg types.MsgCopyObject + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.CopyObject(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgUpdateObjectInfo: + var gnfdMsg types.MsgUpdateObjectInfo + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.UpdateObjectInfo(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgUpdateGroupExtra: + var gnfdMsg types.MsgUpdateGroupExtra + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.UpdateGroupExtra(sdk.WrapSDKContext(ctx), &gnfdMsg) + case MsgSetTag: + var gnfdMsg types.MsgSetTag + err = gnfdMsg.Unmarshal(msg.Data) + if err != nil { + return err + } + if err = checkMsg(msgSender, &gnfdMsg); err != nil { + return err + } + _, err = app.sMsgServer.SetTag(sdk.WrapSDKContext(ctx), &gnfdMsg) + default: + err = fmt.Errorf("invalid msg type") + } + return err +} + +type ExecutorSynPackage [][]byte + +type ExecutorMsg struct { + Sender common.Address + Type uint8 + Data []byte +} + +var ( + executorSynPackageTypeDef = `[{"type": "bytes[]"}]` + + executorMsgTypeDef = `[{"type": "address"}, {"type": "uint8"}, {"type": "bytes"}]` +) + +func DeserializeSynPackage(payload []byte) (ExecutorSynPackage, error) { + unpacked, err := abiDecode(executorSynPackageTypeDef, payload) + if err != nil { + return ExecutorSynPackage{}, err + } + + unpackedStruct := abi.ConvertType(unpacked[0], ExecutorSynPackage{}) + pkgStruct, ok := unpackedStruct.(ExecutorSynPackage) + if !ok { + return ExecutorSynPackage{}, err + } + return pkgStruct, nil +} + +func DeserializeExecutorMsg(msgBz []byte) (ExecutorMsg, error) { + unpacked, err := abiDecode(executorMsgTypeDef, msgBz) + if err != nil { + return ExecutorMsg{}, err + } + + var executorMsg ExecutorMsg + executorMsg.Sender = abi.ConvertType(unpacked[0], common.Address{}).(common.Address) + executorMsg.Type = abi.ConvertType(unpacked[1], uint8(0)).(uint8) + executorMsg.Data = abi.ConvertType(unpacked[2], []byte{}).([]byte) + return executorMsg, nil +} + +func abiDecode(typeDef string, encodedBz []byte) ([]interface{}, error) { + outDef := fmt.Sprintf(`[{ "name" : "method", "type": "function", "outputs": %s}]`, typeDef) + outAbi, err := abi.JSON(strings.NewReader(outDef)) + if err != nil { + return nil, err + } + return outAbi.Unpack("method", encodedBz) +} + +func checkMsg(msgSender sdk.AccAddress, msg sdk.Msg) error { + if err := msg.ValidateBasic(); err != nil { + return err + } + if len(msg.GetSigners()) != 1 { + return fmt.Errorf("invalid signers number") + } + if !msg.GetSigners()[0].Equals(msgSender) { + return fmt.Errorf("invalid msg sender") + } + return nil +} diff --git a/x/storage/keeper/cross_app_executor_test.go b/x/storage/keeper/cross_app_executor_test.go new file mode 100644 index 000000000..0e0cb953e --- /dev/null +++ b/x/storage/keeper/cross_app_executor_test.go @@ -0,0 +1,50 @@ +package keeper_test + +import ( + "encoding/hex" + "fmt" + "math/rand" + "testing" + + "github.com/cosmos/gogoproto/proto" + "github.com/stretchr/testify/assert" + + "github.com/bnb-chain/greenfield/testutil/sample" + "github.com/bnb-chain/greenfield/x/storage/keeper" + "github.com/bnb-chain/greenfield/x/storage/types" +) + +func TestDecodeMsg(t *testing.T) { + operator := sample.RandAccAddress() + bucketName := string(sample.RandStr(10)) + msg := types.NewMsgMigrateBucket(operator, bucketName, rand.Uint32()) + msgBz, err := proto.Marshal(msg) + if err != nil { + t.Fatal(err) + } + + var gnfdMsg types.MsgMigrateBucket + err = gnfdMsg.Unmarshal(msgBz) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, msg.Operator, gnfdMsg.Operator) + assert.Equal(t, msg.BucketName, gnfdMsg.BucketName) + assert.Equal(t, msg.DstPrimarySpId, gnfdMsg.DstPrimarySpId) +} + +func TestDecodeSynPackage(t *testing.T) { + payloadStr := "00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000c0000000000000000000000000bfd66d9636253f11ae43f3428e8df73b5ad6950f00000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000002c0a2a3078663339466436653531616164383846364634636536614238383237323739636666466239323236360000000000000000000000000000000000000000" + payloadBz, _ := hex.DecodeString(payloadStr) + pack, err := keeper.DeserializeSynPackage(payloadBz) + if err != nil { + t.Fatal(err) + } + + msg, err := keeper.DeserializeExecutorMsg(pack[0]) + if err != nil { + t.Fatal(err) + } + fmt.Println(msg) +} diff --git a/x/storage/types/crosschain.go b/x/storage/types/crosschain.go index 5b1d3e95c..580ddeb87 100644 --- a/x/storage/types/crosschain.go +++ b/x/storage/types/crosschain.go @@ -18,11 +18,13 @@ const ( ObjectChannel = "object" GroupChannel = "group" PermissionChannel = "permission" + ExecutorChannel = "executor" BucketChannelId sdk.ChannelID = 4 ObjectChannelId sdk.ChannelID = 5 GroupChannelId sdk.ChannelID = 6 PermissionChannelId sdk.ChannelID = 7 + ExecutorChannelId sdk.ChannelID = 9 // bucket operation types @@ -1278,13 +1280,13 @@ func (p UpdateGroupMemberSynPackage) ValidateBasic() error { func DeserializeUpdateGroupMemberSynPackage(serializedPackage []byte) (interface{}, error) { unpacked, err := updateGroupMemberSynPackageArgs.Unpack(serializedPackage) if err != nil { - return nil, errors.Wrapf(ErrInvalidCrossChainPackage, "deserialize update group member sun package failed") + return nil, errors.Wrapf(ErrInvalidCrossChainPackage, "deserialize update group member syn package failed") } unpackedStruct := abi.ConvertType(unpacked[0], UpdateGroupMemberSynPackageStruct{}) pkgStruct, ok := unpackedStruct.(UpdateGroupMemberSynPackageStruct) if !ok { - return nil, errors.Wrapf(ErrInvalidCrossChainPackage, "reflect update group member sun package failed") + return nil, errors.Wrapf(ErrInvalidCrossChainPackage, "reflect update group member syn package failed") } totalMember := len(pkgStruct.Members) diff --git a/x/storage/types/expected_keepers.go b/x/storage/types/expected_keepers.go index d56c545b6..890758c6b 100644 --- a/x/storage/types/expected_keepers.go +++ b/x/storage/types/expected_keepers.go @@ -1,6 +1,7 @@ package types import ( + "context" "math/big" time "time" @@ -125,3 +126,22 @@ type StorageKeeper interface { NormalizePrincipal(ctx sdk.Context, principal *permtypes.Principal) ValidatePrincipal(ctx sdk.Context, resOwner sdk.AccAddress, principal *permtypes.Principal) error } + +type PaymentMsgServer interface { + CreatePaymentAccount(context.Context, *paymenttypes.MsgCreatePaymentAccount) (*paymenttypes.MsgCreatePaymentAccountResponse, error) + Deposit(context.Context, *paymenttypes.MsgDeposit) (*paymenttypes.MsgDepositResponse, error) + Withdraw(context.Context, *paymenttypes.MsgWithdraw) (*paymenttypes.MsgWithdrawResponse, error) + DisableRefund(context.Context, *paymenttypes.MsgDisableRefund) (*paymenttypes.MsgDisableRefundResponse, error) +} + +type StorageMsgServer interface { + UpdateBucketInfo(context.Context, *MsgUpdateBucketInfo) (*MsgUpdateBucketInfoResponse, error) + ToggleSPAsDelegatedAgent(context.Context, *MsgToggleSPAsDelegatedAgent) (*MsgToggleSPAsDelegatedAgentResponse, error) + CopyObject(context.Context, *MsgCopyObject) (*MsgCopyObjectResponse, error) + UpdateObjectInfo(context.Context, *MsgUpdateObjectInfo) (*MsgUpdateObjectInfoResponse, error) + UpdateGroupExtra(context.Context, *MsgUpdateGroupExtra) (*MsgUpdateGroupExtraResponse, error) + MigrateBucket(context.Context, *MsgMigrateBucket) (*MsgMigrateBucketResponse, error) + CancelMigrateBucket(context.Context, *MsgCancelMigrateBucket) (*MsgCancelMigrateBucketResponse, error) + SetTag(context.Context, *MsgSetTag) (*MsgSetTagResponse, error) + SetBucketFlowRateLimit(context.Context, *MsgSetBucketFlowRateLimit) (*MsgSetBucketFlowRateLimitResponse, error) +}