Skip to content

Commit

Permalink
Diego/proto 2804 more idiomatic buildcommit fns error handling (#77)
Browse files Browse the repository at this point in the history
<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v           ✰  Thanks for creating a PR! You're awesome! ✰
v Please note that maintainers will only review those PRs with a
completed PR template.
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

## Purpose of Changes and their Description
* idiomatic error-only return
* Idiomatic error logs
* Removed duplicated logging 
* Aligning worker/reputer messages
* Extensive use of errorsmod


## Link(s) to Ticket(s) or Issue(s) resolved by this PR
PROTO-2804

## Are these changes tested and documented?

- [X] If tested, please describe how. If not, why tests are not needed.
-- tested locally against testnet
- [X] If documented, please describe where. If not, describe why docs
are not needed. -- no need, just cosmetic/idiomatic changes
- [x] Added to `Unreleased` section of `CHANGELOG.md`?
  • Loading branch information
xmariachi authored Nov 6, 2024
2 parents bd1c0d9 + eb2debc commit 80e525b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#74](https://github.com/allora-network/allora-offchain-node/pull/74) Improve logging
* [#76](https://github.com/allora-network/allora-offchain-node/pull/76) Account sequence mismatch using expected number + other error handling improvements
* [#77](https://github.com/allora-network/allora-offchain-node/pull/77) More idiomatic buildcommit functions, use of errorsmod, error handling + duplicated error logs

### Security

Expand Down
5 changes: 3 additions & 2 deletions lib/repo_tx_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig)
case int(sdkerrors.ErrInvalidChainID.ABCICode()):
return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "invalid chain-id")
default:
log.Info().Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
}
}
} else {
Expand All @@ -97,9 +97,10 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig)
log.Warn().Str("msg", infoMsg).Msg("Tx accepted in mempool, it will be included in the following block(s) - not retrying")
return ERROR_PROCESSING_OK, nil
} else if strings.Contains(err.Error(), ERROR_MESSAGE_DATA_ALREADY_SUBMITTED) || strings.Contains(err.Error(), ERROR_MESSAGE_CANNOT_UPDATE_EMA) {
log.Warn().Err(err).Str("msg", infoMsg).Msg("Already sent data for this epoch.")
log.Warn().Err(err).Str("msg", infoMsg).Msg("Already submitted data for this epoch.")
return ERROR_PROCESSING_OK, nil
}

return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "failed to process error")
}

Expand Down
60 changes: 23 additions & 37 deletions usecase/build_commit_reputer_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"

errorsmod "cosmossdk.io/errors"
alloraMath "github.com/allora-network/allora-chain/math"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
Expand All @@ -17,13 +18,12 @@ import (
// Get the reputer's values at the block from the chain
// Compute loss bundle with the reputer provided Loss function and ground truth
// sign and commit to chain
func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig, nonce lib.BlockHeight) (bool, error) {
func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig, nonce lib.BlockHeight) error {
ctx := context.Background()

valueBundle, err := suite.Node.GetReputerValuesAtBlock(reputer.TopicId, nonce)
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get reputer values at block")
return false, err
return errorsmod.Wrapf(err, "error getting reputer values, topic: %d, blockHeight: %d", reputer.TopicId, nonce)
}
valueBundle.ReputerRequestNonce = &emissionstypes.ReputerRequestNonce{
ReputerNonce: &emissionstypes.Nonce{BlockHeight: nonce},
Expand All @@ -32,26 +32,23 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig,

sourceTruth, err := reputer.GroundTruthEntrypoint.GroundTruth(reputer, nonce)
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to get source truth from reputer")
return false, err
return errorsmod.Wrapf(err, "error getting source truth from reputer, topicId: %d, blockHeight: %d", reputer.TopicId, nonce)
}
suite.Metrics.IncrementMetricsCounter(lib.TruthRequestCount, suite.Node.Chain.Address, reputer.TopicId)

lossBundle, err := suite.ComputeLossBundle(sourceTruth, valueBundle, reputer)
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to compute loss bundle")
return false, err
return errorsmod.Wrapf(err, "error computing loss bundle, topic: %d, blockHeight: %d", reputer.TopicId, nonce)
}
suite.Metrics.IncrementMetricsCounter(lib.ReputerDataBuildCount, suite.Node.Chain.Address, reputer.TopicId)

signedValueBundle, err := suite.SignReputerValueBundle(&lossBundle)
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to sign reputer value bundle")
return false, err
return errorsmod.Wrapf(err, "error signing reputer value bundle, topic: %d, blockHeight: %d", reputer.TopicId, nonce)
}

if err := signedValueBundle.Validate(); err != nil {
return false, err
return errorsmod.Wrapf(err, "error validating reputer value bundle, topic: %d, blockHeight: %d", reputer.TopicId, nonce)
}

