-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add nonce validation after broadcast for Hedera #13957
Changes from 6 commits
4625cfa
21b4a91
f0ce40f
5f0f849
d13cb3d
e7d0678
8dd90fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"chainlink": minor | ||
--- | ||
|
||
Added nonce validation immediately after broadcast for Hedera #internal |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,13 @@ const ( | |
// TransmitCheckTimeout controls the maximum amount of time that will be | ||
// spent on the transmit check. | ||
TransmitCheckTimeout = 2 * time.Second | ||
|
||
// maxBroadcastRetries is the number of times a transaction broadcast is retried when the sequence fails to increment on Hedera | ||
maxHederaBroadcastRetries = 3 | ||
|
||
// hederaChainType is the string representation of the Hedera chain type | ||
// Temporary solution until the Broadcaster is moved to the EVM code base | ||
hederaChainType = "hedera" | ||
) | ||
|
||
var ( | ||
|
@@ -114,6 +121,7 @@ type Broadcaster[ | |
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ] | ||
resumeCallback ResumeCallback | ||
chainID CHAIN_ID | ||
chainType string | ||
config txmgrtypes.BroadcasterChainConfig | ||
feeConfig txmgrtypes.BroadcasterFeeConfig | ||
txConfig txmgrtypes.BroadcasterTransactionsConfig | ||
|
@@ -163,6 +171,7 @@ func NewBroadcaster[ | |
lggr logger.Logger, | ||
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], | ||
autoSyncSequence bool, | ||
chainType string, | ||
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { | ||
lggr = logger.Named(lggr, "Broadcaster") | ||
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ | ||
|
@@ -171,6 +180,7 @@ func NewBroadcaster[ | |
client: client, | ||
TxAttemptBuilder: txAttemptBuilder, | ||
chainID: client.ConfiguredChainID(), | ||
chainType: chainType, | ||
config: config, | ||
feeConfig: feeConfig, | ||
txConfig: txConfig, | ||
|
@@ -411,7 +421,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand | |
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), true | ||
} | ||
if etx != nil { | ||
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt); err != nil { | ||
if err, retryable := eb.handleInProgressTx(ctx, *etx, etx.TxAttempts[0], etx.CreatedAt, 0); err != nil { | ||
return fmt.Errorf("handleAnyInProgressTx failed: %w", err), retryable | ||
} | ||
} | ||
|
@@ -464,12 +474,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand | |
return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true | ||
} | ||
|
||
return eb.handleInProgressTx(ctx, *etx, attempt, time.Now()) | ||
return eb.handleInProgressTx(ctx, *etx, attempt, time.Now(), 0) | ||
} | ||
|
||
// There can be at most one in_progress transaction per address. | ||
// Here we complete the job that we didn't finish last time. | ||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) { | ||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retryCount int) (error, bool) { | ||
if etx.State != TxInProgress { | ||
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false | ||
} | ||
|
@@ -478,6 +488,11 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand | |
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) | ||
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr) | ||
|
||
// The validation below is only applicable to Hedera because it has instant finality and a unique sequence behavior | ||
if eb.chainType == hederaChainType { | ||
errType, err = eb.validateOnChainSequence(ctx, lgr, errType, err, etx, retryCount) | ||
} | ||
|
||
if errType != client.Fatal { | ||
etx.InitialBroadcastAt = &initialBroadcastAt | ||
etx.BroadcastAt = &initialBroadcastAt | ||
|
@@ -538,7 +553,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand | |
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) | ||
return err, true | ||
case client.Underpriced: | ||
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt) | ||
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt, retryCount+1) | ||
case client.InsufficientFunds: | ||
// NOTE: This bails out of the entire cycle and essentially "blocks" on | ||
// any transaction that gets insufficient_funds. This is OK if a | ||
|
@@ -600,6 +615,44 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand | |
} | ||
} | ||
|
||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) validateOnChainSequence(ctx context.Context, lgr logger.SugaredLogger, errType client.SendTxReturnCode, err error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], retryCount int) (client.SendTxReturnCode, error) { | ||
// Only check if sequence was incremented if broadcast was successful, otherwise return the existing err type | ||
if errType != client.Successful { | ||
return errType, err | ||
} | ||
// Transaction sequence cannot be nil here since a sequence is required to broadcast | ||
txSeq := *etx.Sequence | ||
// Retrieve the latest mined sequence from on-chain | ||
nextSeqOnChain, err := eb.client.SequenceAt(ctx, etx.FromAddress, nil) | ||
if err != nil { | ||
return errType, err | ||
} | ||
|
||
// Check that the transaction count has incremented on-chain to include the broadcasted transaction | ||
// Insufficient transaction fee is a common scenario in which the sequence is not incremented by the chain even though we got a successful response | ||
// If the sequence failed to increment and hasn't reached the max retries, return the Underpriced error to try again with a bumped attempt | ||
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount < maxHederaBroadcastRetries { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: in theory if nextSeqOnChain is < txSeq we can mark tx as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe @dhaidashenko you meant I agree that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah if that's the case then classifying that as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return client.Underpriced, nil | ||
} | ||
|
||
// If the transaction reaches the retry limit and fails to get included, mark it as fatally errored | ||
// Some unknown error other than insufficient tx fee could be the cause | ||
if nextSeqOnChain.Int64() == txSeq.Int64() && retryCount >= maxHederaBroadcastRetries { | ||
err := fmt.Errorf("failed to broadcast transaction on %s after %d retries", hederaChainType, retryCount) | ||
lgr.Error(err.Error()) | ||
return client.Fatal, err | ||
} | ||
|
||
// Belts and braces approach to detect and handle sqeuence gaps if the broadcast is considered successful | ||
if nextSeqOnChain.Int64() < txSeq.Int64() { | ||
err := fmt.Errorf("next expected sequence on-chain (%s) is less than the broadcasted transaction's sequence (%s)", nextSeqOnChain.String(), txSeq.String()) | ||
lgr.Criticalw("Sequence gap has been detected and needs to be filled", "error", err) | ||
return client.Fatal, err | ||
} | ||
|
||
return client.Successful, nil | ||
} | ||
|
||
// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast. | ||
// Returns nil if no transactions are in queue | ||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { | ||
|
@@ -622,23 +675,26 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next | |
return etx, nil | ||
} | ||
|
||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { | ||
logger.With(lgr, | ||
"sendError", txError, | ||
"attemptFee", attempt.TxFee, | ||
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(), | ||
).Errorf("attempt fee %v was rejected by the node for being too low. "+ | ||
"Node returned: '%s'. "+ | ||
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+ | ||
"Consider increasing FeeEstimator.PriceDefault (current value: %s)", | ||
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault()) | ||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainBumpingGas(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, retry int) (err error, retryable bool) { | ||
// This log error is not applicable to Hedera since the action required would not be needed for its gas estimator | ||
if eb.chainType != hederaChainType { | ||
logger.With(lgr, | ||
"sendError", txError, | ||
"attemptFee", attempt.TxFee, | ||
"maxGasPriceConfig", eb.feeConfig.MaxFeePrice(), | ||
).Errorf("attempt fee %v was rejected by the node for being too low. "+ | ||
"Node returned: '%s'. "+ | ||
"Will bump and retry. ACTION REQUIRED: This is a configuration error. "+ | ||
"Consider increasing FeeEstimator.PriceDefault (current value: %s)", | ||
attempt.TxFee, txError.Error(), eb.feeConfig.FeePriceDefault()) | ||
} | ||
|
||
replacementAttempt, bumpedFee, bumpedFeeLimit, retryable, err := eb.NewBumpTxAttempt(ctx, etx, attempt, nil, lgr) | ||
if err != nil { | ||
return fmt.Errorf("tryAgainBumpFee failed: %w", err), retryable | ||
} | ||
|
||
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit) | ||
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, bumpedFee, bumpedFeeLimit, retry) | ||
} | ||
|
||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryAgainWithNewEstimation(ctx context.Context, lgr logger.Logger, txError error, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (err error, retryable bool) { | ||
|
@@ -655,15 +711,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA | |
lgr.Warnw("L2 rejected transaction due to incorrect fee, re-estimated and will try again", | ||
"etxID", etx.ID, "err", err, "newGasPrice", fee, "newGasLimit", feeLimit) | ||
|
||
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit) | ||
return eb.saveTryAgainAttempt(ctx, lgr, etx, attempt, replacementAttempt, initialBroadcastAt, fee, feeLimit, 0) | ||
} | ||
|
||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64) (err error, retyrable bool) { | ||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint64, retry int) (err error, retyrable bool) { | ||
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil { | ||
return fmt.Errorf("tryAgainWithNewFee failed: %w", err), true | ||
} | ||
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit) | ||
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt) | ||
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt, retry) | ||
} | ||
|
||
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -664,11 +664,15 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) bat | |
} | ||
|
||
if receipt.GetStatus() == 0 { | ||
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber()) | ||
if errExtract == nil { | ||
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String()) | ||
if receipt.GetRevertReason() != nil { | ||
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "revertReason", *receipt.GetRevertReason()) | ||
} else { | ||
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err) | ||
rpcError, errExtract := ec.client.CallContract(ctx, attempt, receipt.GetBlockNumber()) | ||
if errExtract == nil { | ||
l.Warnw("transaction reverted on-chain", "hash", receipt.GetTxHash(), "rpcError", rpcError.String()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: It might make sense to set this reason back on the receipt struct right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it makes sense to leave this as a log instead of adding it to the Receipt struct. Since the TXM is indifferent to a tx reverting, I don't think we should try to store this to avoid adding another column. |
||
} else { | ||
l.Warnw("transaction reverted on-chain unable to extract revert reason", "hash", receipt.GetTxHash(), "err", err) | ||
} | ||
} | ||
// This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt | ||
promRevertedTxCount.WithLabelValues(ec.chainID.String()).Add(1) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we rename this function to imply it's only meant for
Hedara
, or it's better to be generic ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's alright either way. I considered that actually but opted to leave it implicit. I'm not against changing it though if others agree