Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nonce validation after broadcast for Hedera #13957

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/friendly-impalas-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added nonce validation immediately after broadcast for Hedera #internal
92 changes: 74 additions & 18 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
// TransmitCheckTimeout controls the maximum amount of time that will be
// spent on the transmit check.
TransmitCheckTimeout = 2 * time.Second

// maxBroadcastRetries is the number of times a transaction broadcast is retried when the sequence fails to increment on Hedera
maxHederaBroadcastRetries = 3

// hederaChainType is the string representation of the Hedera chain type
// Temporary solution until the Broadcaster is moved to the EVM code base
hederaChainType = "hedera"
)

var (
Expand Down Expand Up @@ -114,6 +121,7 @@ type Broadcaster[
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
chainType string
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
Expand Down Expand Up @@ -163,6 +171,7 @@ func NewBroadcaster[
lggr logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
chainType string,
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
Expand All @@ -171,6 +180,7 @@ func NewBroadcaster[
client: client,
TxAttemptBuilder: txAttemptBuilder,
chainID: client.ConfiguredChainID(),
chainType: chainType,
config: config,
feeConfig: feeConfig,
txConfig: txConfig,
Expand Down Expand Up @@ -411,7 +421,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), true
}
if etx != nil {
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt); err != nil {
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt, 0); err != nil {
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), retryable
}
}
Expand Down Expand Up @@ -464,12 +474,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true
}

return eb.handleInProgressTx(ctx, *etx, attempt, time.Now())
return eb.handleInProgressTx(ctx, *etx, attempt, time.Now(), 0)
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retryCount int) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
}
Expand All @@ -478,6 +488,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr)

// The validation below is only applicable to Hedera because it has instant finality and a unique sequence behavior
if eb.chainType == hederaChainType {
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount)
}

if errType != client.Fatal {
etx.InitialBroadcastAt = &initialBroadcastAt
etx.BroadcastAt = &initialBroadcastAt
Expand Down Expand Up @@ -538,7 +553,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
case client.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1)
case client.InsufficientFunds:
// NOTE: This bails out of the entire cycle and essentially "blocks" on
// any transaction that gets insufficient_funds. This is OK if a
Expand Down Expand Up @@ -600,6 +615,44 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) validateOnChainSequence(ctx context.Context, lgr logger.SugaredLogger, errType client.SendTxReturnCode, err error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryCount int) (client.SendTxReturnCode, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we rename this function to imply it's only meant for Hedara, or it's better to be generic ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's alright either way. I considered that actually but opted to leave it implicit. I'm not against changing it though if others agree

// Only check if sequence was incremented if broadcast was successful, otherwise return the existing err type
if errType != client.Successful {
return errType, err
}
// Transaction sequence cannot be nil here since a sequence is required to broadcast
txSeq := *etx.Sequence
// Retrieve the latest mined sequence from on-chain
nextSeqOnChain, err := eb.client.SequenceAt(ctx, etx.FromAddress, nil)
if err != nil {
return errType, err
}

// Check that the transaction count has incremented on-chain to include the broadcasted transaction
// Insufficient transaction fee is a common scenario in which the sequence is not incremented by the chain even though we got a successful response
// If the sequence failed to increment and hasn't reached the max retries, return the Underpriced error to try again with a bumped attempt
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount < maxHederaBroadcastRetries {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in theory if nextSeqOnChain is < txSeq we can mark tx as Successful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextSeqOnChain < txSeq I think would be an indicator of a nonce gap which wouldn't return a successful errType so we would have exited early. But for a belts and braces approach, I added the condition just in case and mark the tx as Fatal in this case.

Copy link
Contributor

@huangzhen1997 huangzhen1997 Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @dhaidashenko you meant nextSeqOnChain > txSeq?

I agree that nextSeqOnChain < txSeq would be incorrect and caught by SendTransactionReturnCode early, and probably due to a reorg in this case ? otherwise not sure about how this is possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah if that's the case then classifying that as Successful is ok because we process TransactionAlreadyKnown as the same.
Also unsure if nextSeqOnChain < txSeq is a scenario we could realistically hit for this chain. I believe re-orgs aren't possible because of instant finality. But there could be some obscure scenario so this check should help.

Copy link
Collaborator

@dimriou dimriou Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retryable is also another option for the nextSeqOnChain < txSeq case. There are two scenarios:

  • We have a nonce gap: fatal will alert us sooner, but we might miss reorg cases and it won't recover anyway
  • There was a reorg or RPC was lagging: retryable will handle this, although I found out that Hedera doesn't handle nonce too high errors gracefully, so it will throw a bunch of errors during this process.

return client.Underpriced, nil
}

// If the transaction reaches the retry limit and fails to get included, mark it as fatally errored
// Some unknown error other than insufficient tx fee could be the cause
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount >= maxHederaBroadcastRetries {
err := fmt.Errorf("failed to broadcast transaction on %s after %d retries", hederaChainType, retryCount)
lgr.Error(err.Error())
return client.Fatal, err
}

// Belts and braces approach to detect and handle sqeuence gaps if the broadcast is considered successful
if nextSeqOnChain.Int64() < txSeq.Int64() {
err := fmt.Errorf("next expected sequence on-chain (%s) is less than the broadcasted transaction's sequence (%s)", nextSeqOnChain.String(), txSeq.String())
lgr.Criticalw("Sequence gap has been detected and needs to be filled", "error", err)
return client.Fatal, err
}

return client.Successful, nil
}

// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast.
// Returns nil if no transactions are in queue
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
Expand All @@ -622,23 +675,26 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return etx, nil
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) {
logger.With(lgr,
"sendError", txError,
"attemptFee", attempt.TxFee,
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(),
).Errorf("attempt fee %v was rejected by the node for being too low. "+
"Node returned: '%s'. "+
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+
"Consider increasing FeeEstimator.PriceDefault (current value: %s)",
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault())
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) {
// This log error is not applicable to Hedera since the action required would not be needed for its gas estimator
if eb.chainType != hederaChainType {
logger.With(lgr,
"sendError", txError,
"attemptFee", attempt.TxFee,
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(),
).Errorf("attempt fee %v was rejected by the node for being too low. "+
"Node returned: '%s'. "+
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+
"Consider increasing FeeEstimator.PriceDefault (current value: %s)",
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault())
}

replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr)
if err != nil {
return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable
}

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit)
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) {
Expand All @@ -655,15 +711,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again",
"etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit)

return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit)
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64) (err error, retyrable bool) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true
}
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
Expand Down
12 changes: 8 additions & 4 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,15 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) bat
}

if receipt.GetStatus() == 0 {
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber())
if errExtract == nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String())
if receipt.GetRevertReason() != nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "revertReason", *receipt.GetRevertReason())
} else {
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err)
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber())
if errExtract == nil {
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It might make sense to set this reason back on the receipt struct right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to leave this as a log instead of adding it to the Receipt struct. Since the TXM is indifferent to a tx reverting, I don't think we should try to store this to avoid adding another column.

} else {
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err)
}
}
// This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt
promRevertedTxCount.WithLabelValues(ec.chainID.String()).Add(1)
Expand Down
1 change: 1 addition & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,5 @@ type ChainReceipt[TX_HASH, BLOCK_HASH types.Hashable] interface {
GetFeeUsed() uint64
GetTransactionIndex() uint
GetBlockHash() BLOCK_HASH
GetRevertReason() *string
}
6 changes: 5 additions & 1 deletion core/chains/evm/config/chaintype/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
ChainArbitrum ChainType = "arbitrum"
ChainCelo ChainType = "celo"
ChainGnosis ChainType = "gnosis"
ChainHedera ChainType = "hedera"
ChainKroma ChainType = "kroma"
ChainMetis ChainType = "metis"
ChainOptimismBedrock ChainType = "optimismBedrock"
Expand All @@ -35,7 +36,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync:
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync:
return true
}
return false
Expand All @@ -49,6 +50,8 @@ func ChainTypeFromSlug(slug string) ChainType {
return ChainCelo
case "gnosis":
return ChainGnosis
case "hedera":
return ChainHedera
case "kroma":
return ChainKroma
case "metis":
Expand Down Expand Up @@ -120,6 +123,7 @@ var ErrInvalidChainType = fmt.Errorf("must be one of %s or omitted", strings.Joi
string(ChainArbitrum),
string(ChainCelo),
string(ChainGnosis),
string(ChainHedera),
string(ChainKroma),
string(ChainMetis),
string(ChainOptimismBedrock),
Expand Down
Loading
Loading