Skip to content

Commit

Permalink
WIP Refactor + Remove duplications, NodeConfig links RPCManager, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
xmariachi committed Jan 28, 2025
1 parent b1d0a3d commit cce1e8b
Show file tree
Hide file tree
Showing 22 changed files with 337 additions and 213 deletions.
24 changes: 4 additions & 20 deletions lib/domain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
emissions "github.com/allora-network/allora-chain/x/emissions/types"
cometrpc "github.com/cometbft/cometbft/rpc/client/http"
cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
keyring "github.com/cosmos/cosmos-sdk/crypto/keyring"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"
feemarkettypes "github.com/skip-mev/feemarket/x/feemarket/types"
Expand Down Expand Up @@ -41,8 +38,6 @@ const (

// Properties manually provided by the user as part of UserConfig
type WalletConfig struct {
// Autogenerated
Address string // will be overwritten by the keystore.
// Provided by the user
AddressKeyName string // load a address by key from the keystore
AddressRestoreMnemonic string // load a address by mnemonic from the keystore
Expand Down Expand Up @@ -73,23 +68,14 @@ type Client struct {
GRPCClient *grpc.ClientConn
}

// Properties auto-generated based on what the user has provided in WalletConfig fields of UserConfig
// Communication with the chain
type ChainConfig struct {
Keyring keyring.Keyring
Address string // will be auto-generated based on the keystore
AddressSDK sdktypes.Address
Client *Client
PrivKey cryptotypes.PrivKey
PubKey cryptotypes.PubKey
EmissionsQueryClient emissions.QueryServiceClient
BankQueryClient bank.QueryClient
AuthQueryClient auth.QueryClient
FeeMarketQueryClient feemarkettypes.QueryClient
CometQueryClient cmtservice.ServiceClient
DefaultBondDenom string
AddressPrefix string // prefix for the allora addresses
AccNum uint64
Sequence uint64
}

type TopicActor interface {
Expand Down Expand Up @@ -142,12 +128,10 @@ type UserConfig struct {
Reputer []ReputerConfig
}

// NodeConfig is the configuration for a node
type NodeConfig struct {
ServerAddress string // RPC endpoint
Chain ChainConfig
Wallet WalletConfig
Worker []WorkerConfig
Reputer []ReputerConfig
ServerAddress string // Server endpoint address URI
Chain ChainConfig // Configuration for the chain
RPCManager *RPCManager // Link to the RPCManager that created this node
}

Expand Down
46 changes: 23 additions & 23 deletions lib/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,16 @@ func calculateExponentialBackoffDelaySeconds(baseDelay int64, retryCount int64)

// processError handles the error messages.
func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) {
rpcManager := node.RPCManager
if strings.Contains(err.Error(), ErrorMessageAbciErrorCodeMarker) {
re := regexp.MustCompile(`error code: '(\d+)'`)
matches := re.FindStringSubmatch(err.Error())
if len(matches) == 2 {
errorCode, parseErr := strconv.ParseUint(matches[1], 10, 32)
if parseErr != nil {
log.Error().Err(parseErr).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Failed to parse ABCI error code, skipping ABCI error code triage")
log.Error().Err(parseErr).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Failed to parse ABCI error code, skipping ABCI error code triage")
} else {
if errorCode > math.MaxUint32 {
log.Error().Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Parsed ABCI error code exceeds uint32 bounds, skipping ABCI error code triage")
log.Error().Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Parsed ABCI error code exceeds uint32 bounds, skipping ABCI error code triage")
} else {
return triageABCIErrorCode(ctx, uint32(errorCode), err, infoMsg, retryCount, retryMax, node) //nolint:gosec // Safe conversion - we check bounds above
}
Expand All @@ -113,9 +112,10 @@ func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount,
// triageABCIErrorCode handles specific ABCI error codes and returns appropriate processing instructions
func triageABCIErrorCode(ctx context.Context, errorCode uint32, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) {
rpcManager := node.RPCManager
walletConfig, err := rpcManager.GetWalletConfig()
if err != nil {
return "", err
// Beware: this error must not overwrite the "err" error
walletConfig, errorWalletConfig := rpcManager.GetWalletConfig()
if errorWalletConfig != nil {
return "", errorWalletConfig
}
switch errorCode {
case sdkerrors.ErrMempoolIsFull.ABCICode():
Expand Down Expand Up @@ -197,23 +197,24 @@ func triageABCIErrorCode(ctx context.Context, errorCode uint32, err error, infoM
// Triages error by string matching
func triageStringMatchingError(ctx context.Context, err error, infoMsg string, node *NodeConfig) (string, error) {
rpcManager := node.RPCManager
walletConfig, err := rpcManager.GetWalletConfig()
if err != nil {
return "", err
walletConfig, errorWalletConfig := rpcManager.GetWalletConfig()
if errorWalletConfig != nil {
return "", errorWalletConfig
}

if strings.Contains(err.Error(), ErrorMessageAccountSequenceMismatch) {
log.Warn().
Err(err).
Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).
Str("rpc", node.ServerAddress).
Str("msg", infoMsg).
Int64("delay", walletConfig.AccountSequenceRetryDelay).
Msg("Account sequence mismatch detected, re-fetching sequence")

expectedSeqNum, currentSeqNum, err := parseSequenceFromAccountMismatchError(err.Error())
if err != nil {
log.Error().Err(err).
Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).
Str("rpc", node.ServerAddress).
Str("msg", infoMsg).
Msg("Failed to parse sequence from error - retrying with regular delay")
if DoneOrWait(ctx, walletConfig.RetryDelay) {
return ErrorProcessingError, ctx.Err()
Expand All @@ -234,40 +235,39 @@ func triageStringMatchingError(ctx context.Context, err error, infoMsg string, n
}
return ErrorProcessingContinue, nil
} else if strings.Contains(err.Error(), ErrorContextDeadlineExceeded) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Context deadline exceeded, switching to next node")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Context deadline exceeded, switching to next node")
return ErrorProcessingSwitchingNode, err
} else if strings.Contains(err.Error(), ErrorMessageWaitingForNextBlock) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying")
return ErrorProcessingOk, nil
} else if strings.Contains(err.Error(), ErrorMessageDataAlreadySubmitted) || strings.Contains(err.Error(), ErrorMessageCannotUpdateEma) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Already submitted data for this epoch.")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Already submitted data for this epoch.")
return ErrorProcessingOk, nil
} else if strings.Contains(err.Error(), ErrorMessageTimeoutHeight) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Tx failed because of timeout height")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Tx failed because of timeout height")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNotPermittedToSubmitPayload) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNoInferencesFoundForTopic) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("No inferences found for topic")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("No inferences found for topic")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNotPermittedToAddStake) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to add stake")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to add stake")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageReadFlatPanic) || strings.Contains(err.Error(), ErrorMessageReadPerBytePanic) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Read panic, switching to next node")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Read panic, switching to next node")
return ErrorProcessingSwitchingNode, ErrReadPanic
} else if strings.Contains(err.Error(), ErrorMessageConnectionRefused) {
log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Connection refused, switching to next node")
log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Connection refused, switching to next node")
return ErrorProcessingSwitchingNode, ErrConnectionRefused
}
log.Info().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Unknown error")
log.Info().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Unknown error")
return ErrorProcessingError, errorsmod.Wrap(ErrUnexpectedError, err.Error())
}

