Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make load trigger fire and forget #16508

Merged
merged 12 commits into from
Feb 24, 2025
74 changes: 33 additions & 41 deletions integration-tests/load/ccip/ccip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/smartcontractkit/chainlink/deployment"

"github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-testing-framework/wasp"
Expand All @@ -36,13 +35,13 @@ var (
const simChainTestKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"

// step 1: setup
// Parse the test config, initialize CRIB with configurations defined
// Parse the test config
// step 2: subscribe
// Create event subscribers on the offramp
// Create event subscribers in src and dest
// step 3: load
// Use wasp to initiate load
// step 4: teardown
// Stop the chains, cleanup the environment
// wait for ccip to finish, push remaining data
func TestCCIPLoad_RPS(t *testing.T) {
// comment out when executing the test
// t.Skip("Skipping test as this test should not be auto triggered")
Expand Down Expand Up @@ -74,25 +73,43 @@ func TestCCIPLoad_RPS(t *testing.T) {
state, err := ccipchangeset.LoadOnchainState(*env)
require.NoError(t, err)

errChan := make(chan error)
defer close(errChan)
finalSeqNrCommitChannels := make(map[uint64]chan finalSeqNrReport)
finalSeqNrExecChannels := make(map[uint64]chan finalSeqNrReport)
loadFinished := make(chan struct{})

mm := NewMetricsManager(t, env.Logger)
mm := NewMetricsManager(t, env.Logger, userOverrides)
go mm.Start(ctx)
defer mm.Stop()

// gunMap holds a destinationGun for every enabled destination chain
gunMap := make(map[uint64]*DestinationGun)
p := wasp.NewProfile()
// Only create a destination gun if we have decided to send traffic to this chain
for ind := range *userOverrides.NumDestinationChains {
cs := env.AllChainSelectors()[ind]

// potential source chains need a subscription
for _, cs := range env.AllChainSelectors() {
latesthdr, err := env.Chains[cs].Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)
block := latesthdr.Number.Uint64()
startBlocks[cs] = &block
other := env.AllChainSelectorsExcluding([]uint64{cs})
wg.Add(1)
go subscribeTransmitEvents(
ctx,
lggr,
state.Chains[cs].OnRamp,
other,
startBlocks[cs],
cs,
loadFinished,
env.Chains[cs].Client,
&wg,
mm.InputChan,
finalSeqNrCommitChannels,
finalSeqNrExecChannels)
}

// confirmed dest chains need a subscription
for ind := range *userOverrides.NumDestinationChains {
cs := env.AllChainSelectors()[ind]

messageKeys := make(map[uint64]*bind.TransactOpts)
other := env.AllChainSelectorsExcluding([]uint64{cs})
Expand Down Expand Up @@ -133,7 +150,6 @@ func TestCCIPLoad_RPS(t *testing.T) {
t.Fatal(err)
}

otherChains := env.AllChainSelectorsExcluding([]uint64{cs})
finalSeqNrCommitChannels[cs] = make(chan finalSeqNrReport)
finalSeqNrExecChannels[cs] = make(chan finalSeqNrReport)

Expand All @@ -142,24 +158,22 @@ func TestCCIPLoad_RPS(t *testing.T) {
ctx,
lggr,
state.Chains[cs].OffRamp,
otherChains,
&block,
other,
startBlocks[cs],
cs,
env.Chains[cs].Client,
finalSeqNrCommitChannels[cs],
errChan,
&wg,
mm.InputChan)
go subscribeExecutionEvents(
ctx,
lggr,
state.Chains[cs].OffRamp,
otherChains,
&block,
other,
startBlocks[cs],
cs,
env.Chains[cs].Client,
finalSeqNrExecChannels[cs],
errChan,
&wg,
mm.InputChan)
}
Expand Down Expand Up @@ -190,36 +204,14 @@ func TestCCIPLoad_RPS(t *testing.T) {

_, err = p.Run(true)
require.NoError(t, err)

for _, gun := range gunMap {
for csPair, seqNums := range gun.seqNums {
lggr.Debugw("pushing finalized sequence numbers for ",
"chainSelector", gun.chainSelector,
"sourceChainSelector", csPair.SourceChainSelector,
"seqNums", seqNums)
finalSeqNrCommitChannels[csPair.DestChainSelector] <- finalSeqNrReport{
sourceChainSelector: csPair.SourceChainSelector,
expectedSeqNrRange: ccipocr3.SeqNumRange{
ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()),
},
}

finalSeqNrExecChannels[csPair.DestChainSelector] <- finalSeqNrReport{
sourceChainSelector: csPair.SourceChainSelector,
expectedSeqNrRange: ccipocr3.SeqNumRange{
ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()),
},
}
}
}
close(loadFinished)

// after load is finished, wait for a "timeout duration" before considering that messages are timed out
timeout := userOverrides.GetTimeoutDuration()
if timeout != 0 {
testTimer := time.NewTimer(timeout)
go func() {
<-testTimer.C
mm.Stop()
cancel()
}()
}
Expand Down
85 changes: 16 additions & 69 deletions integration-tests/load/ccip/destination_gun.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"errors"
"fmt"
"math"
"math/big"
"math/rand"

"github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -35,7 +33,6 @@ type DestinationGun struct {
l logger.Logger
env deployment.Environment
state *ccipchangeset.CCIPOnChainState
seqNums map[testhelpers.SourceDestPair]SeqNumRange
roundNum *atomic.Int32
chainSelector uint64
receiver common.Address
Expand All @@ -56,21 +53,10 @@ func NewDestinationGun(
chainOffset int,
metricPipe chan messageData,
) (*DestinationGun, error) {
seqNums := make(map[testhelpers.SourceDestPair]SeqNumRange)
for _, cs := range env.AllChainSelectorsExcluding([]uint64{chainSelector}) {
seqNums[testhelpers.SourceDestPair{
SourceChainSelector: cs,
DestChainSelector: chainSelector,
}] = SeqNumRange{
Start: atomic.NewUint64(math.MaxUint64),
End: atomic.NewUint64(0),
}
}
dg := DestinationGun{
l: l,
env: env,
state: state,
seqNums: seqNums,
roundNum: &atomic.Int32{},
chainSelector: chainSelector,
receiver: receiver,
Expand Down Expand Up @@ -121,11 +107,6 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response {

acc := m.messageKeys[src]

csPair := testhelpers.SourceDestPair{
SourceChainSelector: src,
DestChainSelector: m.chainSelector,
}

r := state.Chains[src].Router

msg, err := m.GetMessage(src)
Expand Down Expand Up @@ -162,61 +143,27 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response {
"sourceChain", src,
"destchain", m.chainSelector,
"err", deployment.MaybeDataErr(err))
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}

blockNum, err := m.env.Chains[src].Confirm(tx)
if err != nil {
m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err))
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}
// in the event of an error, still push a metric
// sequence numbers start at 1 so using 0 as a sentinel value
data := messageData{
eventType: transmitted,
srcDstSeqNum: srcDstSeqNum{
src: src,
dst: m.chainSelector,
seqNum: 0,
},
timestamp: uint64(time.Now().Unix()),
}
m.metricPipe <- data

// todo: wasp should not manage confirming the message
// instead, we should manage the sequence number atomically (at a higher level)
it, err := state.Chains[src].OnRamp.FilterCCIPMessageSent(&bind.FilterOpts{
Start: blockNum,
End: &blockNum,
Context: context.Background(),
}, []uint64{m.chainSelector}, []uint64{})
if err != nil {
m.l.Errorw("could not find sent message event on src chain", "src", src, "dst", m.chainSelector, "err", err)
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}
if !it.Next() {
m.l.Errorw("Could not find event")
return &wasp.Response{Error: "Could not iterate", Group: waspGroup, Failed: true}
}

m.l.Infow("Transmitted message with",
"sourceChain", src,
"destChain", m.chainSelector,
"sequence number", it.Event.SequenceNumber)

// push metric to metric manager for eventual distribution to loki
blockNum = it.Event.Raw.BlockNumber
header, err := m.env.Chains[src].Client.HeaderByNumber(m.env.GetContext(), new(big.Int).SetUint64(blockNum))
_, err = m.env.Chains[src].Confirm(tx)
if err != nil {
return &wasp.Response{Error: "Could not get timestamp of block number", Group: waspGroup, Failed: true}
}
m.metricPipe <- messageData{
eventType: transmitted,
srcDstSeqNum: srcDstSeqNum{
src: src,
dst: m.chainSelector,
seqNum: it.Event.SequenceNumber,
},
timestamp: header.Time,
round: int(requestedRound),
}

// always store the lowest seen number as the start seq num
if it.Event.SequenceNumber < m.seqNums[csPair].Start.Load() {
m.seqNums[csPair].Start.Store(it.Event.SequenceNumber)
}

// always store the greatest sequence number we have seen as the maximum
if it.Event.SequenceNumber > m.seqNums[csPair].End.Load() {
m.seqNums[csPair].End.Store(it.Event.SequenceNumber)
m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err))
return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true}
}

return &wasp.Response{Failed: false, Group: waspGroup}
Expand Down
Loading
Loading