Skip to content

Commit

Permalink
fix submitTx, new errors, refactors, and minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
xmariachi committed Jan 13, 2025
1 parent 89c1aa9 commit a4b1362
Show file tree
Hide file tree
Showing 18 changed files with 272 additions and 216 deletions.
12 changes: 10 additions & 2 deletions lib/domain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type UserConfig struct {
}

type NodeConfig struct {
RPC string
Chain ChainConfig
Wallet WalletConfig
Worker []WorkerConfig
Expand Down Expand Up @@ -222,8 +223,15 @@ func (c *UserConfig) ValidateWalletConfig() error {
}

func (reputerConfig *ReputerConfig) ValidateReputerConfig() error {
if reputerConfig.GroundTruthEntrypoint != nil && !reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss() {
return errors.New("invalid loss entrypoint")
if reputerConfig.GroundTruthEntrypointName == "" ||
reputerConfig.GroundTruthEntrypoint == nil ||
(reputerConfig.GroundTruthEntrypoint != nil &&
!reputerConfig.GroundTruthEntrypoint.CanSourceGroundTruthAndComputeLoss()) {
return errors.New("invalid ground truth entrypoint")
}
if reputerConfig.LossFunctionEntrypointName == "" ||
reputerConfig.LossFunctionEntrypoint == nil {
return errors.New("invalid loss function entrypoint")
}
return nil
}
Expand Down
244 changes: 157 additions & 87 deletions lib/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,33 @@ import (
const ErrorCodespace = "allora-offchain-lib"

var (
ErrTooManyRequests = errorsmod.Register(ErrorCodespace, 1, "too many requests")
ErrNotEnoughBalance = errorsmod.Register(ErrorCodespace, 2, "not enough balance")
ErrNotRegistered = errorsmod.Register(ErrorCodespace, 3, "not registered")
ErrStakeBelowMin = errorsmod.Register(ErrorCodespace, 4, "stake below minimum")
ErrFullMempool = errorsmod.Register(ErrorCodespace, 5, "full mempool")
HTTPError = errorsmod.Register(ErrorCodespace, 1, "http error")
ErrNotEnoughBalance = errorsmod.Register(ErrorCodespace, 2, "not enough balance")
ErrNotRegistered = errorsmod.Register(ErrorCodespace, 3, "not registered")
ErrStakeBelowMin = errorsmod.Register(ErrorCodespace, 4, "stake below minimum")
ErrFullMempool = errorsmod.Register(ErrorCodespace, 5, "full mempool")
ErrReadPanic = errorsmod.Register(ErrorCodespace, 6, "read panic")
ErrConnectionRefused = errorsmod.Register(ErrorCodespace, 7, "connection refused")
ErrUnexpectedError = errorsmod.Register(ErrorCodespace, 10, "unexpected error")
)

// Marker for ABCI error codes
const ErrorMessageAbciErrorCodeMarker = "error code:"

// Errors substrings that are not ABCI errors and do not have a specific error code
const ErrorMessageDataAlreadySubmitted = "already submitted"
const ErrorMessageCannotUpdateEma = "cannot update EMA"
const ErrorMessageWaitingForNextBlock = "waiting for next block" // This means tx is accepted in mempool but not yet included in a block
const ErrorMessageAccountSequenceMismatch = "account sequence mismatch"
const ErrorMessageTimeoutHeight = "timeout height"
const ErrorMessageNotPermittedToSubmitPayload = "not permitted to submit payload"
const ErrorMessageNotPermittedToAddStake = "not permitted to add stake"
const ErrorMessageReadFlatPanic = "{ReadFlat}: panic"
const ErrorMessageReadPerBytePanic = "{ReadPerByte}: panic"
const ErrorMessageConnectionRefused = "connection refused"
const ErrorMessageNoInferencesFoundForTopic = "no inferences found for topic"

// Excess correction in gas for txs that are not successful
const ExcessCorrectionInGas = 20000

// Error processing types
Expand All @@ -54,7 +65,12 @@ const ErrorProcessingSwitchingNode = "switch"

// HTTP status codes that trigger node switching
var HTTPStatusCodeCodesSwitchingNode = map[int]bool{
403: true, // Forbidden
429: true, // Too Many Requests
502: true, // Bad Gateway
503: true, // Service Unavailable
504: true, // Gateway Timeout
505: true, // HTTP Version Not Supported
}

// calculateExponentialBackoffDelay returns a duration based on retry count and base delay
Expand All @@ -63,102 +79,119 @@ func calculateExponentialBackoffDelaySeconds(baseDelay int64, retryCount int64)
}

// processError handles the error messages.
func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount int64, node *NodeConfig) (string, error) {
func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) {
if strings.Contains(err.Error(), ErrorMessageAbciErrorCodeMarker) {
re := regexp.MustCompile(`error code: '(\d+)'`)
matches := re.FindStringSubmatch(err.Error())
if len(matches) == 2 {
errorCode, parseErr := strconv.Atoi(matches[1])
if parseErr != nil {
log.Error().Err(parseErr).Str("msg", infoMsg).Msg("Failed to parse ABCI error code")
log.Error().Err(parseErr).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Failed to parse ABCI error code")
} else {
switch errorCode {
case int(sdkerrors.ErrMempoolIsFull.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Mempool is full, switching to next node")
return ErrorProcessingSwitchingNode, ErrFullMempool
case int(sdkerrors.ErrWrongSequence.ABCICode()), int(sdkerrors.ErrInvalidSequence.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Int64("delay", node.Wallet.AccountSequenceRetryDelay).
Msg("Account sequence mismatch detected, retrying with fixed delay")
// Wait a fixed block-related waiting time
if DoneOrWait(ctx, node.Wallet.AccountSequenceRetryDelay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
case int(sdkerrors.ErrInsufficientFee.ABCICode()):
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("Insufficient fees")
return ErrorProcessingFees, nil
case int(feemarkettypes.ErrNoFeeCoins.ABCICode()):
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("No fee coins")
return ErrorProcessingFees, nil
case int(sdkerrors.ErrTxTooLarge.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "tx too large")
case int(sdkerrors.ErrTxInMempoolCache.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "tx already in mempool cache")
case int(sdkerrors.ErrInvalidChainID.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "invalid chain-id")
case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()):
return ErrorProcessingFailure, errorsmod.Wrapf(err, "tx timeout height")
case int(emissions.ErrWorkerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Worker window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelaySeconds(node.Wallet.RetryDelay, retryCount)
if DoneOrWait(ctx, delay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
case int(emissions.ErrReputerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Reputer window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelaySeconds(node.Wallet.RetryDelay, retryCount)
if DoneOrWait(ctx, delay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
default:
log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
}
return triageABCIErrorCode(ctx, errorCode, err, infoMsg, retryCount, retryMax, node)
}
} else {
log.Warn().Str("msg", infoMsg).Msg("Unmatched error format, cannot classify as ABCI error")
}
}

// Check if error is HTTP status code
if statusCode, statusMessage, error := ParseHTTPStatus(err.Error()); error == nil {
log.Warn().Int("statusCode", statusCode).Str("statusMessage", statusMessage).Str("msg", infoMsg).Msg("HTTP status code detected")
if statusCode, statusMessage, err := ParseHTTPStatus(err.Error()); err == nil {
log.Warn().Int("statusCode", statusCode).Str("statusMessage", statusMessage).Str("msg", infoMsg).Msg("HTTP status code detected")

if HTTPStatusCodeCodesSwitchingNode[statusCode] {
log.Warn().
Int("statusCode", statusCode).
Str("msg", infoMsg).
Msg("HTTP status error code detected, switching to next node")
return ErrorProcessingSwitchingNode, ErrTooManyRequests
if processingType, err := triageHTTPStatusError(err, node, infoMsg); err != nil {
return processingType, err
}

if processingType, err := triageStringMatchingError(ctx, err, infoMsg, node); err != nil {
return processingType, err
} else {
return processingType, nil
}
}

// triageABCIErrorCode handles specific ABCI error codes and returns appropriate processing instructions
func triageABCIErrorCode(ctx context.Context, errorCode int, err error, infoMsg string, retryCount, retryMax int64, node *NodeConfig) (string, error) {
switch errorCode {
case int(sdkerrors.ErrMempoolIsFull.ABCICode()):
// Exhaust retries before switching to next node
if retryCount >= retryMax {
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("Mempool is full, switching to next node")
return ErrorProcessingSwitchingNode, ErrFullMempool
} else {
delay := calculateExponentialBackoffDelaySeconds(node.Wallet.RetryDelay, retryCount)
if DoneOrWait(ctx, delay) {
return ErrorProcessingError, ctx.Err()
}
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("Mempool is full, retrying with exponential backoff")
return ErrorProcessingContinue, nil
}
case int(sdkerrors.ErrWrongSequence.ABCICode()), int(sdkerrors.ErrInvalidSequence.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Int64("delay", node.Wallet.AccountSequenceRetryDelay).
Msg("Account sequence mismatch detected, retrying with fixed delay")
// Wait a fixed block-related waiting time
if DoneOrWait(ctx, node.Wallet.AccountSequenceRetryDelay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
case int(sdkerrors.ErrInsufficientFee.ABCICode()):
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("Insufficient fees")
return ErrorProcessingFees, nil
case int(feemarkettypes.ErrNoFeeCoins.ABCICode()):
log.Info().
Err(err).
Str("msg", infoMsg).
Msg("No fee coins")
return ErrorProcessingFees, nil
case int(sdkerrors.ErrTxTooLarge.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "tx too large")
case int(sdkerrors.ErrTxInMempoolCache.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "tx already in mempool cache")
case int(sdkerrors.ErrInvalidChainID.ABCICode()):
return ErrorProcessingError, errorsmod.Wrapf(err, "invalid chain-id")
case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()):
return ErrorProcessingFailure, errorsmod.Wrapf(err, "tx timeout height")
case int(emissions.ErrWorkerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Worker window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelaySeconds(node.Wallet.RetryDelay, retryCount)
if DoneOrWait(ctx, delay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
case int(emissions.ErrReputerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Reputer window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelaySeconds(node.Wallet.RetryDelay, retryCount)
if DoneOrWait(ctx, delay) {
return ErrorProcessingError, ctx.Err()
}
return ErrorProcessingContinue, nil
default:
log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
return ErrorProcessingError, err
}
}

// NOT ABCI error code: keep on checking for specially handled error types
// Triages error by string matching
func triageStringMatchingError(ctx context.Context, err error, infoMsg string, node *NodeConfig) (string, error) {
if strings.Contains(err.Error(), ErrorMessageAccountSequenceMismatch) {
log.Warn().
Err(err).
Str("rpc", node.RPC).
Str("msg", infoMsg).
Int64("delay", node.Wallet.AccountSequenceRetryDelay).
Msg("Account sequence mismatch detected, re-fetching sequence")
Expand All @@ -167,23 +200,56 @@ func ProcessErrorTx(ctx context.Context, err error, infoMsg string, retryCount i
}
return ErrorProcessingContinue, nil
} else if strings.Contains(err.Error(), ErrorMessageWaitingForNextBlock) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying")
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying")
return ErrorProcessingOk, nil
} else if strings.Contains(err.Error(), ErrorMessageDataAlreadySubmitted) || strings.Contains(err.Error(), ErrorMessageCannotUpdateEma) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Already submitted data for this epoch.")
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Already submitted data for this epoch.")
return ErrorProcessingOk, nil
} else if strings.Contains(err.Error(), ErrorMessageTimeoutHeight) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Tx failed because of timeout height")
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Tx failed because of timeout height")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNotPermittedToSubmitPayload) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload")
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Actor is not permitted to submit payload")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNoInferencesFoundForTopic) {
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("No inferences found for topic")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageNotPermittedToAddStake) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Actor is not permitted to add stake")
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Actor is not permitted to add stake")
return ErrorProcessingFailure, err
} else if strings.Contains(err.Error(), ErrorMessageReadFlatPanic) || strings.Contains(err.Error(), ErrorMessageReadPerBytePanic) {
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Read panic, switching to next node")
return ErrorProcessingSwitchingNode, ErrReadPanic
} else if strings.Contains(err.Error(), ErrorMessageConnectionRefused) {
log.Warn().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Connection refused, switching to next node")
return ErrorProcessingSwitchingNode, ErrConnectionRefused
}
log.Info().Err(err).Str("rpc", node.RPC).Str("msg", infoMsg).Msg("Unknown error")
return ErrorProcessingError, errorsmod.Wrap(ErrUnexpectedError, err.Error())
}

return ErrorProcessingError, errorsmod.Wrapf(err, "failed to process error")
// triageHTTPStatusError checks if the error contains an HTTP status code and determines if node switching is needed
func triageHTTPStatusError(err error, node *NodeConfig, infoMsg string) (string, error) {
statusCode, statusMessage, parseErr := ParseHTTPStatus(err.Error())
if parseErr == nil {
log.Warn().
Int("statusCode", statusCode).
Str("statusMessage", statusMessage).
Str("msg", infoMsg).
Msg("HTTP status code detected")

// When status code is in the list of codes that trigger node switching, switch to next node without retries
if HTTPStatusCodeCodesSwitchingNode[statusCode] {
log.Warn().
Str("rpc", node.RPC).
Int("statusCode", statusCode).
Str("statusMessage", statusMessage).
Str("msg", infoMsg).
Msg("HTTP status error code detected, switching to next node")
return ErrorProcessingSwitchingNode, HTTPError
}
}
return "", nil
}

// ParseStatus parses a status code and message from a given text string.
Expand All @@ -210,5 +276,9 @@ func ParseHTTPStatus(input string) (int, string, error) {

// Returns true if the error is a switching-node error
func IsErrorSwitchingNode(err error) bool {
return errors.Is(err, ErrTooManyRequests) || errors.Is(err, ErrFullMempool)
return errors.Is(err, HTTPError) ||
errors.Is(err, ErrFullMempool) ||
errors.Is(err, ErrReadPanic) ||
errors.Is(err, ErrConnectionRefused) ||
errors.Is(err, ErrUnexpectedError)
}
Loading

0 comments on commit a4b1362

Please sign in to comment.