req := &emissionstypes.InsertReputerPayloadRequest{
Expand All @@ -67,15 +64,14 @@ func (suite *UseCaseSuite) BuildCommitReputerPayload(reputer lib.ReputerConfig,
if suite.Node.Wallet.SubmitTx {
_, err = suite.Node.SendDataWithRetry(ctx, req, "Send Reputer Data to chain")
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msgf("Error sending Reputer Data to chain: %s", err)
return false, err
return errorsmod.Wrapf(err, "error sending Reputer Data to chain, topic: %d, blockHeight: %d", reputer.TopicId, nonce)
}
suite.Metrics.IncrementMetricsCounter(lib.ReputerChainSubmissionCount, suite.Node.Chain.Address, reputer.TopicId)
} else {
log.Info().Uint64("topicId", reputer.TopicId).Msg("SubmitTx=false; Skipping sending Reputer Data to chain")
}

return true, nil
return nil
}

func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionstypes.ValueBundle, reputer lib.ReputerConfig) (emissionstypes.ValueBundle, error) {
Expand All @@ -102,8 +98,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
var err error
is_never_negative, err = reputer.LossFunctionEntrypoint.IsLossFunctionNeverNegative(reputer, lossMethodOptions)
if err != nil {
log.Error().Err(err).Uint64("topicId", reputer.TopicId).Msg("Failed to determine if loss function is never negative")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "failed to determine if loss function is never negative")
}
// cache the result
reputer.LossFunctionParameters.IsNeverNegative = &is_never_negative
Expand All @@ -119,40 +114,38 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
computeLoss := func(value alloraMath.Dec, description string) (alloraMath.Dec, error) {
lossStr, err := reputer.LossFunctionEntrypoint.LossFunction(reputer, sourceTruth, value.String(), lossMethodOptions)
if err != nil {
return alloraMath.Dec{}, fmt.Errorf("error computing loss for %s: %w", description, err)
return alloraMath.Dec{}, errorsmod.Wrapf(err, "error computing loss for %s", description)
}

loss, err := alloraMath.NewDecFromString(lossStr)
if err != nil {
return alloraMath.Dec{}, fmt.Errorf("error parsing loss value for %s: %w", description, err)
return alloraMath.Dec{}, errorsmod.Wrapf(err, "error parsing loss value for %s", description)
}

if is_never_negative {
loss, err = alloraMath.Log10(loss)
if err != nil {
return alloraMath.Dec{}, fmt.Errorf("error Log10 for %s: %w", description, err)
return alloraMath.Dec{}, errorsmod.Wrapf(err, "error Log10 for %s", description)
}
}

if err := emissionstypes.ValidateDec(loss); err != nil {
return alloraMath.Dec{}, fmt.Errorf("invalid loss value for %s: %w", description, err)
return alloraMath.Dec{}, errorsmod.Wrapf(err, "invalid loss value for %s", description)
}

return loss, nil
}

// Combined Value
if combinedLoss, err := computeLoss(vb.CombinedValue, "combined value"); err != nil {
log.Error().Err(err).Msg("Error computing loss for combined value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for combined value")
} else {
losses.CombinedValue = combinedLoss
}

