diff --git a/lib/domain_config.go b/lib/domain_config.go index 1b16cdf..97c5642 100644 --- a/lib/domain_config.go +++ b/lib/domain_config.go @@ -7,9 +7,6 @@ import ( emissions "github.com/allora-network/allora-chain/x/emissions/types" cometrpc "github.com/cometbft/cometbft/rpc/client/http" cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" - keyring "github.com/cosmos/cosmos-sdk/crypto/keyring" - cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - sdktypes "github.com/cosmos/cosmos-sdk/types" auth "github.com/cosmos/cosmos-sdk/x/auth/types" bank "github.com/cosmos/cosmos-sdk/x/bank/types" feemarkettypes "github.com/skip-mev/feemarket/x/feemarket/types" @@ -41,8 +38,6 @@ const ( // Properties manually provided by the user as part of UserConfig type WalletConfig struct { - // Autogenerated - Address string // will be overwritten by the keystore. // Provided by the user AddressKeyName string // load a address by key from the keystore AddressRestoreMnemonic string // load a address by mnemonic from the keystore @@ -73,23 +68,14 @@ type Client struct { GRPCClient *grpc.ClientConn } -// Properties auto-generated based on what the user has provided in WalletConfig fields of UserConfig +// Communication with the chain type ChainConfig struct { - Keyring keyring.Keyring - Address string // will be auto-generated based on the keystore - AddressSDK sdktypes.Address Client *Client - PrivKey cryptotypes.PrivKey - PubKey cryptotypes.PubKey EmissionsQueryClient emissions.QueryServiceClient BankQueryClient bank.QueryClient AuthQueryClient auth.QueryClient FeeMarketQueryClient feemarkettypes.QueryClient CometQueryClient cmtservice.ServiceClient - DefaultBondDenom string - AddressPrefix string // prefix for the allora addresses - AccNum uint64 - Sequence uint64 } type TopicActor interface { @@ -142,12 +128,10 @@ type UserConfig struct { Reputer []ReputerConfig } +// NodeConfig is the configuration for a node type NodeConfig struct { - ServerAddress string // RPC endpoint - Chain ChainConfig - Wallet WalletConfig - Worker []WorkerConfig - Reputer []ReputerConfig + ServerAddress string // Server endpoint address URI + Chain ChainConfig // Configuration for the chain RPCManager *RPCManager // Link to the RPCManager that created this node } diff --git a/lib/errors.go b/lib/errors.go index f40c6ea..9d6832c 100644 --- a/lib/errors.go +++ b/lib/errors.go @@ -82,17 +82,16 @@ func calculateExponentialBackoffDelaySeconds(baseDelay int64, retryCount int64) // processError handles the error messages. func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) { - rpcManager := node.RPCManager if strings.Contains(err.Error(), ErrorMessageAbciErrorCodeMarker) { re := regexp.MustCompile(`error code: '(\d+)'`) matches := re.FindStringSubmatch(err.Error()) if len(matches) == 2 { errorCode, parseErr := strconv.ParseUint(matches[1], 10, 32) if parseErr != nil { - log.Error().Err(parseErr).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Failed to parse ABCI error code, skipping ABCI error code triage") + log.Error().Err(parseErr).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Failed to parse ABCI error code, skipping ABCI error code triage") } else { if errorCode > math.MaxUint32 { - log.Error().Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Parsed ABCI error code exceeds uint32 bounds, skipping ABCI error code triage") + log.Error().Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Parsed ABCI error code exceeds uint32 bounds, skipping ABCI error code triage") } else { return triageABCIErrorCode(ctx, uint32(errorCode), err, infoMsg, retryCount, retryMax, node) //nolint:gosec // Safe conversion - we check bounds above } @@ -113,9 +112,10 @@ func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount, // triageABCIErrorCode handles specific ABCI error codes and returns appropriate processing instructions func triageABCIErrorCode(ctx context.Context, errorCode uint32, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) { rpcManager := node.RPCManager - walletConfig, err := rpcManager.GetWalletConfig() - if err != nil { - return "", err + // Beware: this error must not overwrite the "err" error + walletConfig, errorWalletConfig := rpcManager.GetWalletConfig() + if errorWalletConfig != nil { + return "", errorWalletConfig } switch errorCode { case sdkerrors.ErrMempoolIsFull.ABCICode(): @@ -197,15 +197,15 @@ func triageABCIErrorCode(ctx context.Context, errorCode uint32, err error, infoM // Triages error by string matching func triageStringMatchingError(ctx context.Context, err error, infoMsg string, node *NodeConfig) (string, error) { rpcManager := node.RPCManager - walletConfig, err := rpcManager.GetWalletConfig() - if err != nil { - return "", err + walletConfig, errorWalletConfig := rpcManager.GetWalletConfig() + if errorWalletConfig != nil { + return "", errorWalletConfig } if strings.Contains(err.Error(), ErrorMessageAccountSequenceMismatch) { log.Warn(). Err(err). - Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress). + Str("rpc", node.ServerAddress). Str("msg", infoMsg). Int64("delay", walletConfig.AccountSequenceRetryDelay). Msg("Account sequence mismatch detected, re-fetching sequence") @@ -213,7 +213,8 @@ func triageStringMatchingError(ctx context.Context, err error, infoMsg string, n expectedSeqNum, currentSeqNum, err := parseSequenceFromAccountMismatchError(err.Error()) if err != nil { log.Error().Err(err). - Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg). + Str("rpc", node.ServerAddress). + Str("msg", infoMsg). Msg("Failed to parse sequence from error - retrying with regular delay") if DoneOrWait(ctx, walletConfig.RetryDelay) { return ErrorProcessingError, ctx.Err() @@ -234,40 +235,39 @@ func triageStringMatchingError(ctx context.Context, err error, infoMsg string, n } return ErrorProcessingContinue, nil } else if strings.Contains(err.Error(), ErrorContextDeadlineExceeded) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Context deadline exceeded, switching to next node") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Context deadline exceeded, switching to next node") return ErrorProcessingSwitchingNode, err } else if strings.Contains(err.Error(), ErrorMessageWaitingForNextBlock) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying") return ErrorProcessingOk, nil } else if strings.Contains(err.Error(), ErrorMessageDataAlreadySubmitted) || strings.Contains(err.Error(), ErrorMessageCannotUpdateEma) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Already submitted data for this epoch.") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Already submitted data for this epoch.") return ErrorProcessingOk, nil } else if strings.Contains(err.Error(), ErrorMessageTimeoutHeight) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Tx failed because of timeout height") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Tx failed because of timeout height") return ErrorProcessingFailure, err } else if strings.Contains(err.Error(), ErrorMessageNotPermittedToSubmitPayload) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload") return ErrorProcessingFailure, err } else if strings.Contains(err.Error(), ErrorMessageNoInferencesFoundForTopic) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("No inferences found for topic") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("No inferences found for topic") return ErrorProcessingFailure, err } else if strings.Contains(err.Error(), ErrorMessageNotPermittedToAddStake) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to add stake") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Actor is not permitted to add stake") return ErrorProcessingFailure, err } else if strings.Contains(err.Error(), ErrorMessageReadFlatPanic) || strings.Contains(err.Error(), ErrorMessageReadPerBytePanic) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Read panic, switching to next node") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Read panic, switching to next node") return ErrorProcessingSwitchingNode, ErrReadPanic } else if strings.Contains(err.Error(), ErrorMessageConnectionRefused) { - log.Warn().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Connection refused, switching to next node") + log.Warn().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Connection refused, switching to next node") return ErrorProcessingSwitchingNode, ErrConnectionRefused } - log.Info().Err(err).Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress).Str("msg", infoMsg).Msg("Unknown error") + log.Info().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msg("Unknown error") return ErrorProcessingError, errorsmod.Wrap(ErrUnexpectedError, err.Error()) } // triageHTTPStatusError checks if the error contains an HTTP status code and determines if node switching is needed func triageHTTPStatusError(err error, node *NodeConfig, infoMsg string) (string, error) { - rpcManager := node.RPCManager statusCode, statusMessage, parseErr := ParseHTTPStatus(err.Error()) if parseErr == nil { log.Warn(). @@ -279,7 +279,7 @@ func triageHTTPStatusError(err error, node *NodeConfig, infoMsg string) (string, // When status code is in the list of codes that trigger node switching, switch to next node without retries if HTTPStatusCodeCodesSwitchingNode[statusCode] { log.Warn(). - Str("rpc", rpcManager.GetCurrentTxNode().ServerAddress). + Str("rpc", node.ServerAddress). Int("statusCode", statusCode). Str("statusMessage", statusMessage). Str("msg", infoMsg). diff --git a/lib/factory_config.go b/lib/factory_config.go index 5a5e0b1..0549b87 100644 --- a/lib/factory_config.go +++ b/lib/factory_config.go @@ -135,7 +135,7 @@ func (chain *ChainConfig) InitializeGRPCClient(grpcEndpoint string) (grpcConnect return nil, fmt.Errorf("failed to connect to %s: %w", grpcEndpoint, err) } - // spin up goroutine for monitoring and reconnect purposes + // spin up goroutine for monitoring and reconnect purposes - TODO test and configure // go func() { // for { // state := chain.Client.GRPCClient.GetState() @@ -155,45 +155,16 @@ func (chain *ChainConfig) InitializeGRPCClient(grpcEndpoint string) (grpcConnect } func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc string, grpc string) (nodeConfig *NodeConfig, err error) { - - // // Get keyring - // keyring, err := GetKeyring(c.Wallet.AlloraHomeDir) - // if err != nil { - // return nil, fmt.Errorf("failed to get keyring: %w", err) - // } - - // // Store address information - // var privKey cryptotypes.PrivKey - // var pubKey cryptotypes.PubKey - // var address string - - // privKey, pubKey, address, addressSDK, err := GetAddressAndKeys(c.Wallet.AddressRestoreMnemonic, c.Wallet.AddressKeyName) - // if err != nil { - // return nil, fmt.Errorf("failed to get address and keys: %w", err) - // } - - c.Wallet.Address = wallet.Address // Overwrite the address with the one from the keystore - log.Info().Str("rpc", rpc).Str("address", wallet.Address).Msg("Allora client created successfully") // TODO: This is a temporary solution to get the chain config working, will be removed after refactor alloraChain := ChainConfig{ // nolint: exhaustruct - Keyring: wallet.Keyring, - Address: wallet.Address, - AddressSDK: wallet.AddressSDK, - PrivKey: wallet.PrivKey, - PubKey: wallet.PubKey, - AddressPrefix: ADDRESS_PREFIX, - DefaultBondDenom: DEFAULT_BOND_DENOM, - Client: &Client{}, // nolint: exhaustruct + Client: &Client{}, // nolint: exhaustruct } Node := NodeConfig{ ServerAddress: rpc, Chain: alloraChain, - Wallet: c.Wallet, - Worker: c.Worker, - Reputer: c.Reputer, } // Get RPC allora client @@ -205,7 +176,7 @@ func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc } Node.Chain.Client.RPCClient = rpcClient Node.ServerAddress = rpc - log.Info().Msgf("RPC Node initialized successfully, with account (sequence: %d, accNum: %d)", wallet.GetSequence(), wallet.AccountNumber) + log.Info().Msgf("RPC Node initialized successfully %s", rpc) } // Get GRPC allora client @@ -224,22 +195,7 @@ func (c *UserConfig) GenerateNodeConfig(ctx context.Context, wallet *Wallet, rpc Node.Chain.CometQueryClient = cmtservice.NewServiceClient(grpcConn) Node.Chain.Client.GRPCClient = grpcConn Node.ServerAddress = grpc - - if wallet.GetSequence() == 0 { - _, sequence, accNum, err := Node.GetAccountInfo(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get account info: %w", err) - } - wallet.SetSequence(sequence) - wallet.AccountNumber = accNum - log.Info().Msgf("GRPC Node initialized successfully, with account (sequence: %d, accNum: %d)", sequence, accNum) - } else { - log.Info().Msgf("GRPC Node initialized successfully, with account (sequence: %d, accNum: %d)", wallet.GetSequence(), wallet.AccountNumber) - } - + log.Info().Msgf("GRPC Node initialized successfully %s", grpc) } - // TODO Remove Legacy "Chain" object - Node.Chain.AccNum = wallet.AccountNumber - Node.Chain.Sequence = wallet.GetSequence() return &Node, nil } diff --git a/lib/repo_query_account.go b/lib/repo_query_account.go index 920e973..677aaa9 100644 --- a/lib/repo_query_account.go +++ b/lib/repo_query_account.go @@ -4,20 +4,25 @@ import ( "context" "fmt" + errorsmod "cosmossdk.io/errors" "github.com/cosmos/cosmos-sdk/types/query" auth "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/rs/zerolog/log" ) // GetBaseFee queries the current base fee from the feemarket module -func (node *NodeConfig) GetAccountInfo(ctx context.Context) (address string, sequence uint64, accNum uint64, err error) { - log.Info().Msgf("Getting account info for %s", node.Chain.Address) +func (node *NodeConfig) GetAccountInfo(ctx context.Context, inAddress string) (address string, sequence uint64, accNum uint64, err error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return "", 0, 0, errorsmod.Wrapf(err, "Error getting wallet config") + } + log.Info().Msgf("Getting account info for %s", inAddress) resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*auth.QueryAccountInfoResponse, error) { - return node.Chain.AuthQueryClient.AccountInfo(ctx, &auth.QueryAccountInfoRequest{Address: node.Chain.Address}) + return node.Chain.AuthQueryClient.AccountInfo(ctx, &auth.QueryAccountInfoRequest{Address: inAddress}) }, query.PageRequest{}, // nolint:exhaustruct "get account info", diff --git a/lib/repo_query_actor_whitelist.go b/lib/repo_query_actor_whitelist.go index 1665a47..ffd64ff 100644 --- a/lib/repo_query_actor_whitelist.go +++ b/lib/repo_query_actor_whitelist.go @@ -3,16 +3,21 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" ) // Checks if a worker can submit to a given topic func (node *NodeConfig) CanSubmitWorker(ctx context.Context, topicId emissionstypes.TopicId, address string) (bool, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return false, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.CanSubmitWorkerPayloadResponse, error) { return node.Chain.EmissionsQueryClient.CanSubmitWorkerPayload(ctx, &emissionstypes.CanSubmitWorkerPayloadRequest{ TopicId: topicId, @@ -32,11 +37,14 @@ func (node *NodeConfig) CanSubmitWorker(ctx context.Context, topicId emissionsty // Checks if a reputer can submit to a given topic func (node *NodeConfig) CanSubmitReputer(ctx context.Context, topicId emissionstypes.TopicId, address string) (bool, error) { - + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return false, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.CanSubmitReputerPayloadResponse, error) { return node.Chain.EmissionsQueryClient.CanSubmitReputerPayload(ctx, &emissionstypes.CanSubmitReputerPayloadRequest{ TopicId: topicId, diff --git a/lib/repo_query_balance.go b/lib/repo_query_balance.go index b95ed83..4576589 100644 --- a/lib/repo_query_balance.go +++ b/lib/repo_query_balance.go @@ -3,20 +3,25 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" cosmossdk_io_math "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/types/query" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" ) -func (node *NodeConfig) GetBalance(ctx context.Context) (cosmossdk_io_math.Int, error) { +func (node *NodeConfig) GetBalance(ctx context.Context, inAddress string, denom string) (cosmossdk_io_math.Int, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return cosmossdk_io_math.Int{}, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*banktypes.QueryBalanceResponse, error) { return node.Chain.BankQueryClient.Balance(ctx, &banktypes.QueryBalanceRequest{ - Address: node.Chain.Address, - Denom: node.Chain.DefaultBondDenom, + Address: inAddress, + Denom: denom, }) }, query.PageRequest{}, // nolint: exhaustruct diff --git a/lib/repo_query_block.go b/lib/repo_query_block.go index e9f2f65..df85c80 100644 --- a/lib/repo_query_block.go +++ b/lib/repo_query_block.go @@ -3,15 +3,20 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" ) func (node *NodeConfig) GetReputerValuesAtBlock(ctx context.Context, topicId emissionstypes.TopicId, nonce BlockHeight) (*emissionstypes.ValueBundle, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return nil, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetNetworkInferencesAtBlockResponse, error) { return node.Chain.EmissionsQueryClient.GetNetworkInferencesAtBlock(ctx, &emissionstypes.GetNetworkInferencesAtBlockRequest{ TopicId: topicId, diff --git a/lib/repo_query_fee.go b/lib/repo_query_fee.go index c9e3d6e..8dcc678 100644 --- a/lib/repo_query_fee.go +++ b/lib/repo_query_fee.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + errorsmod "cosmossdk.io/errors" "github.com/cosmos/cosmos-sdk/types/query" "github.com/rs/zerolog/log" feemarkettypes "github.com/skip-mev/feemarket/x/feemarket/types" @@ -24,13 +25,17 @@ func SetGasPrice(price float64) { } // GetBaseFee queries the current base fee from the feemarket module -func (node *NodeConfig) GetBaseFee(ctx context.Context) (float64, error) { +func (node *NodeConfig) GetBaseFee(ctx context.Context, denom string) (float64, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*feemarkettypes.GasPriceResponse, error) { - return node.Chain.FeeMarketQueryClient.GasPrice(ctx, &feemarkettypes.GasPriceRequest{Denom: node.Chain.DefaultBondDenom}) + return node.Chain.FeeMarketQueryClient.GasPrice(ctx, &feemarkettypes.GasPriceRequest{Denom: denom}) }, query.PageRequest{}, // nolint:exhaustruct "get base fee", diff --git a/lib/repo_query_nonce.go b/lib/repo_query_nonce.go index 055b7fc..eed06b1 100644 --- a/lib/repo_query_nonce.go +++ b/lib/repo_query_nonce.go @@ -3,16 +3,21 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" ) // Gets the latest open worker nonce for a given topic, with retries func (node *NodeConfig) GetLatestOpenWorkerNonceByTopicId(ctx context.Context, topicId emissionstypes.TopicId) (*emissionstypes.Nonce, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return &emissionstypes.Nonce{}, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetUnfulfilledWorkerNoncesResponse, error) { return node.Chain.EmissionsQueryClient.GetUnfulfilledWorkerNonces(ctx, &emissionstypes.GetUnfulfilledWorkerNoncesRequest{ TopicId: topicId, @@ -35,10 +40,14 @@ func (node *NodeConfig) GetLatestOpenWorkerNonceByTopicId(ctx context.Context, t // Gets the oldest open reputer nonce for a given topic, with retries func (node *NodeConfig) GetOldestReputerNonceByTopicId(ctx context.Context, topicId emissionstypes.TopicId) (*emissionstypes.Nonce, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return &emissionstypes.Nonce{}, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetUnfulfilledReputerNoncesResponse, error) { return node.Chain.EmissionsQueryClient.GetUnfulfilledReputerNonces(ctx, &emissionstypes.GetUnfulfilledReputerNoncesRequest{ TopicId: topicId, diff --git a/lib/repo_query_registration.go b/lib/repo_query_registration.go index ed32adf..6c8556c 100644 --- a/lib/repo_query_registration.go +++ b/lib/repo_query_registration.go @@ -2,7 +2,6 @@ package lib import ( "context" - "errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" @@ -10,18 +9,24 @@ import ( // Checks if the worker is registered in a topic, with retries func (node *NodeConfig) IsWorkerRegistered(ctx context.Context, topicId uint64) (bool, error) { - if node.Worker == nil { - return false, errors.New("no worker to register") + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return false, err + } + + wallet, err := node.RPCManager.GetWallet() + if err != nil { + return false, err } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.IsWorkerRegisteredInTopicIdResponse, error) { return node.Chain.EmissionsQueryClient.IsWorkerRegisteredInTopicId(ctx, &emissionstypes.IsWorkerRegisteredInTopicIdRequest{ TopicId: topicId, - Address: node.Wallet.Address, + Address: wallet.Address, }) }, query.PageRequest{}, // nolint: exhaustruct @@ -37,18 +42,24 @@ func (node *NodeConfig) IsWorkerRegistered(ctx context.Context, topicId uint64) // Checks if the reputer is registered in a topic, with retries func (node *NodeConfig) IsReputerRegistered(ctx context.Context, topicId uint64) (bool, error) { - if node.Reputer == nil { - return false, errors.New("no reputer to register") + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return false, err + } + + wallet, err := node.RPCManager.GetWallet() + if err != nil { + return false, err } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.IsReputerRegisteredInTopicIdResponse, error) { return node.Chain.EmissionsQueryClient.IsReputerRegisteredInTopicId(ctx, &emissionstypes.IsReputerRegisteredInTopicIdRequest{ TopicId: topicId, - Address: node.Wallet.Address, + Address: wallet.Address, }) }, query.PageRequest{}, // nolint: exhaustruct diff --git a/lib/repo_query_stake.go b/lib/repo_query_stake.go index b85f2c8..bd8cec5 100644 --- a/lib/repo_query_stake.go +++ b/lib/repo_query_stake.go @@ -3,6 +3,7 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" cosmossdk_io_math "cosmossdk.io/math" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" @@ -14,10 +15,14 @@ func (node *NodeConfig) GetReputerStakeInTopic( topicId emissionstypes.TopicId, reputer Address, ) (cosmossdk_io_math.Int, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return cosmossdk_io_math.Int{}, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetStakeFromReputerInTopicInSelfResponse, error) { return node.Chain.EmissionsQueryClient.GetStakeFromReputerInTopicInSelf(ctx, &emissionstypes.GetStakeFromReputerInTopicInSelfRequest{ ReputerAddress: reputer, diff --git a/lib/repo_query_status.go b/lib/repo_query_status.go index 5bd263c..0c35180 100644 --- a/lib/repo_query_status.go +++ b/lib/repo_query_status.go @@ -3,15 +3,20 @@ package lib import ( "context" + errorsmod "cosmossdk.io/errors" cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/types/query" ) func (node *NodeConfig) GetBlockHeight(ctx context.Context) (BlockHeight, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*cmtservice.GetLatestBlockResponse, error) { return node.Chain.CometQueryClient.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) }, diff --git a/lib/repo_query_topic.go b/lib/repo_query_topic.go index f65ef87..2c4303c 100644 --- a/lib/repo_query_topic.go +++ b/lib/repo_query_topic.go @@ -4,16 +4,21 @@ import ( "context" "errors" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/cosmos/cosmos-sdk/types/query" ) // Gets topic info for a given topic ID, with retries func (node *NodeConfig) GetTopicInfo(ctx context.Context, topicId emissionstypes.TopicId) (*emissionstypes.Topic, error) { + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return nil, errorsmod.Wrapf(err, "Error getting wallet config") + } resp, err := QueryDataWithRetry( ctx, - node.Wallet.MaxRetries, - node.Wallet.RetryDelay, + walletConfig.MaxRetries, + walletConfig.RetryDelay, func(ctx context.Context, req query.PageRequest) (*emissionstypes.GetTopicResponse, error) { return node.Chain.EmissionsQueryClient.GetTopic(ctx, &emissionstypes.GetTopicRequest{ TopicId: topicId, diff --git a/lib/repo_query_utils.go b/lib/repo_query_utils.go index d9ba13b..56c721c 100644 --- a/lib/repo_query_utils.go +++ b/lib/repo_query_utils.go @@ -20,7 +20,11 @@ func QueryDataWithRetry[T any]( node *NodeConfig, ) (T, error) { var result T - var err error + + walletConfig, err := node.RPCManager.GetWalletConfig() + if err != nil { + return result, errorsmod.Wrapf(err, "Error getting wallet config") + } for retryCount := int64(0); retryCount <= maxRetries; retryCount++ { log.Trace().Msgf("QueryDataWithRetry iteration started (%d/%d): %s", retryCount, maxRetries, infoMsg) @@ -32,16 +36,16 @@ func QueryDataWithRetry[T any]( // Log the error for each retry. log.Error().Err(err).Msgf("Query failed, retrying... (Retry %d/%d): %s", retryCount, maxRetries, infoMsg) - errorResponse, err := ProcessErrorTx(ctx, err, infoMsg, retryCount, node.Wallet.MaxRetries, node) + errorResponse, err := ProcessErrorTx(ctx, err, infoMsg, retryCount, walletConfig.MaxRetries, node) switch errorResponse { case ErrorProcessingOk: return result, nil case ErrorProcessingError: // if error has not been handled, sleep and retry with regular delay if err != nil { - log.Error().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msgf("Failed, retrying... (Retry %d/%d)", retryCount, node.Wallet.MaxRetries) + log.Error().Err(err).Str("rpc", node.ServerAddress).Str("msg", infoMsg).Msgf("Failed, retrying... (Retry %d/%d)", retryCount, walletConfig.MaxRetries) // Wait for the uniform delay before retrying - if DoneOrWait(ctx, node.Wallet.RetryDelay) { + if DoneOrWait(ctx, walletConfig.RetryDelay) { return result, ctx.Err() } continue diff --git a/lib/rpc_manager.go b/lib/rpc_manager.go index cf110cd..29f0184 100644 --- a/lib/rpc_manager.go +++ b/lib/rpc_manager.go @@ -107,6 +107,16 @@ func NewRPCManager(ctx context.Context, userConfig UserConfig) (*RPCManager, err rpcManager.txNodes = txNodes rpcManager.queryIdx = 0 rpcManager.txIdx = 0 + + // Initialize the wallet with the account info + _, sequence, accNum, err := rpcManager.GetCurrentQueryNode().GetAccountInfo(ctx, wallet.Address) + if err != nil { + return nil, fmt.Errorf("failed to get account info: %w", err) + } + wallet.SetSequence(sequence) + wallet.AccountNumber = accNum + log.Info().Msgf("Wallet initialized successfully, with account (sequence: %d, accNum: %d)", sequence, accNum) + return rpcManager, nil } diff --git a/main.go b/main.go index f5541aa..d96e25e 100644 --- a/main.go +++ b/main.go @@ -130,6 +130,7 @@ func main() { // Check and set defaults for the user config if any values are not set finalUserConfig.CheckAndSetDefaults() + // Creates the RPCManagerand initialises the NodeConfigs rpcManager, err := lib.NewRPCManager(ctx, finalUserConfig) if err != nil { log.Error().Err(err).Msg("Failed to initialize RPCManager, exiting") diff --git a/usecase/build_commit_reputer_payload.go b/usecase/build_commit_reputer_payload.go index b336c18..f4f3b68 100644 --- a/usecase/build_commit_reputer_payload.go +++ b/usecase/build_commit_reputer_payload.go @@ -20,6 +20,14 @@ import ( func (suite *UseCaseSuite) BuildCommitReputerPayload(ctx context.Context, reputer lib.ReputerConfig, nonce lib.BlockHeight, timeoutHeight uint64) error { log := log.With().Uint64("topicId", reputer.TopicId).Str("actorType", "reputer").Logger() log.Info().Msg("Building reputer payload") + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return errorsmod.Wrapf(err, "Error getting wallet") + } + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return errorsmod.Wrapf(err, "Error getting wallet config") + } valueBundle, err := lib.RunWithNodeRetry( ctx, @@ -36,19 +44,19 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(ctx context.Context, repute valueBundle.ReputerRequestNonce = &emissionstypes.ReputerRequestNonce{ ReputerNonce: &emissionstypes.Nonce{BlockHeight: nonce}, } - valueBundle.Reputer = suite.RPCManager.GetCurrentQueryNode().Wallet.Address + valueBundle.Reputer = wallet.Address sourceTruth, err := reputer.GroundTruthEntrypoint.GroundTruth(reputer, nonce) if err != nil { return errorsmod.Wrapf(err, "error getting source truth from reputer, topicId: %d, blockHeight: %d", reputer.TopicId, nonce) } - suite.Metrics.IncrementMetricsCounter(lib.TruthRequestCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, reputer.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.TruthRequestCount, wallet.Address, reputer.TopicId) lossBundle, err := suite.ComputeLossBundle(sourceTruth, valueBundle, reputer) if err != nil { return errorsmod.Wrapf(err, "error computing loss bundle, topic: %d, blockHeight: %d", reputer.TopicId, nonce) } - suite.Metrics.IncrementMetricsCounter(lib.ReputerDataBuildCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, reputer.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.ReputerDataBuildCount, wallet.Address, reputer.TopicId) signedValueBundle, err := suite.SignReputerValueBundle(&lossBundle) if err != nil { @@ -60,7 +68,7 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(ctx context.Context, repute } req := &emissionstypes.InsertReputerPayloadRequest{ - Sender: suite.RPCManager.GetCurrentQueryNode().Wallet.Address, + Sender: wallet.Address, ReputerValueBundle: signedValueBundle, } reqJSON, err := json.Marshal(req) @@ -70,12 +78,12 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(ctx context.Context, repute log.Debug().Msgf("Sending InsertReputerPayload to chain %s", string(reqJSON)) } - if suite.RPCManager.GetCurrentQueryNode().Wallet.SubmitTx { + if walletConfig.SubmitTx { _, err = suite.RPCManager.SendDataWithNodeRetry(ctx, req, timeoutHeight, "Send Reputer Data to chain") if err != nil { return errorsmod.Wrapf(err, "error sending Reputer Data to chain, topic: %d, blockHeight: %d", reputer.TopicId, nonce) } - suite.Metrics.IncrementMetricsCounter(lib.ReputerChainSubmissionCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, reputer.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.ReputerChainSubmissionCount, wallet.Address, reputer.TopicId) } else { log.Info().Msg("SubmitTx=false; Skipping sending Reputer Data to chain") } @@ -212,9 +220,11 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty } func (suite *UseCaseSuite) SignReputerValueBundle(valueBundle *emissionstypes.ValueBundle) (*emissionstypes.ReputerValueBundle, error) { - sig, pk, err := lib.MarshallAndSignByPrivKey(valueBundle, - suite.RPCManager.GetCurrentQueryNode().Chain.PrivKey, - suite.RPCManager.GetCurrentQueryNode().Chain.AddressSDK) + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error getting wallet") // nolint: exhaustruct + } + sig, pk, err := lib.MarshallAndSignByPrivKey(valueBundle, wallet.PrivKey, wallet.AddressSDK) if err != nil { return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error signing the InferenceForecastsBundle message") // nolint: exhaustruct } diff --git a/usecase/build_commit_worker_payload.go b/usecase/build_commit_worker_payload.go index ab059bb..d6e65f0 100644 --- a/usecase/build_commit_worker_payload.go +++ b/usecase/build_commit_worker_payload.go @@ -18,6 +18,15 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker log := log.With().Uint64("topicId", worker.TopicId).Str("actorType", "worker").Logger() log.Info().Msg("Building worker payload") + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return errorsmod.Wrapf(err, "Error getting wallet") + } + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return errorsmod.Wrapf(err, "Error getting wallet config") + } + if worker.InferenceEntrypoint == nil && worker.ForecastEntrypoint == nil { return errors.New("Worker has no valid Inference or Forecast entrypoints") } @@ -32,7 +41,7 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker return errorsmod.Wrapf(err, "Error computing inference for worker, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight) } workerResponse.InfererValue = inference - suite.Metrics.IncrementMetricsCounter(lib.InferenceRequestCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, worker.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.InferenceRequestCount, wallet.Address, worker.TopicId) } if worker.ForecastEntrypoint != nil { @@ -41,14 +50,14 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker return errorsmod.Wrapf(err, "Error computing forecast for worker, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight) } workerResponse.ForecasterValues = forecasts - suite.Metrics.IncrementMetricsCounter(lib.ForecastRequestCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, worker.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.ForecastRequestCount, wallet.Address, worker.TopicId) } workerPayload, err := suite.BuildWorkerPayload(workerResponse, nonce.BlockHeight) if err != nil { return errorsmod.Wrapf(err, "Error building worker payload, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight) } - suite.Metrics.IncrementMetricsCounter(lib.WorkerDataBuildCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, worker.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.WorkerDataBuildCount, wallet.Address, worker.TopicId) workerDataBundle, err := suite.SignWorkerPayload(&workerPayload) if err != nil { @@ -62,7 +71,7 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker } req := &emissionstypes.InsertWorkerPayloadRequest{ - Sender: suite.RPCManager.GetCurrentQueryNode().Wallet.Address, + Sender: wallet.Address, WorkerDataBundle: workerDataBundle, } reqJSON, err := json.Marshal(req) @@ -72,12 +81,12 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker log.Info().Str("req", string(reqJSON)).Msg("Sending InsertWorkerPayload to chain") } - if suite.RPCManager.GetCurrentQueryNode().Wallet.SubmitTx { + if walletConfig.SubmitTx { _, err = suite.RPCManager.SendDataWithNodeRetry(ctx, req, timeoutHeight, "Send Worker Data to chain") if err != nil { return errorsmod.Wrapf(err, "Error sending Worker Data to chain, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight) } - suite.Metrics.IncrementMetricsCounter(lib.WorkerChainSubmissionCount, suite.RPCManager.GetCurrentQueryNode().Chain.Address, worker.TopicId) + suite.Metrics.IncrementMetricsCounter(lib.WorkerChainSubmissionCount, wallet.Address, worker.TopicId) } else { log.Info().Msg("SubmitTx=false; Skipping sending Worker Data to chain") } @@ -85,6 +94,10 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(ctx context.Context, worker } func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse, nonce emissionstypes.BlockHeight) (emissionstypes.InferenceForecastBundle, error) { + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return emissionstypes.InferenceForecastBundle{}, errorsmod.Wrapf(err, "error getting wallet") // nolint: exhaustruct + } inferenceForecastsBundle := emissionstypes.InferenceForecastBundle{} // nolint: exhaustruct @@ -95,7 +108,7 @@ func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse, } builtInference := &emissionstypes.Inference{ // nolint: exhaustruct TopicId: workerResponse.TopicId, - Inferer: suite.RPCManager.GetCurrentQueryNode().Wallet.Address, + Inferer: wallet.Address, Value: infererValue, BlockHeight: nonce, } @@ -119,7 +132,7 @@ func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse, forecasterValues := &emissionstypes.Forecast{ // nolint: exhaustruct TopicId: workerResponse.TopicId, BlockHeight: nonce, - Forecaster: suite.RPCManager.GetCurrentQueryNode().Wallet.Address, + Forecaster: wallet.Address, ForecastElements: forecasterElements, } inferenceForecastsBundle.Forecast = forecasterValues @@ -130,14 +143,18 @@ func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse, func (suite *UseCaseSuite) SignWorkerPayload(workerPayload *emissionstypes.InferenceForecastBundle) (*emissionstypes.WorkerDataBundle, error) { // Marshall and sign the bundle - sig, pk, err := lib.MarshallAndSignByPrivKey(workerPayload, suite.RPCManager.GetCurrentQueryNode().Chain.PrivKey, suite.RPCManager.GetCurrentQueryNode().Chain.AddressSDK) + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error getting wallet") // nolint: exhaustruct + } + sig, pk, err := lib.MarshallAndSignByPrivKey(workerPayload, wallet.PrivKey, wallet.AddressSDK) if err != nil { return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error signing the InferenceForecastsBundle message") // nolint: exhaustruct } pkStr := hex.EncodeToString(pk) // Create workerDataBundle with signature workerDataBundle := &emissionstypes.WorkerDataBundle{ // nolint: exhaustruct - Worker: suite.RPCManager.GetCurrentQueryNode().Wallet.Address, + Worker: wallet.Address, InferenceForecastsBundle: workerPayload, InferencesForecastsBundleSignature: sig, Pubkey: pkStr, diff --git a/usecase/build_commit_worker_payload_test.go b/usecase/build_commit_worker_payload_test.go index fd8d57b..bf2df9b 100644 --- a/usecase/build_commit_worker_payload_test.go +++ b/usecase/build_commit_worker_payload_test.go @@ -121,18 +121,23 @@ func TestComputeWorkerBundle(t *testing.T) { tt.workerConfig.InferenceEntrypoint = mockAdapter tt.workerConfig.ForecastEntrypoint = mockAdapter + // Create mock wallet + mockWallet := &lib.Wallet{ // nolint:exhaustruct + Address: tt.address, + // Other wallet fields are not required for this test + } + // Replace RPCManager creation with mock mockRPCManager := &lib.MockRPCManager{} //nolint:exhaustruct - mockNodeConfig := &lib.NodeConfig{ //nolint:exhaustruct - Wallet: lib.WalletConfig{ //nolint:exhaustruct - Address: tt.address, - }, - } + mockNodeConfig := &lib.NodeConfig{} //nolint:exhaustruct + + // Add mock expectations mockRPCManager.On("GetCurrentQueryNode").Return(mockNodeConfig) mockRPCManager.On("GetCurrentTxNode").Return(mockNodeConfig) - suite := &UseCaseSuite{RPCManager: mockRPCManager} // nolint: exhaustruct + mockRPCManager.On("GetWallet").Return(mockWallet, nil) + + suite := &UseCaseSuite{RPCManager: mockRPCManager} //nolint:exhaustruct - suite.RPCManager.GetCurrentQueryNode().Wallet.Address = tt.address response, err := suite.BuildWorkerPayload(tt.workerConfig, 1) if tt.expectError { require.Error(t, err) diff --git a/usecase/gas_price_routine.go b/usecase/gas_price_routine.go index 4011ef1..1489af3 100644 --- a/usecase/gas_price_routine.go +++ b/usecase/gas_price_routine.go @@ -11,6 +11,16 @@ import ( // UpdateGasPriceRoutine continuously updates the gas price at a specified interval func (suite *UseCaseSuite) UpdateGasPriceRoutine(ctx context.Context) { + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + log.Error().Err(err).Msg("Error getting wallet") + return + } + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + log.Error().Err(err).Msg("Error getting wallet config") + return + } for { select { case <-ctx.Done(): @@ -22,9 +32,9 @@ func (suite *UseCaseSuite) UpdateGasPriceRoutine(ctx context.Context) { suite.RPCManager, func(node *lib.NodeConfig) (float64, error) { return WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (float64, error) { - return suite.RPCManager.GetCurrentQueryNode().GetBaseFee(ctx) + return suite.RPCManager.GetCurrentQueryNode().GetBaseFee(ctx, wallet.DefaultBondDenom) }) }, "get base fee", @@ -35,7 +45,7 @@ func (suite *UseCaseSuite) UpdateGasPriceRoutine(ctx context.Context) { } lib.SetGasPrice(price) log.Debug().Float64("gasPrice", lib.GetGasPrice()).Msg("Updating fee price routine: updating value.") - time.Sleep(time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.GasPriceUpdateInterval) * time.Second) + time.Sleep(time.Duration(walletConfig.GasPriceUpdateInterval) * time.Second) } } } diff --git a/usecase/repo_tx_registration.go b/usecase/repo_tx_registration.go index cfddc91..2e24261 100644 --- a/usecase/repo_tx_registration.go +++ b/usecase/repo_tx_registration.go @@ -4,6 +4,7 @@ import ( lib "allora_offchain_node/lib" "context" + errorsmod "cosmossdk.io/errors" emissionstypes "github.com/allora-network/allora-chain/x/emissions/types" "github.com/rs/zerolog/log" ) @@ -14,8 +15,18 @@ func (suite *UseCaseSuite) RegisterWorkerIdempotently(ctx context.Context, confi log := log.With().Uint64("topicId", config.TopicId).Str("actorType", "worker").Logger() log.Info().Msg("Registering worker") + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return false, errorsmod.Wrapf(err, "Error getting wallet config") + } rpcManager := suite.RPCManager queryNode := rpcManager.GetCurrentQueryNode() + wallet, err := rpcManager.GetWallet() + if err != nil { + log.Error().Err(err).Msg("Could not get wallet") + return false, err + } + isRegistered, err := queryNode.IsWorkerRegistered(ctx, config.TopicId) if err != nil { log.Error().Err(err).Str("rpc", queryNode.ServerAddress).Msg("Could not check if the node is already registered for topic as worker, skipping") @@ -37,7 +48,8 @@ func (suite *UseCaseSuite) RegisterWorkerIdempotently(ctx context.Context, confi // Switch to spread the load queryNode = rpcManager.SwitchToNextQueryNode() - balance, err := queryNode.GetBalance(ctx) + + balance, err := queryNode.GetBalance(ctx, wallet.Address, wallet.DefaultBondDenom) if err != nil { log.Error().Err(err).Msg("Could not check if the node has enough balance to register, skipping") return false, err @@ -47,11 +59,6 @@ func (suite *UseCaseSuite) RegisterWorkerIdempotently(ctx context.Context, confi return false, lib.ErrNotEnoughBalance } - wallet, err := rpcManager.GetWallet() - if err != nil { - log.Error().Err(err).Msg("Could not get wallet") - return false, err - } msg := &emissionstypes.RegisterRequest{ Sender: wallet.Address, TopicId: config.TopicId, @@ -74,8 +81,8 @@ func (suite *UseCaseSuite) RegisterWorkerIdempotently(ctx context.Context, confi } // Give time for the tx to be included in a block - log.Debug().Int64("delay", int64(queryNode.Wallet.BlockDurationEstimated)*2).Msg("Waiting to check registration status to be included in a block...") - if lib.DoneOrWait(ctx, int64(queryNode.Wallet.BlockDurationEstimated)*2) { + log.Debug().Int64("delay", int64(walletConfig.BlockDurationEstimated)*2).Msg("Waiting to check registration status to be included in a block...") + if lib.DoneOrWait(ctx, int64(walletConfig.BlockDurationEstimated)*2) { log.Error().Err(ctx.Err()).Str("rpc", queryNode.ServerAddress).Msg("Waiting to check registration status failed") return false, ctx.Err() } @@ -95,7 +102,10 @@ func (suite *UseCaseSuite) RegisterAndStakeReputerIdempotently(ctx context.Conte log := log.With().Uint64("topicId", config.TopicId).Str("actorType", "reputer").Logger() log.Info().Msg("Registering reputer") - // Initialize variables to be used throughout the function + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return false, errorsmod.Wrapf(err, "Error getting wallet config") + } rpcManager := suite.RPCManager queryNode := rpcManager.GetCurrentQueryNode() wallet, err := rpcManager.GetWallet() @@ -115,7 +125,7 @@ func (suite *UseCaseSuite) RegisterAndStakeReputerIdempotently(ctx context.Conte } else { log.Info().Msg("Node not yet registered. Attempting registration...") - balance, err := queryNode.GetBalance(ctx) + balance, err := queryNode.GetBalance(ctx, wallet.Address, wallet.DefaultBondDenom) if err != nil { log.Error().Err(err).Msg("Could not check if the node has enough balance to register, skipping") return false, err @@ -151,8 +161,8 @@ func (suite *UseCaseSuite) RegisterAndStakeReputerIdempotently(ctx context.Conte } // Give time for the tx to be included in a block - log.Debug().Int64("delay", int64(queryNode.Wallet.BlockDurationEstimated)*2).Msg("Waiting to check registration status to be included in a block...") - if lib.DoneOrWait(ctx, int64(queryNode.Wallet.BlockDurationEstimated)*2) { + log.Debug().Int64("delay", int64(walletConfig.BlockDurationEstimated)*2).Msg("Waiting to check registration status to be included in a block...") + if lib.DoneOrWait(ctx, int64(walletConfig.BlockDurationEstimated)*2) { log.Error().Err(ctx.Err()).Str("rpc", queryNode.ServerAddress).Msg("Waiting to check registration status failed") return false, ctx.Err() } @@ -211,8 +221,8 @@ func (suite *UseCaseSuite) RegisterAndStakeReputerIdempotently(ctx context.Conte } // Give time for the tx to be included in a block - log.Debug().Int64("delay", int64(queryNode.Wallet.BlockDurationEstimated)*2).Msg("Waiting to check stake status to be included in a block...") - if lib.DoneOrWait(ctx, int64(queryNode.Wallet.BlockDurationEstimated)*2) { + log.Debug().Int64("delay", int64(walletConfig.BlockDurationEstimated)*2).Msg("Waiting to check stake status to be included in a block...") + if lib.DoneOrWait(ctx, int64(walletConfig.BlockDurationEstimated)*2) { log.Error().Err(ctx.Err()).Str("rpc", queryNode.ServerAddress).Msg("Waiting to check stake status failed") return false, ctx.Err() } diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 4329476..44f2afe 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -47,15 +47,25 @@ type ActorProcessParams[T lib.TopicActor] struct { // Spawns the actor processes and any associated non-essential routines func (suite *UseCaseSuite) Spawn(ctx context.Context) { - if suite.RPCManager.GetCurrentQueryNode().Wallet.GasPrices == lib.AutoGasPrices { + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + log.Error().Err(err).Msg("Error getting wallet") + return + } + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + log.Error().Err(err).Msg("Error getting wallet config") + return + } + if walletConfig.GasPrices == lib.AutoGasPrices { log.Info().Msg("auto gas prices. Updating fee price routine: starting.") - price, err := WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + price, err := WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (float64, error) { return lib.RunWithNodeRetry( ctx, suite.RPCManager, func(node *lib.NodeConfig) (float64, error) { - return node.GetBaseFee(ctx) + return node.GetBaseFee(ctx, wallet.DefaultBondDenom) }, "get base fee", lib.GRPC_MODE, @@ -70,7 +80,7 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { // After intialization, start auto-update routine go suite.UpdateGasPriceRoutine(ctx) } else { - price, err := strconv.ParseFloat(suite.RPCManager.GetCurrentQueryNode().Wallet.GasPrices, 64) + price, err := strconv.ParseFloat(walletConfig.GasPrices, 64) if err != nil { log.Error().Err(err).Msg("Invalid gas prices format") return @@ -85,7 +95,7 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { // Run worker process per topic alreadyStartedWorkerForTopic := make(map[emissionstypes.TopicId]bool) - for _, worker := range suite.RPCManager.GetCurrentQueryNode().Worker { + for _, worker := range suite.UserConfig.Worker { if _, ok := alreadyStartedWorkerForTopic[worker.TopicId]; ok { log.Warn().Uint64("topicId", worker.TopicId).Msg("Worker already started for topicId") continue @@ -99,7 +109,7 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { log.Error().Uint64("topicId", worker.TopicId).Err(ctx.Err()).Msg("Worker process finished") }(worker) - if lib.DoneOrWait(ctx, suite.RPCManager.GetCurrentQueryNode().Wallet.LaunchRoutineDelay) { + if lib.DoneOrWait(ctx, walletConfig.LaunchRoutineDelay) { log.Error().Msg("Worker process finished") return } @@ -107,7 +117,7 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { // Run reputer process per topic alreadyStartedReputerForTopic := make(map[emissionstypes.TopicId]bool) - for _, reputer := range suite.RPCManager.GetCurrentQueryNode().Reputer { + for _, reputer := range suite.UserConfig.Reputer { if _, ok := alreadyStartedReputerForTopic[reputer.TopicId]; ok { log.Warn().Uint64("topicId", reputer.TopicId).Msg("Reputer already started for topicId") continue @@ -121,7 +131,7 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { log.Error().Uint64("topicId", reputer.TopicId).Err(ctx.Err()).Msg("Reputer process finished") }(reputer) - if lib.DoneOrWait(ctx, suite.RPCManager.GetCurrentQueryNode().Wallet.LaunchRoutineDelay) { + if lib.DoneOrWait(ctx, walletConfig.LaunchRoutineDelay) { log.Error().Msg("Reputer process finished") return } @@ -139,8 +149,16 @@ func (suite *UseCaseSuite) Spawn(ctx context.Context) { // Attempts to build and commit a worker payload for a given nonce // Returns the nonce height acted upon (the received one or the new one if any) func (suite *UseCaseSuite) processWorkerPayload(ctx context.Context, worker lib.WorkerConfig, latestNonceHeightActedUpon int64, timeoutHeight uint64) (int64, error) { + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet config") + } + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet") + } // Get latest nonce with RPC timeout - latestOpenWorkerNonce, err := WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + latestOpenWorkerNonce, err := WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (*emissionstypes.Nonce, error) { return suite.RPCManager.GetCurrentQueryNode().GetLatestOpenWorkerNonceByTopicId(ctx, worker.TopicId) }) @@ -152,9 +170,9 @@ func (suite *UseCaseSuite) processWorkerPayload(ctx context.Context, worker lib. if latestOpenWorkerNonce.BlockHeight > latestNonceHeightActedUpon { // Check whitelist with RPC timeout - isWhitelisted, err := WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + isWhitelisted, err := WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (bool, error) { - return suite.RPCManager.GetCurrentQueryNode().CanSubmitWorker(ctx, worker.TopicId, suite.RPCManager.GetCurrentQueryNode().Wallet.Address) + return suite.RPCManager.GetCurrentQueryNode().CanSubmitWorker(ctx, worker.TopicId, wallet.Address) }) if err != nil { @@ -167,7 +185,7 @@ func (suite *UseCaseSuite) processWorkerPayload(ctx context.Context, worker lib. } // Build and commit payload with transaction timeout - err = WithTimeout(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsTx)*time.Second, + err = WithTimeout(ctx, time.Duration(walletConfig.TimeoutRPCSecondsTx)*time.Second, func(ctx context.Context) error { return suite.BuildCommitWorkerPayload(ctx, worker, latestOpenWorkerNonce, timeoutHeight) }) @@ -191,6 +209,14 @@ func (suite *UseCaseSuite) processWorkerPayload(ctx context.Context, worker lib. func (suite *UseCaseSuite) processReputerPayload(ctx context.Context, reputer lib.ReputerConfig, latestNonceHeightActedUpon int64, timeoutHeight uint64) (int64, error) { log := log.With().Uint64("topicId", reputer.TopicId).Str("actorType", "reputer").Logger() log.Info().Msg("Processing reputer payload") + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet config") + } + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + return 0, errorsmod.Wrapf(err, "Error getting wallet") + } // Get nonce with RPC timeout nonce, err := lib.RunWithNodeRetry( @@ -198,7 +224,7 @@ func (suite *UseCaseSuite) processReputerPayload(ctx context.Context, reputer li suite.RPCManager, func(node *lib.NodeConfig) (*emissionstypes.Nonce, error) { return WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (*emissionstypes.Nonce, error) { return node.GetOldestReputerNonceByTopicId(ctx, reputer.TopicId) }) @@ -218,9 +244,9 @@ func (suite *UseCaseSuite) processReputerPayload(ctx context.Context, reputer li suite.RPCManager, func(node *lib.NodeConfig) (bool, error) { return WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (bool, error) { - return node.CanSubmitReputer(ctx, reputer.TopicId, node.Wallet.Address) + return node.CanSubmitReputer(ctx, reputer.TopicId, wallet.Address) }) }, "check reputer whitelist", @@ -236,7 +262,7 @@ func (suite *UseCaseSuite) processReputerPayload(ctx context.Context, reputer li } // Build and commit payload with transaction timeout - err = WithTimeout(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsTx)*time.Second, + err = WithTimeout(ctx, time.Duration(walletConfig.TimeoutRPCSecondsTx)*time.Second, func(ctx context.Context) error { return suite.BuildCommitReputerPayload(ctx, reputer, nonce.BlockHeight, timeoutHeight) }) @@ -285,12 +311,22 @@ func (suite *UseCaseSuite) runWorkerProcess(ctx context.Context, worker lib.Work log := log.With().Uint64("topicId", worker.TopicId).Str("actorType", "worker").Logger() log.Info().Msg("Running worker process for topic") + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + log.Error().Err(err).Msg("Failed to get wallet config") + return + } + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + log.Error().Err(err).Msg("Failed to get wallet") + return + } // Handle registration registered, err := lib.RunWithNodeRetry( ctx, suite.RPCManager, func(node *lib.NodeConfig) (bool, error) { - return WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsRegistration)*time.Second, + return WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsRegistration)*time.Second, func(ctx context.Context) (bool, error) { return suite.RegisterWorkerIdempotently(ctx, worker) }) @@ -341,9 +377,9 @@ func (suite *UseCaseSuite) runWorkerProcess(ctx context.Context, worker lib.Work ctx, suite.RPCManager, func(node *lib.NodeConfig) (bool, error) { - return WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + return WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (bool, error) { - return node.CanSubmitWorker(ctx, worker.TopicId, node.Wallet.Address) + return node.CanSubmitWorker(ctx, worker.TopicId, wallet.Address) }) }, "check worker whitelist", @@ -367,14 +403,23 @@ func (suite *UseCaseSuite) runReputerProcess(ctx context.Context, reputer lib.Re // Create a logger with the topicId log := log.With().Uint64("topicId", reputer.TopicId).Str("actorType", "reputer").Logger() log.Debug().Msg("Running reputer process for topic") - + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + log.Error().Err(err).Msg("Failed to get wallet config") + return + } + wallet, err := suite.RPCManager.GetWallet() + if err != nil { + log.Error().Err(err).Msg("Failed to get wallet") + return + } // Handle registration and staking registeredAndStaked, err := lib.RunWithNodeRetry( ctx, suite.RPCManager, func(node *lib.NodeConfig) (bool, error) { return WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsRegistration)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsRegistration)*time.Second, func(ctx context.Context) (bool, error) { return suite.RegisterAndStakeReputerIdempotently(ctx, reputer) }) @@ -425,9 +470,9 @@ func (suite *UseCaseSuite) runReputerProcess(ctx context.Context, reputer lib.Re suite.RPCManager, func(node *lib.NodeConfig) (bool, error) { return WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (bool, error) { - return node.CanSubmitReputer(ctx, reputer.TopicId, node.Wallet.Address) + return node.CanSubmitReputer(ctx, reputer.TopicId, wallet.Address) }) }, "check reputer whitelist", @@ -452,9 +497,14 @@ func (suite *UseCaseSuite) runReputerProcess(ctx context.Context, reputer lib.Re func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, params ActorProcessParams[T]) { // Create a logger with the topicId and actorType log := log.With().Uint64("topicId", params.Config.GetTopicId()).Str("actorType", params.ActorType).Logger() - log.Debug().Msg("Running actor process for topic") + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + log.Error().Err(err).Msg("Failed to get wallet config") + return + } + topicInfo, err := queryTopicInfo(ctx, suite, params.Config) if err != nil { log.Error().Err(err).Msg("Failed to get topic info after retries") @@ -469,7 +519,7 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, for { log.Trace().Msg("Start iteration, querying latest block") // Query the latest block - currentBlockHeight, err = WithTimeoutResult(ctx, time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + currentBlockHeight, err = WithTimeoutResult(ctx, time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (lib.BlockHeight, error) { return suite.RPCManager.GetCurrentQueryNode().GetBlockHeight(ctx) }) @@ -506,7 +556,7 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, // Wait for an epochLength with a correction factor, it will self-adjust from there waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( epochLength, - suite.RPCManager.GetCurrentQueryNode().Wallet.BlockDurationEstimated, + walletConfig.BlockDurationEstimated, NEW_TOPIC_CORRECTION_FACTOR, ) if err != nil { @@ -550,8 +600,8 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, waitingTimeInSeconds, err = calculateTimeDistanceInSeconds( distanceUntilNextEpoch, - suite.RPCManager.GetCurrentQueryNode().Wallet.BlockDurationEstimated, - suite.RPCManager.GetCurrentQueryNode().Wallet.WindowCorrectionFactor, + walletConfig.BlockDurationEstimated, + walletConfig.WindowCorrectionFactor, ) if err != nil { log.Error().Err(err).Msg("Error calculating time distance to next epoch after sending tx") @@ -567,7 +617,7 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, // Inconsistent topic data, wait until the next epoch waitingTimeInSeconds, err = calculateTimeDistanceInSeconds( epochLength, - suite.RPCManager.GetCurrentQueryNode().Wallet.BlockDurationEstimated, + walletConfig.BlockDurationEstimated, NEARNESS_CORRECTION_FACTOR, ) if err != nil { @@ -588,7 +638,7 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, closeBlockDistance := distanceUntilNextEpoch + offset waitingTimeInSeconds, err = calculateTimeDistanceInSeconds( closeBlockDistance, - suite.RPCManager.GetCurrentQueryNode().Wallet.BlockDurationEstimated, + walletConfig.BlockDurationEstimated, NEARNESS_CORRECTION_FACTOR, ) if err != nil { @@ -607,8 +657,8 @@ func runActorProcess[T lib.TopicActor](ctx context.Context, suite *UseCaseSuite, // Far distance, bigger waits until the submission window opens waitingTimeInSeconds, err = calculateTimeDistanceInSeconds( distanceUntilNextEpoch, - suite.RPCManager.GetCurrentQueryNode().Wallet.BlockDurationEstimated, - suite.RPCManager.GetCurrentQueryNode().Wallet.WindowCorrectionFactor, + walletConfig.BlockDurationEstimated, + walletConfig.WindowCorrectionFactor, ) if err != nil { log.Error().Err(err).Msg("Error calculating far distance to epochLength") @@ -635,8 +685,12 @@ func queryTopicInfo[T lib.TopicActor]( suite *UseCaseSuite, config T, ) (*emissionstypes.Topic, error) { + walletConfig, err := suite.RPCManager.GetWalletConfig() + if err != nil { + return nil, errorsmod.Wrapf(err, "Error getting wallet config") + } topicInfo, err := WithTimeoutResult(ctx, - time.Duration(suite.RPCManager.GetCurrentQueryNode().Wallet.TimeoutRPCSecondsQuery)*time.Second, + time.Duration(walletConfig.TimeoutRPCSecondsQuery)*time.Second, func(ctx context.Context) (*emissionstypes.Topic, error) { return suite.RPCManager.GetCurrentQueryNode().GetTopicInfo(ctx, config.GetTopicId()) })