// triageHTTPStatusError checks if the error contains an HTTP status code and determines if node switching is needed
func triageHTTPStatusError(err error, node *NodeConfig, infoMsg string) (string, error) {
rpcManager := node.RPCManager
statusCode, statusMessage, parseErr := ParseHTTPStatus(err.Error())
if parseErr == nil {
log.Warn().
Expand All @@ -279,7 +279,7 @@ func triageHTTPStatusError(err error, node *NodeConfig, infoMsg string) (string,
// When status code is in the list of codes that trigger node switching, switch to next node without retries
if HTTPStatusCodeCodesSwitchingNode[statusCode] {
log.Warn().
Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).
Str("rpc", node.ServerAddress).
Int("statusCode", statusCode).
Str("statusMessage", statusMessage).
Str("msg", infoMsg).
Expand Down
52 changes: 4 additions & 48 deletions lib/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (chain *ChainConfig) InitializeGRPCClient(grpcEndpoint string) (grpcConnect
return nil, fmt.Errorf("failed to connect to %s: %w", grpcEndpoint, err)
}

// spin up goroutine for monitoring and reconnect purposes
// spin up goroutine for monitoring and reconnect purposes - TODO test and configure
// go func() {
// for {
// state := chain.Client.GRPCClient.GetState()
Expand All @@ -155,45 +155,16 @@ func (chain *ChainConfig) InitializeGRPCClient(grpcEndpoint string) (grpcConnect
}

func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc string, grpc string) (nodeConfig *NodeConfig, err error) {

// // Get keyring
// keyring, err := GetKeyring(c.Wallet.AlloraHomeDir)
// if err != nil {
// return nil, fmt.Errorf("failed to get keyring: %w", err)
// }

// // Store address information
// var privKey cryptotypes.PrivKey
// var pubKey cryptotypes.PubKey
// var address string

// privKey, pubKey, address, addressSDK, err := GetAddressAndKeys(c.Wallet.AddressRestoreMnemonic, c.Wallet.AddressKeyName)
// if err != nil {
// return nil, fmt.Errorf("failed to get address and keys: %w", err)
// }

c.Wallet.Address = wallet.Address // Overwrite the address with the one from the keystore

log.Info().Str("rpc", rpc).Str("address", wallet.Address).Msg("Allora client created successfully")

// TODO: This is a temporary solution to get the chain config working, will be removed after refactor
alloraChain := ChainConfig{ // nolint: exhaustruct
Keyring: wallet.Keyring,
Address: wallet.Address,
AddressSDK: wallet.AddressSDK,
PrivKey: wallet.PrivKey,
PubKey: wallet.PubKey,
AddressPrefix: ADDRESS_PREFIX,
DefaultBondDenom: DEFAULT_BOND_DENOM,
Client: &Client{}, // nolint: exhaustruct
Client: &Client{}, // nolint: exhaustruct
}

Node := NodeConfig{
ServerAddress: rpc,
Chain: alloraChain,
Wallet: c.Wallet,
Worker: c.Worker,
Reputer: c.Reputer,
}

// Get RPC allora client
Expand All @@ -205,7 +176,7 @@ func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc
}
Node.Chain.Client.RPCClient = rpcClient
Node.ServerAddress = rpc
log.Info().Msgf("RPC Node initialized successfully, with account (sequence: %d, accNum: %d)", wallet.GetSequence(), wallet.AccountNumber)
log.Info().Msgf("RPC Node initialized successfully %s", rpc)
}

// Get GRPC allora client
Expand All @@ -224,22 +195,7 @@ func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc
Node.Chain.CometQueryClient = cmtservice.NewServiceClient(grpcConn)
Node.Chain.Client.GRPCClient = grpcConn
Node.ServerAddress = grpc

if wallet.GetSequence() == 0 {
_, sequence, accNum, err := Node.GetAccountInfo(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get account info: %w", err)
}
wallet.SetSequence(sequence)
wallet.AccountNumber = accNum
log.Info().Msgf("GRPC Node initialized successfully, with account (sequence: %d, accNum: %d)", sequence, accNum)
} else {
log.Info().Msgf("GRPC Node initialized successfully, with account (sequence: %d, accNum: %d)", wallet.GetSequence(), wallet.AccountNumber)
}

log.Info().Msgf("GRPC Node initialized successfully %s", grpc)
}
// TODO Remove Legacy "Chain" object
Node.Chain.AccNum = wallet.AccountNumber
Node.Chain.Sequence = wallet.GetSequence()
return &Node, nil
}
15 changes: 10 additions & 5 deletions lib/repo_query_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ import (
"context"
"fmt"

errorsmod "cosmossdk.io/errors"
"github.com/cosmos/cosmos-sdk/types/query"
auth "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/rs/zerolog/log"
)