// Naive Value
if naiveLoss, err := computeLoss(vb.NaiveValue, "naive value"); err != nil {
log.Error().Err(err).Msg("Error computing loss for naive value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for naive value")
} else {
losses.NaiveValue = naiveLoss
}
Expand All @@ -161,8 +154,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
losses.InfererValues = make([]*emissionstypes.WorkerAttributedValue, len(vb.InfererValues))
for i, val := range vb.InfererValues {
if loss, err := computeLoss(val.Value, fmt.Sprintf("inferer value %d", i)); err != nil {
log.Error().Err(err).Msg("Error computing loss for inferer value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for inferer value")
} else {
losses.InfererValues[i] = &emissionstypes.WorkerAttributedValue{Worker: val.Worker, Value: loss}
}
Expand All @@ -172,8 +164,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
losses.ForecasterValues = make([]*emissionstypes.WorkerAttributedValue, len(vb.ForecasterValues))
for i, val := range vb.ForecasterValues {
if loss, err := computeLoss(val.Value, fmt.Sprintf("forecaster value %d", i)); err != nil {
log.Error().Err(err).Msg("Error computing loss for forecaster value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for forecaster value")
} else {
losses.ForecasterValues[i] = &emissionstypes.WorkerAttributedValue{Worker: val.Worker, Value: loss}
}
Expand All @@ -183,8 +174,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
losses.OneOutInfererValues = make([]*emissionstypes.WithheldWorkerAttributedValue, len(vb.OneOutInfererValues))
for i, val := range vb.OneOutInfererValues {
if loss, err := computeLoss(val.Value, fmt.Sprintf("one out inferer value %d", i)); err != nil {
log.Error().Err(err).Msg("Error computing loss for one out inferer value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for one-out inferer value")
} else {
losses.OneOutInfererValues[i] = &emissionstypes.WithheldWorkerAttributedValue{Worker: val.Worker, Value: loss}
}
Expand All @@ -194,8 +184,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
losses.OneOutForecasterValues = make([]*emissionstypes.WithheldWorkerAttributedValue, len(vb.OneOutForecasterValues))
for i, val := range vb.OneOutForecasterValues {
if loss, err := computeLoss(val.Value, fmt.Sprintf("one out forecaster value %d", i)); err != nil {
log.Error().Err(err).Msg("Error computing loss for one out forecaster value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for one-out forecaster value")
} else {
losses.OneOutForecasterValues[i] = &emissionstypes.WithheldWorkerAttributedValue{Worker: val.Worker, Value: loss}
}
Expand All @@ -205,8 +194,7 @@ func (suite *UseCaseSuite) ComputeLossBundle(sourceTruth string, vb *emissionsty
losses.OneInForecasterValues = make([]*emissionstypes.WorkerAttributedValue, len(vb.OneInForecasterValues))
for i, val := range vb.OneInForecasterValues {
if loss, err := computeLoss(val.Value, fmt.Sprintf("one in forecaster value %d", i)); err != nil {
log.Error().Err(err).Msg("Error computing loss for one in forecaster value")
return emissionstypes.ValueBundle{}, err
return emissionstypes.ValueBundle{}, errorsmod.Wrapf(err, "error computing loss for one-in forecaster value")
} else {
losses.OneInForecasterValues[i] = &emissionstypes.WorkerAttributedValue{Worker: val.Worker, Value: loss}
}
Expand All @@ -219,14 +207,12 @@ func (suite *UseCaseSuite) SignReputerValueBundle(valueBundle *emissionstypes.Va
protoBytesIn := make([]byte, 0) // Create a byte slice with initial length 0 and capacity greater than 0
protoBytesIn, err := valueBundle.XXX_Marshal(protoBytesIn, true)
if err != nil {
log.Error().Err(err).Msg("Error Marshalling valueBundle")
return &emissionstypes.ReputerValueBundle{}, err
return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error marshalling valueBundle")
}
sig, pk, err := suite.Node.Chain.Client.Context().Keyring.Sign(suite.Node.Chain.Account.Name, protoBytesIn, signing.SignMode_SIGN_MODE_DIRECT)
pkStr := hex.EncodeToString(pk.Bytes())
if err != nil {
log.Error().Err(err).Msg("Error signing valueBundle")
return &emissionstypes.ReputerValueBundle{}, err
return &emissionstypes.ReputerValueBundle{}, errorsmod.Wrapf(err, "error signing valueBundle")
}

reputerValueBundle := &emissionstypes.ReputerValueBundle{
Expand Down
39 changes: 16 additions & 23 deletions usecase/build_commit_worker_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"

errorsmod "cosmossdk.io/errors"
"github.com/rs/zerolog/log"

alloraMath "github.com/allora-network/allora-chain/math"
emissionstypes "github.com/allora-network/allora-chain/x/emissions/types"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
)

func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, nonce *emissionstypes.Nonce) (bool, error) {
func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, nonce *emissionstypes.Nonce) error {
ctx := context.Background()

if worker.InferenceEntrypoint == nil && worker.ForecastEntrypoint == nil {
log.Error().Msg("Worker has no valid Inference or Forecast entrypoints")
return false, nil
return errors.New("Worker has no valid Inference or Forecast entrypoints")
}

var workerResponse = lib.WorkerResponse{
Expand All @@ -28,8 +29,7 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, non
if worker.InferenceEntrypoint != nil {
inference, err := worker.InferenceEntrypoint.CalcInference(worker, nonce.BlockHeight)
if err != nil {
log.Error().Err(err).Str("worker", worker.InferenceEntrypoint.Name()).Msg("Error computing inference for worker")
return false, err
return errorsmod.Wrapf(err, "Error computing inference for worker, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}
workerResponse.InfererValue = inference
suite.Metrics.IncrementMetricsCounter(lib.InferenceRequestCount, suite.Node.Chain.Address, worker.TopicId)
Expand All @@ -39,30 +39,27 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, non
forecasts := []lib.NodeValue{}
forecasts, err := worker.ForecastEntrypoint.CalcForecast(worker, nonce.BlockHeight)
if err != nil {
log.Error().Err(err).Str("worker", worker.ForecastEntrypoint.Name()).Msg("Error computing forecast for worker")
return false, err
return errorsmod.Wrapf(err, "Error computing forecast for worker, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}
workerResponse.ForecasterValues = forecasts
suite.Metrics.IncrementMetricsCounter(lib.ForecastRequestCount, suite.Node.Chain.Address, worker.TopicId)
}

workerPayload, err := suite.BuildWorkerPayload(workerResponse, nonce.BlockHeight)
if err != nil {
log.Error().Err(err).Msg("Error building workerPayload")
return false, err
return errorsmod.Wrapf(err, "Error building worker payload, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}
suite.Metrics.IncrementMetricsCounter(lib.WorkerDataBuildCount, suite.Node.Chain.Address, worker.TopicId)

workerDataBundle, err := suite.SignWorkerPayload(&workerPayload)
if err != nil {
log.Error().Err(err).Msg("Error signing workerPayload")
return false, err
return errorsmod.Wrapf(err, "Error signing worker payload, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}
workerDataBundle.Nonce = nonce
workerDataBundle.TopicId = worker.TopicId

if err := workerDataBundle.Validate(); err != nil {
return false, err
return errorsmod.Wrapf(err, "Error validating worker data bundle, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}

req := &emissionstypes.InsertWorkerPayloadRequest{
Expand All @@ -71,21 +68,21 @@ func (suite *UseCaseSuite) BuildCommitWorkerPayload(worker lib.WorkerConfig, non
}
reqJSON, err := json.Marshal(req)
if err != nil {
log.Error().Err(err).Msg("Error marshaling InsertWorkerPayload to print Msg as JSON")
log.Warn().Err(err).Msg("Error marshaling InsertWorkerPayload to print Msg as JSON")
} else {
log.Info().Str("req", string(reqJSON)).Msg("Sending InsertWorkerPayload to chain")
}

if suite.Node.Wallet.SubmitTx {
_, err = suite.Node.SendDataWithRetry(ctx, req, "Send Worker Data to chain")
if err != nil {
return false, err
return errorsmod.Wrapf(err, "Error sending Worker Data to chain, topicId: %d, blockHeight: %d", worker.TopicId, nonce.BlockHeight)
}
suite.Metrics.IncrementMetricsCounter(lib.WorkerChainSubmissionCount, suite.Node.Chain.Address, worker.TopicId)
} else {
log.Info().Uint64("topicId", worker.TopicId).Msg("SubmitTx=false; Skipping sending Worker Data to chain")
}
return true, nil
return nil
}

func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse, nonce emissionstypes.BlockHeight) (emissionstypes.InferenceForecastBundle, error) {
Expand All @@ -95,8 +92,7 @@ func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse,
if workerResponse.InfererValue != "" {
infererValue, err := alloraMath.NewDecFromString(workerResponse.InfererValue)
if err != nil {
log.Error().Err(err).Msg("Error converting infererValue to Dec")
return emissionstypes.InferenceForecastBundle{}, err
return emissionstypes.InferenceForecastBundle{}, errorsmod.Wrapf(err, "error converting infererValue to Dec")
}
builtInference := &emissionstypes.Inference{
TopicId: workerResponse.TopicId,
Expand All @@ -112,8 +108,7 @@ func (suite *UseCaseSuite) BuildWorkerPayload(workerResponse lib.WorkerResponse,
for _, val := range workerResponse.ForecasterValues {
decVal, err := alloraMath.NewDecFromString(val.Value)
if err != nil {
log.Error().Err(err).Msg("Error converting forecasterValue to Dec")
return emissionstypes.InferenceForecastBundle{}, err
return emissionstypes.InferenceForecastBundle{}, errorsmod.Wrapf(err, "error converting forecasterValue to Dec")
}
forecasterElements = append(forecasterElements, &emissionstypes.ForecastElement{
Inferer: val.Worker,
Expand All @@ -139,14 +134,12 @@ func (suite *UseCaseSuite) SignWorkerPayload(workerPayload *emissionstypes.Infer
protoBytesIn := make([]byte, 0) // Create a byte slice with initial length 0 and capacity greater than 0
protoBytesIn, err := workerPayload.XXX_Marshal(protoBytesIn, true)
if err != nil {
log.Error().Err(err).Msg("Error Marshalling workerPayload")
return &emissionstypes.WorkerDataBundle{}, err
return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error marshalling workerPayload")
}
sig, pk, err := suite.Node.Chain.Client.Context().Keyring.Sign(suite.Node.Chain.Account.Name, protoBytesIn, signing.SignMode_SIGN_MODE_DIRECT)
pkStr := hex.EncodeToString(pk.Bytes())
if err != nil {
log.Error().Err(err).Msg("Error signing the InferenceForecastsBundle message")
return &emissionstypes.WorkerDataBundle{}, err
return &emissionstypes.WorkerDataBundle{}, errorsmod.Wrapf(err, "error signing the InferenceForecastsBundle message")
}
// Create workerDataBundle with signature
workerDataBundle := &emissionstypes.WorkerDataBundle{
Expand Down
Loading

0 comments on commit 80e525b

Please sign in to comment.