Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(*): staker funding and reduce contention #22

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions harness/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func startHarness(cmdCtx context.Context, cfg config.Config) error {
vig := NewSubReporter(tm, vigilanteSender)
vig.Start(ctx)

fpMgr := NewFinalityProviderManager(tm, fpmSender, zap.NewNop(), numFinalityProviders, fpMgrHome, eotsDir) // todo(lazar); fp count cfg
fpMgr := NewFinalityProviderManager(tm, fpmSender, zap.NewNop(), numFinalityProviders, fpMgrHome, eotsDir)
if err = fpMgr.Initialize(ctx, cfg.NumPubRand); err != nil {
return err
}
Expand All @@ -87,21 +87,20 @@ func startHarness(cmdCtx context.Context, cfg config.Config) error {
if err != nil {
return err
}
stakers = append(stakers, NewBTCStaker(tm, stakerSender, fpMgr.randomFp().btcPk.MustToBTCPK()))
stakers = append(stakers, NewBTCStaker(tm, stakerSender, fpMgr.randomFp().btcPk.MustToBTCPK(), tm.fundingRequests))
}

// periodically check if we need to fund the staker
go tm.fundForever(ctx)

// fund all stakers
if err := tm.fundAllParties(ctx, senders(stakers)); err != nil {
return err
}

// start stakers and defer stops
// TODO(lazar): Ideally stakers would start on different times to reduce contention
// on funding BTC wallet
for _, staker := range stakers {
if err := staker.Start(ctx); err != nil {
return err
}
// start stakers
if err := startStakersInBatches(ctx, stakers); err != nil {
return err
}

go printStatsForever(ctx, tm, stopChan, cfg)
Expand Down
17 changes: 16 additions & 1 deletion harness/babylonclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/hex"
"fmt"
"github.com/avast/retry-go/v4"
"github.com/babylonlabs-io/babylon/app/params"
"math/rand"
"sync"
"time"

bbn "github.com/babylonlabs-io/babylon/app"
Expand All @@ -32,6 +34,18 @@ var (
RtyErr = retry.LastErrorOnly(true)
)

var (
once sync.Once
encCfg *params.EncodingConfig
)

func getEncodingConfig() *params.EncodingConfig {
once.Do(func() {
encCfg = bbn.GetEncodingConfig()
})
return encCfg
}

type Client struct {
*query.QueryClient

Expand All @@ -47,6 +61,7 @@ func New(
zapLogger *zap.Logger
err error
)
getEncodingConfig()

// ensure cfg is valid
if err := cfg.Validate(); err != nil {
Expand Down Expand Up @@ -74,7 +89,7 @@ func New(

// Create tmp Babylon0 app to retrieve and register codecs
// Need to override this manually as otherwise option from config is ignored
encCfg := bbn.GetEncodingConfig()

cp.Cdc = cosmos.Codec{
InterfaceRegistry: encCfg.InterfaceRegistry,
Marshaler: encCfg.Codec,
Expand Down
28 changes: 20 additions & 8 deletions harness/btcstaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,29 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/cometbft/cometbft/crypto/tmhash"
sdk "github.com/cosmos/cosmos-sdk/types"
"strings"
"sync/atomic"
"time"
)

type BTCStaker struct {
tm *TestManager
client *SenderWithBabylonClient
fpPK *btcec.PublicKey
tm *TestManager
client *SenderWithBabylonClient
fpPK *btcec.PublicKey
fundingRequest chan sdk.AccAddress
}

func NewBTCStaker(
tm *TestManager,
client *SenderWithBabylonClient,
finalityProviderPublicKey *btcec.PublicKey,
fundingRequest chan sdk.AccAddress,
) *BTCStaker {
return &BTCStaker{
tm: tm,
client: client,
fpPK: finalityProviderPublicKey,
tm: tm,
client: client,
fpPK: finalityProviderPublicKey,
fundingRequest: fundingRequest,
}
}

Expand Down Expand Up @@ -83,7 +87,15 @@ func (s *BTCStaker) runForever(ctx context.Context, stakerAddress btcutil.Addres
}
err = s.buildAndSendStakingTransaction(ctx, stakerAddress, stakerPk, &paramsResp.Params)
if err != nil {
fmt.Printf("🚫 Err in BTC Staker: %v\n", err)
fmt.Printf("🚫 Err in BTC Staker (%s), err: %v\n", s.client.BabylonAddress.String(), err)
if strings.Contains(strings.ToLower(err.Error()), "insufficient funds") {
select {
case s.fundingRequest <- s.client.BabylonAddress:
time.Sleep(5 * time.Second)
default:
fmt.Println("fundingRequest channel is full or closed")
}
}
}
}
}
Expand Down Expand Up @@ -293,7 +305,7 @@ func (s *BTCStaker) waitForTransactionConfirmation(
txHash *chainhash.Hash,
requiredDepth uint32,
) *bstypes.InclusionProof {
t := time.NewTicker(10 * time.Second)
t := time.NewTicker(5 * time.Second)
defer t.Stop()

for {
Expand Down
104 changes: 103 additions & 1 deletion harness/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cosmos/cosmos-sdk/types"
sdk "github.com/cosmos/cosmos-sdk/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"golang.org/x/sync/errgroup"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -76,6 +77,7 @@ type TestManager struct {
manger *container.Manager
babylonDir string
benchConfig benchcfg.Config
fundingRequests chan sdk.AccAddress
}

// StartManager creates a test manager
Expand Down Expand Up @@ -214,6 +216,7 @@ func StartManager(ctx context.Context, outputsInWallet uint32, epochInterval uin
manger: manager,
babylonDir: babylonDir,
benchConfig: runCfg,
fundingRequests: make(chan sdk.AccAddress, 100),
}, nil
}

Expand Down Expand Up @@ -344,7 +347,7 @@ func (tm *TestManager) fundAllParties(
var msgs []sdk.Msg

for _, sender := range senders {
msg := banktypes.NewMsgSend(fundingAddress, sender.BabylonAddress, types.NewCoins(types.NewInt64Coin("ubbn", 100000000)))
msg := banktypes.NewMsgSend(fundingAddress, sender.BabylonAddress, types.NewCoins(types.NewInt64Coin("ubbn", 100_000_000)))
msgs = append(msgs, msg)
}

Expand All @@ -364,6 +367,39 @@ func (tm *TestManager) fundAllParties(
return nil
}

func (tm *TestManager) fundBnnAddress(
ctx context.Context,
addr sdk.AccAddress,
) error {
if err := ctx.Err(); err != nil {
return fmt.Errorf("context error before funding: %w", err)
}

fundingAccount := tm.BabylonClientNode0.MustGetAddr()
fundingAddress, err := sdk.AccAddressFromBech32(fundingAccount)
if err != nil {
return fmt.Errorf("failed to parse funding address: %w", err)
}

amount := types.NewCoins(types.NewInt64Coin("ubbn", 100_000_000))
msg := banktypes.NewMsgSend(fundingAddress, addr, amount)

resp, err := tm.BabylonClientNode0.ReliablySendMsg(ctx, msg, nil, nil)
if err != nil {
return fmt.Errorf("failed to send fund transaction: %w", err)
}

if resp == nil {
return fmt.Errorf("transaction response is nil")
}

if resp.Code != 0 {
return fmt.Errorf("funding transaction failed with code %d", resp.Code)
}

return nil
}

func (tm *TestManager) listBlocksForever(ctx context.Context) {
lt := time.NewTicker(5 * time.Second)
defer lt.Stop()
Expand All @@ -388,3 +424,69 @@ func (tm *TestManager) listBlocksForever(ctx context.Context) {
}
}
}

func (tm *TestManager) fundForever(ctx context.Context) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case addr := <-tm.fundingRequests:
go func() {
if err := tm.fundBnnAddress(ctx, addr); err != nil {
fmt.Printf("🚫 Failed to fund addr %s, err %v\n", addr.String(), err)
}
}()
}
}
}

func startStakersInBatches(ctx context.Context, stakers []*BTCStaker) error {
const (
batchSize = 25
batchInterval = 2 * time.Second
)

fmt.Printf("⌛ Starting %d stakers in batches of %d, with %s interval\n",
len(stakers), batchSize, batchInterval)

start := time.Now()
var g errgroup.Group
for i := 0; i < len(stakers); i += batchSize {
end := i + batchSize
if end > len(stakers) {
end = len(stakers)
}
batch := stakers[i:end]

g.Go(func() error {
return startBatch(ctx, batch)
})

// Wait before starting the next batch, unless it's the last batch
if end < len(stakers) {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(batchInterval):
}
}
}

elapsed := time.Since(start)
fmt.Printf("✅ All %d stakers started in %s\n", len(stakers), elapsed)

return g.Wait()
}

func startBatch(ctx context.Context, batch []*BTCStaker) error {
var g errgroup.Group
for _, staker := range batch {
g.Go(func() error {
return staker.Start(ctx)
})
}
return g.Wait()
}