// GetBaseFee queries the current base fee from the feemarket module
func (node *NodeConfig) GetAccountInfo(ctx context.Context) (address string, sequence uint64, accNum uint64, err error) {
log.Info().Msgf("Getting account info for %s", node.Chain.Address)
func (node *NodeConfig) GetAccountInfo(ctx context.Context, inAddress string) (address string, sequence uint64, accNum uint64, err error) {
walletConfig, err := node.RPCManager.GetWalletConfig()
if err != nil {
return "", 0, 0, errorsmod.Wrapf(err, "Error getting wallet config")
}
log.Info().Msgf("Getting account info for %s", inAddress)
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
node.Wallet.RetryDelay,
walletConfig.MaxRetries,
walletConfig.RetryDelay,
func(ctx context.Context, req query.PageRequest) (*auth.QueryAccountInfoResponse, error) {
return node.Chain.AuthQueryClient.AccountInfo(ctx, &auth.QueryAccountInfoRequest{Address: node.Chain.Address})
return node.Chain.AuthQueryClient.AccountInfo(ctx, &auth.QueryAccountInfoRequest{Address: inAddress})
},
query.PageRequest{}, // nolint:exhaustruct
"get account info",
Expand Down
18 changes: 13 additions & 5 deletions lib/repo_query_actor_whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package lib
import (
"context"

errorsmod "cosmossdk.io/errors"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/cosmos/cosmos-sdk/types/query"
)

// Checks if a worker can submit to a given topic
func (node *NodeConfig) CanSubmitWorker(ctx context.Context, topicId emissionstypes.TopicId, address string) (bool, error) {
walletConfig, err := node.RPCManager.GetWalletConfig()
if err != nil {
return false, errorsmod.Wrapf(err, "Error getting wallet config")
}
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
node.Wallet.RetryDelay,
walletConfig.MaxRetries,
walletConfig.RetryDelay,
func(ctx context.Context, req query.PageRequest) (*emissionstypes.CanSubmitWorkerPayloadResponse, error) {
return node.Chain.EmissionsQueryClient.CanSubmitWorkerPayload(ctx, &emissionstypes.CanSubmitWorkerPayloadRequest{
TopicId: topicId,
Expand All @@ -32,11 +37,14 @@ func (node *NodeConfig) CanSubmitWorker(ctx context.Context, topicId emissionsty

// Checks if a reputer can submit to a given topic
func (node *NodeConfig) CanSubmitReputer(ctx context.Context, topicId emissionstypes.TopicId, address string) (bool, error) {

walletConfig, err := node.RPCManager.GetWalletConfig()
if err != nil {
return false, errorsmod.Wrapf(err, "Error getting wallet config")
}
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
node.Wallet.RetryDelay,
walletConfig.MaxRetries,
walletConfig.RetryDelay,
func(ctx context.Context, req query.PageRequest) (*emissionstypes.CanSubmitReputerPayloadResponse, error) {
return node.Chain.EmissionsQueryClient.CanSubmitReputerPayload(ctx, &emissionstypes.CanSubmitReputerPayloadRequest{
TopicId: topicId,
Expand Down
15 changes: 10 additions & 5 deletions lib/repo_query_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package lib
import (
"context"

errorsmod "cosmossdk.io/errors"
cosmossdk_io_math "cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/types/query"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

func (node *NodeConfig) GetBalance(ctx context.Context) (cosmossdk_io_math.Int, error) {
func (node *NodeConfig) GetBalance(ctx context.Context, inAddress string, denom string) (cosmossdk_io_math.Int, error) {
walletConfig, err := node.RPCManager.GetWalletConfig()
if err != nil {
return cosmossdk_io_math.Int{}, errorsmod.Wrapf(err, "Error getting wallet config")
}
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
node.Wallet.RetryDelay,
walletConfig.MaxRetries,
walletConfig.RetryDelay,
func(ctx context.Context, req query.PageRequest) (*banktypes.QueryBalanceResponse, error) {
return node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{
Address: node.Chain.Address,
Denom: node.Chain.DefaultBondDenom,
Address: inAddress,
Denom: denom,
})
},
query.PageRequest{}, // nolint: exhaustruct
Expand Down
9 changes: 7 additions & 2 deletions lib/repo_query_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ package lib
import (
"context"

errorsmod "cosmossdk.io/errors"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/cosmos/cosmos-sdk/types/query"
)

func (node *NodeConfig) GetReputerValuesAtBlock(ctx context.Context, topicId emissionstypes.TopicId, nonce BlockHeight) (*emissionstypes.ValueBundle, error) {
walletConfig, err := node.RPCManager.GetWalletConfig()
if err != nil {
return nil, errorsmod.Wrapf(err, "Error getting wallet config")
}
resp, err := QueryDataWithRetry(
ctx,
node.Wallet.MaxRetries,
node.Wallet.RetryDelay,
walletConfig.MaxRetries,
walletConfig.RetryDelay,
func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetNetworkInferencesAtBlockResponse, error) {
return node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, &emissionstypes.GetNetworkInferencesAtBlockRequest{
TopicId: topicId,
Expand Down
Loading

0 comments on commit cce1e8b

Please sign in to comment.