Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(submitter): refactoring points #57

Merged
merged 8 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2etest/reporter_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (tm *TestManager) GenerateAndSubmitBlockNBlockStartingFromDepth(t *testing.
}

func TestReporter_BoostrapUnderFrequentBTCHeaders(t *testing.T) {
t.Parallel()
//t.Parallel() // todo(lazar): this test when run in parallel is very flaky, investigate why
// no need to much mature outputs, we are not going to submit transactions in this test
numMatureOutputs := uint32(150)

Expand Down
2 changes: 2 additions & 0 deletions e2etest/submitter_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package e2etest

import (
"github.com/babylonlabs-io/vigilante/testutil"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -188,4 +189,5 @@ func TestSubmitterSubmissionReplace(t *testing.T) {
blockWithOpReturnTransactions := tm.mineBlock(t)
// block should have 2 transactions, 1 from submitter and 1 coinbase
require.Equal(t, len(blockWithOpReturnTransactions.Transactions), 3)
require.True(t, promtestutil.ToFloat64(vigilantSubmitter.Metrics().ResentCheckpointsCounter) == 1)
}
119 changes: 78 additions & 41 deletions submitter/relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/kvdb"
"math"
"strconv"
"strings"
"time"

"github.com/babylonlabs-io/babylon/btctxformatter"
Expand All @@ -33,6 +34,10 @@ const (
dustThreshold btcutil.Amount = 546
)

var (
TxNotFoundErr = errors.New("-5: No such mempool or blockchain transaction. Use gettransaction for wallet transactions")
)

type GetLatestCheckpointFunc func() (*store.StoredCheckpoint, bool, error)
type GetRawTransactionFunc func(txHash *chainhash.Hash) (*btcutil.Tx, error)
type SendTransactionFunc func(tx *wire.MsgTx) (*chainhash.Hash, error)
Expand Down Expand Up @@ -149,6 +154,18 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp
return nil
}

return nil
}

// MaybeResubmitSecondCheckpointTx based on the resend interval attempts to resubmit 2nd ckpt tx with a bumped fee
func (rl *Relayer) MaybeResubmitSecondCheckpointTx(ckpt *ckpttypes.RawCheckpointWithMetaResponse) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to have a doc string to explain the idea of MaybeResubmitSecondCheckpointTx

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only suggestion for naming

Suggested change
func (rl *Relayer) MaybeResubmitSecondCheckpointTx(ckpt *ckpttypes.RawCheckpointWithMetaResponse) error {
func (rl *Relayer) ResubmitSecondCheckpointTxIfNeeded(ckpt *ckpttypes.RawCheckpointWithMetaResponse) error {

ckptEpoch := ckpt.Ckpt.EpochNum
if ckpt.Status != ckpttypes.Sealed {
rl.logger.Errorf("The checkpoint for epoch %v is not sealed", ckptEpoch)
rl.metrics.InvalidCheckpointCounter.Inc()
return nil
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a check that the epoch of the checkpoint equals that of the last submitted checkpoint?

lastSubmittedEpoch := rl.lastSubmittedCheckpoint.Epoch
if ckptEpoch < lastSubmittedEpoch {
rl.logger.Errorf("The checkpoint for epoch %v is lower than the last submission for epoch %v",
Expand All @@ -158,55 +175,51 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp
return nil
}

// now that the checkpoint has been sent, we should try to resend it
// if the resend interval has passed
durSeconds := uint(time.Since(rl.lastSubmittedCheckpoint.Ts).Seconds())
if durSeconds >= rl.config.ResendIntervalSeconds {
rl.logger.Debugf("The checkpoint for epoch %v was sent more than %v seconds ago but not included on BTC",
ckptEpoch, rl.config.ResendIntervalSeconds)
if durSeconds < rl.config.ResendIntervalSeconds {
return nil
}

bumpedFee := rl.calculateBumpedFee(rl.lastSubmittedCheckpoint)
rl.logger.Debugf("The checkpoint for epoch %v was sent more than %v seconds ago but not included on BTC",
ckptEpoch, rl.config.ResendIntervalSeconds)

// make sure the bumped fee is effective
if !rl.shouldResendCheckpoint(rl.lastSubmittedCheckpoint, bumpedFee) {
return nil
}
bumpedFee := rl.calculateBumpedFee(rl.lastSubmittedCheckpoint)

rl.logger.Debugf("Resending the second tx of the checkpoint %v, old fee of the second tx: %v Satoshis, txid: %s",
ckptEpoch, rl.lastSubmittedCheckpoint.Tx2.Fee, rl.lastSubmittedCheckpoint.Tx2.TxId.String())
// make sure the bumped fee is effective
if !rl.shouldResendCheckpoint(rl.lastSubmittedCheckpoint, bumpedFee) {
return nil
}

resubmittedTx2, err := rl.resendSecondTxOfCheckpointToBTC(rl.lastSubmittedCheckpoint.Tx2, bumpedFee)
if err != nil {
rl.metrics.FailedResentCheckpointsCounter.Inc()
return fmt.Errorf("failed to re-send the second tx of the checkpoint %v: %w", rl.lastSubmittedCheckpoint.Epoch, err)
}
rl.logger.Debugf("Resending the second tx of the checkpoint %v, old fee of the second tx: %v Satoshis, txid: %s",
ckptEpoch, rl.lastSubmittedCheckpoint.Tx2.Fee, rl.lastSubmittedCheckpoint.Tx2.TxId.String())

// record the metrics of the resent tx2
rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues(
strconv.Itoa(int(ckptEpoch)),
"1",
resubmittedTx2.TxId.String(),
strconv.Itoa(int(resubmittedTx2.Fee)),
).SetToCurrentTime()
rl.metrics.ResentCheckpointsCounter.Inc()

rl.logger.Infof("Successfully re-sent the second tx of the checkpoint %v, txid: %s, bumped fee: %v Satoshis",
rl.lastSubmittedCheckpoint.Epoch, resubmittedTx2.TxId.String(), resubmittedTx2.Fee)

// update the second tx of the last submitted checkpoint as it is replaced
rl.lastSubmittedCheckpoint.Tx2 = resubmittedTx2

err = storeCkptFunc(
rl.lastSubmittedCheckpoint.Tx1.Tx,
rl.lastSubmittedCheckpoint.Tx2.Tx,
rl.lastSubmittedCheckpoint.Epoch,
)
if err != nil {
return err
}
resubmittedTx2, err := rl.resendSecondTxOfCheckpointToBTC(rl.lastSubmittedCheckpoint.Tx2, bumpedFee)
if err != nil {
rl.metrics.FailedResentCheckpointsCounter.Inc()
return fmt.Errorf("failed to re-send the second tx of the checkpoint %v: %w", rl.lastSubmittedCheckpoint.Epoch, err)
}

return nil
// record the metrics of the resent tx2
rl.metrics.NewSubmittedCheckpointSegmentGaugeVec.WithLabelValues(
strconv.Itoa(int(ckptEpoch)),
"1",
resubmittedTx2.TxId.String(),
strconv.Itoa(int(resubmittedTx2.Fee)),
).SetToCurrentTime()
rl.metrics.ResentCheckpointsCounter.Inc()

rl.logger.Infof("Successfully re-sent the second tx of the checkpoint %v, txid: %s, bumped fee: %v Satoshis",
rl.lastSubmittedCheckpoint.Epoch, resubmittedTx2.TxId.String(), resubmittedTx2.Fee)

// update the second tx of the last submitted checkpoint as it is replaced
rl.lastSubmittedCheckpoint.Tx2 = resubmittedTx2

storedCkpt := store.NewStoredCheckpoint(
rl.lastSubmittedCheckpoint.Tx1.Tx,
rl.lastSubmittedCheckpoint.Tx2.Tx,
rl.lastSubmittedCheckpoint.Epoch,
)
return rl.store.PutCheckpoint(storedCkpt)
}

func (rl *Relayer) shouldSendCompleteCkpt(ckptEpoch uint64) bool {
Expand Down Expand Up @@ -240,6 +253,16 @@ func (rl *Relayer) calculateBumpedFee(ckptInfo *types.CheckpointInfo) btcutil.Am

// resendSecondTxOfCheckpointToBTC resends the second tx of the checkpoint with bumpedFee
func (rl *Relayer) resendSecondTxOfCheckpointToBTC(tx2 *types.BtcTxInfo, bumpedFee btcutil.Amount) (*types.BtcTxInfo, error) {
found, err := rl.inMempool()
if err != nil {
return nil, err
}

// No need to resend, transaction already confirmed
if !found {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way that the Tx could be dropped from the mempool and it is not confirmed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if tx would have too low fee it is possible that it could be evicted from the mempool without inclusion in th ledger. Maybe it is worth adding the check if tx is is already in the ledger then ? In btc staker there is method - https://github.com/babylonchain/btc-staker/blob/main/walletcontroller/client.go#L243, which retrieves tx from node and marks wheter it is from the mempool or from the ledger. Maybe we can re-use it here ?

return nil, nil
}

// set output value of the second tx to be the balance minus the bumped fee
// if the bumped fee is higher than the balance, then set the bumped fee to
// be equal to the balance to ensure the output value is not negative
Expand Down Expand Up @@ -272,6 +295,20 @@ func (rl *Relayer) resendSecondTxOfCheckpointToBTC(tx2 *types.BtcTxInfo, bumpedF
return tx2, nil
}

func (rl *Relayer) inMempool() (bool, error) {
_, err := rl.GetRawTransaction(rl.lastSubmittedCheckpoint.Tx2.TxId)
if err != nil {
if strings.Contains(err.Error(), TxNotFoundErr.Error()) {
return false, nil
}
// Return the error if it's not a "not found" error
return false, err
}

// the transaction is in the mempool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here indicates that either the tx is either in the mempool or on the chain. I think we only need to resend tx2 if it is in the mempool or it can't be found at all. Can we differentiate it?

return true, nil
}

// calcMinRelayFee returns the minimum transaction fee required for a
// transaction with the passed serialized size to be accepted into the memory
// pool and relayed.
Expand Down
11 changes: 9 additions & 2 deletions submitter/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,22 @@ func (s *Submitter) processCheckpoints() {
select {
case ckpt := <-s.poller.GetSealedCheckpointChan():
s.logger.Infof("A sealed raw checkpoint for epoch %v is found", ckpt.Ckpt.EpochNum)
err := s.relayer.SendCheckpointToBTC(ckpt)
if err != nil {
if err := s.relayer.SendCheckpointToBTC(ckpt); err != nil {
s.logger.Errorf("Failed to submit the raw checkpoint for %v: %v", ckpt.Ckpt.EpochNum, err)
s.metrics.FailedCheckpointsCounter.Inc()
}
if err := s.relayer.MaybeResubmitSecondCheckpointTx(ckpt); err != nil {
s.logger.Errorf("Failed to resubmit the raw checkpoint for %v: %v", ckpt.Ckpt.EpochNum, err)
s.metrics.FailedCheckpointsCounter.Inc()
}
s.metrics.SecondsSinceLastCheckpointGauge.Set(0)
case <-quit:
// We have been asked to stop
return
}
}
}

func (s *Submitter) Metrics() *metrics.SubmitterMetrics {
return s.metrics
}
Loading