From 1a4ed4adfe79a9cc99efb97334efe91fca6aad35 Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Thu, 13 Feb 2025 23:21:54 +0800 Subject: [PATCH 01/10] parallelize operations in the deployer group --- deployment/ccip/changeset/deployer_group.go | 1 + 1 file changed, 1 insertion(+) diff --git a/deployment/ccip/changeset/deployer_group.go b/deployment/ccip/changeset/deployer_group.go index 6f3732a760b..53427ad65a2 100644 --- a/deployment/ccip/changeset/deployer_group.go +++ b/deployment/ccip/changeset/deployer_group.go @@ -3,6 +3,7 @@ package changeset import ( "context" "fmt" + "golang.org/x/sync/errgroup" "math/big" "slices" "time" From 51b5a83fbce2dc2f88a9186f3903acb2960395dd Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Wed, 19 Feb 2025 14:52:08 +0800 Subject: [PATCH 02/10] enable replacing ocr only --- deployment/ccip/changeset/deployer_group.go | 1 - deployment/environment/crib/ccip_deployer.go | 40 +++++++++++++++----- integration-tests/testconfig/ccip/ccip.toml | 7 ++-- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/deployment/ccip/changeset/deployer_group.go b/deployment/ccip/changeset/deployer_group.go index 53427ad65a2..6f3732a760b 100644 --- a/deployment/ccip/changeset/deployer_group.go +++ b/deployment/ccip/changeset/deployer_group.go @@ -3,7 +3,6 @@ package changeset import ( "context" "fmt" - "golang.org/x/sync/errgroup" "math/big" "slices" "time" diff --git a/deployment/environment/crib/ccip_deployer.go b/deployment/environment/crib/ccip_deployer.go index ee3e5fe6dd1..e2c432bddd1 100644 --- a/deployment/environment/crib/ccip_deployer.go +++ b/deployment/environment/crib/ccip_deployer.go @@ -124,7 +124,7 @@ func DeployCCIPAndAddLanes(ctx context.Context, lggr logger.Logger, envConfig de // ----- Part 2 ----- lggr.Infow("setting up ocr...") - *e, err = setupOCR(e, homeChainSel, feedChainSel) + *e, err = mustOCR(e, homeChainSel, feedChainSel, true) if err != nil { return DeployCCIPOutput{}, fmt.Errorf("failed to apply changesets for setting up OCR: %w", err) } @@ -204,8 +204,7 @@ func ConnectCCIPLanes(ctx context.Context, lggr logger.Logger, envConfig devenv. }, nil } -// ConfigureCCIPOCR is a group of changesets used from CRIB to configure OCR on a new setup -// This sets up OCR on all chains in the envConfig by configuring the CCIP home chain +// ConfigureCCIPOCR is a group of changesets used from CRIB to redeploy the chainlink don on an existing setup func ConfigureCCIPOCR(ctx context.Context, lggr logger.Logger, envConfig devenv.EnvironmentConfig, homeChainSel, feedChainSel uint64, ab deployment.AddressBook) (DeployCCIPOutput, error) { e, _, err := devenv.NewEnvironment(func() context.Context { return ctx }, lggr, envConfig) if err != nil { @@ -213,8 +212,8 @@ func ConfigureCCIPOCR(ctx context.Context, lggr logger.Logger, envConfig devenv. } e.ExistingAddresses = ab - lggr.Infow("setting up ocr...") - *e, err = setupOCR(e, homeChainSel, feedChainSel) + lggr.Infow("resetting ocr...") + *e, err = mustOCR(e, homeChainSel, feedChainSel, false) if err != nil { return DeployCCIPOutput{}, fmt.Errorf("failed to apply changesets for setting up OCR: %w", err) } @@ -503,7 +502,7 @@ func setupLanes(e *deployment.Environment, state changeset.CCIPOnChainState) (de ) } -func setupOCR(e *deployment.Environment, homeChainSel uint64, feedChainSel uint64) (deployment.Environment, error) { +func mustOCR(e *deployment.Environment, homeChainSel uint64, feedChainSel uint64, newDons bool) (deployment.Environment, error) { chainSelectors := e.AllChainSelectors() var ocrConfigPerSelector = make(map[uint64]changeset.CCIPOCRParams) for selector := range e.Chains { @@ -511,8 +510,10 @@ func setupOCR(e *deployment.Environment, homeChainSel uint64, feedChainSel uint6 changeset.WithDefaultExecuteOffChainConfig(nil), ) } - return commonchangeset.Apply(nil, *e, nil, - commonchangeset.Configure( + + var commitChangeset commonchangeset.ConfiguredChangeSet + if newDons { + commitChangeset = commonchangeset.Configure( // Add the DONs and candidate commit OCR instances for the chain deployment.CreateLegacyChangeSet(changeset.AddDonAndSetCandidateChangeset), changeset.AddDonAndSetCandidateChangesetConfig{ @@ -525,7 +526,28 @@ func setupOCR(e *deployment.Environment, homeChainSel uint64, feedChainSel uint6 PluginType: types.PluginTypeCCIPCommit, }, }, - ), + ) + } else { + commitChangeset = commonchangeset.Configure( + // Add the exec OCR instances for the new chains + deployment.CreateLegacyChangeSet(changeset.SetCandidateChangeset), + changeset.SetCandidateChangesetConfig{ + SetCandidateConfigBase: changeset.SetCandidateConfigBase{ + HomeChainSelector: homeChainSel, + FeedChainSelector: feedChainSel, + }, + PluginInfo: []changeset.SetCandidatePluginInfo{ + { + OCRConfigPerRemoteChainSelector: ocrConfigPerSelector, + PluginType: types.PluginTypeCCIPCommit, + }, + }, + }, + ) + } + + return commonchangeset.Apply(nil, *e, nil, + commitChangeset, commonchangeset.Configure( // Add the exec OCR instances for the new chains deployment.CreateLegacyChangeSet(changeset.SetCandidateChangeset), diff --git a/integration-tests/testconfig/ccip/ccip.toml b/integration-tests/testconfig/ccip/ccip.toml index e8ca343bbab..9d98f796900 100644 --- a/integration-tests/testconfig/ccip/ccip.toml +++ b/integration-tests/testconfig/ccip/ccip.toml @@ -243,13 +243,12 @@ ephemeral_addresses_number = 0 [Load.CCIP.Load] # MessageTypeWeights corresponds with [data only, token only, message with token] -#MessageTypeWeights = [100,0,0] MessageTypeWeights = [100,0,0] # each destination chain will receive 1 incoming request per RequestFrequency for the duration of LoadDuration RequestFrequency = "5s" -LoadDuration = "1h" +LoadDuration = "10m" # destination chain selectors to send messages to -NumDestinationChains = 30 +NumDestinationChains = 8 # Directory where we receive environment configuration from crib CribEnvDirectory = "../../../../crib/deployments/ccip-v2/.tmp" -TimeoutDuration = "1h" \ No newline at end of file +TimeoutDuration = "20m" \ No newline at end of file From 154e8c2cd952786cf4ce3011743c9c45aafecb6f Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Wed, 19 Feb 2025 17:34:01 +0800 Subject: [PATCH 03/10] update metrics manager --- .../load/ccip/destination_gun.go | 57 +++++++------ integration-tests/load/ccip/helpers.go | 84 +++++++++++++++++-- integration-tests/load/ccip/metrics.go | 9 ++ integration-tests/testconfig/ccip/ccip.toml | 2 +- integration-tests/testconfig/ccip/load.go | 1 + 5 files changed, 121 insertions(+), 32 deletions(-) diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index bbf86ef8fe5..0fa0436d4c1 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -241,6 +241,15 @@ func (m *DestinationGun) GetMessage(src uint64) (router.ClientEVM2AnyMessage, er m.l.Error("Error encoding receiver address") return router.ClientEVM2AnyMessage{}, err } + if (*m.testConfig.MessageTypeWeights)[0] == 100 { + return router.ClientEVM2AnyMessage{ + Receiver: rcv, + Data: common.Hex2Bytes("0xabcdefabcdef"), + TokenAmounts: nil, + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, nil + } messages := []router.ClientEVM2AnyMessage{ { @@ -250,30 +259,30 @@ func (m *DestinationGun) GetMessage(src uint64) (router.ClientEVM2AnyMessage, er FeeToken: common.HexToAddress("0x0"), ExtraArgs: nil, }, - { - Receiver: rcv, - TokenAmounts: []router.ClientEVMTokenAmount{ - { - Token: m.state.Chains[src].LinkToken.Address(), - Amount: big.NewInt(1), - }, - }, - Data: common.Hex2Bytes("0xabcdefabcdef"), - FeeToken: common.HexToAddress("0x0"), - ExtraArgs: nil, - }, - { - Receiver: rcv, - Data: common.Hex2Bytes("message with token"), - TokenAmounts: []router.ClientEVMTokenAmount{ - { - Token: m.state.Chains[src].LinkToken.Address(), - Amount: big.NewInt(1), - }, - }, - FeeToken: common.HexToAddress("0x0"), - ExtraArgs: nil, - }, + //{ + // Receiver: rcv, + // TokenAmounts: []router.ClientEVMTokenAmount{ + // { + // Token: m.state.Chains[src].LinkToken.Address(), + // Amount: big.NewInt(1), + // }, + // }, + // Data: common.Hex2Bytes("0xabcdefabcdef"), + // FeeToken: common.HexToAddress("0x0"), + // ExtraArgs: nil, + //}, + //{ + // Receiver: rcv, + // Data: common.Hex2Bytes("message with token"), + // TokenAmounts: []router.ClientEVMTokenAmount{ + // { + // Token: m.state.Chains[src].LinkToken.Address(), + // Amount: big.NewInt(1), + // }, + // }, + // FeeToken: common.HexToAddress("0x0"), + // ExtraArgs: nil, + //}, } // Select a random message randomValue := rand.Intn(100) diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 0a217f9e87b..aadaf50f80a 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" "math" "slices" "sync" @@ -43,18 +44,87 @@ var ( fundingAmount = new(big.Int).Mul(deployment.UBigInt(100), deployment.UBigInt(1e18)) // 100 eth ) -// todo: Have a different struct for commit/exec? -type LokiMetric struct { - SequenceNumber uint64 `json:"sequence_number"` - CommitDuration uint64 `json:"commit_duration"` - ExecDuration uint64 `json:"exec_duration"` -} - type finalSeqNrReport struct { sourceChainSelector uint64 expectedSeqNrRange ccipocr3.SeqNumRange } +func subscribeTransmitEvents( + ctx context.Context, + lggr logger.Logger, + onRamp onramp.OnRampInterface, + startBlock *uint64, + srcChainSel uint64, + loadFinished chan struct{}, + client deployment.OnchainClient, + errChan chan error, + wg *sync.WaitGroup, + metricPipe chan messageData, +) { + defer wg.Done() + lggr.Infow("starting transmit event subscriber for ", + "srcChain", srcChainSel, + "startblock", startBlock, + ) + + sink := make(chan *onramp.OnRampCCIPMessageSent) + subscription := event.Resubscribe(SubscriptionTimeout, func(_ context.Context) (event.Subscription, error) { + return onRamp.WatchCCIPMessageSent(&bind.WatchOpts{ + Context: ctx, + Start: startBlock, + }, sink, nil, nil) + }) + defer subscription.Unsubscribe() + + endChannel := make(chan struct{}) + + for { + select { + case subErr := <-subscription.Err(): + errChan <- subErr + return + case event := <-sink: + lggr.Debugw("received transmit event for", + "srcChain", srcChainSel, + "destChain", event.DestChainSelector, + "sequenceNumber", event.SequenceNumber) + + blockNum := event.Raw.BlockNumber + header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err != nil { + errChan <- err + } + data := messageData{ + eventType: transmitted, + srcDstSeqNum: srcDstSeqNum{ + src: srcChainSel, + dst: event.DestChainSelector, + seqNum: event.SequenceNumber, + }, + timestamp: header.Time, + } + metricPipe <- data + case <-ctx.Done(): + lggr.Errorw("received context cancel signal for transmit watcher", + "srcChain", srcChainSel) + errChan <- errors.New("timed out waiting for transmits") + return + case <-loadFinished: + lggr.Infow("load finished, waiting before stopping transmit watcher", + "srcChain", srcChainSel) + go func() { + time.Sleep(tickerDuration) + close(endChannel) + }() + case <-endChannel: + lggr.Infow("stopping transmit watcher", + "srcChain", srcChainSel) + return + } + + } +} + func subscribeCommitEvents( ctx context.Context, lggr logger.Logger, diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index 8c64747a10e..bdf543c2960 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -19,6 +19,13 @@ const ( ErrLokiPush = "failed to push metrics to Loki" ) +type LokiMetric struct { + TransmitTime uint64 `json:"transmit_time"` + SequenceNumber uint64 `json:"sequence_number"` + CommitDuration uint64 `json:"commit_duration"` + ExecDuration uint64 `json:"exec_duration"` +} + // MetricsManager is used for maintaining state of different sequence numbers // Once we've received all expected timestamps, it pushes the metrics to Loki type MetricManager struct { @@ -85,6 +92,7 @@ func (mm *MetricManager) Start(ctx context.Context) { // don't return here, we still want to push metrics to loki } SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: timestamps[transmitted], ExecDuration: execDuration, CommitDuration: commitDuration, SequenceNumber: srcDstSeqNum.seqNum, @@ -116,6 +124,7 @@ func (mm *MetricManager) Start(ctx context.Context) { mm.lggr.Error("error setting loki labels", "error", err) } SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: state.timestamps[transmitted], ExecDuration: state.timestamps[executed] - state.timestamps[committed], CommitDuration: state.timestamps[committed] - state.timestamps[transmitted], SequenceNumber: data.seqNum, diff --git a/integration-tests/testconfig/ccip/ccip.toml b/integration-tests/testconfig/ccip/ccip.toml index 9d98f796900..d1c42700973 100644 --- a/integration-tests/testconfig/ccip/ccip.toml +++ b/integration-tests/testconfig/ccip/ccip.toml @@ -246,7 +246,7 @@ ephemeral_addresses_number = 0 MessageTypeWeights = [100,0,0] # each destination chain will receive 1 incoming request per RequestFrequency for the duration of LoadDuration RequestFrequency = "5s" -LoadDuration = "10m" +LoadDuration = "1h" # destination chain selectors to send messages to NumDestinationChains = 8 # Directory where we receive environment configuration from crib diff --git a/integration-tests/testconfig/ccip/load.go b/integration-tests/testconfig/ccip/load.go index ddea267e8f0..5378ecf1412 100644 --- a/integration-tests/testconfig/ccip/load.go +++ b/integration-tests/testconfig/ccip/load.go @@ -16,6 +16,7 @@ type LoadConfig struct { CribEnvDirectory *string NumDestinationChains *int TimeoutDuration *string + TestLabel *string } func (l *LoadConfig) Validate(t *testing.T, e *deployment.Environment) { From 7fa87453a8f0651f1aaafd1cc822b664c58ec02b Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Thu, 20 Feb 2025 21:29:54 +0800 Subject: [PATCH 04/10] metric changes wip --- integration-tests/load/ccip/ccip_test.go | 41 +++++------ .../load/ccip/destination_gun.go | 71 +------------------ integration-tests/load/ccip/helpers.go | 52 +++++++++++++- 3 files changed, 69 insertions(+), 95 deletions(-) diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go index f23409334f4..10491780348 100644 --- a/integration-tests/load/ccip/ccip_test.go +++ b/integration-tests/load/ccip/ccip_test.go @@ -14,7 +14,6 @@ import ( "github.com/smartcontractkit/chainlink/deployment" - "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-testing-framework/wasp" @@ -78,6 +77,7 @@ func TestCCIPLoad_RPS(t *testing.T) { defer close(errChan) finalSeqNrCommitChannels := make(map[uint64]chan finalSeqNrReport) finalSeqNrExecChannels := make(map[uint64]chan finalSeqNrReport) + loadFinished := make(chan struct{}) mm := NewMetricsManager(t, env.Logger) go mm.Start(ctx) @@ -137,7 +137,21 @@ func TestCCIPLoad_RPS(t *testing.T) { finalSeqNrCommitChannels[cs] = make(chan finalSeqNrReport) finalSeqNrExecChannels[cs] = make(chan finalSeqNrReport) - wg.Add(2) + wg.Add(3) + go subscribeTransmitEvents( + ctx, + lggr, + state.Chains[cs].OnRamp, + otherChains, + &block, + cs, + loadFinished, + env.Chains[cs].Client, + errChan, + &wg, + mm.InputChan, + finalSeqNrCommitChannels, + finalSeqNrExecChannels) go subscribeCommitEvents( ctx, lggr, @@ -190,28 +204,7 @@ func TestCCIPLoad_RPS(t *testing.T) { _, err = p.Run(true) require.NoError(t, err) - - for _, gun := range gunMap { - for csPair, seqNums := range gun.seqNums { - lggr.Debugw("pushing finalized sequence numbers for ", - "chainSelector", gun.chainSelector, - "sourceChainSelector", csPair.SourceChainSelector, - "seqNums", seqNums) - finalSeqNrCommitChannels[csPair.DestChainSelector] <- finalSeqNrReport{ - sourceChainSelector: csPair.SourceChainSelector, - expectedSeqNrRange: ccipocr3.SeqNumRange{ - ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), - }, - } - - finalSeqNrExecChannels[csPair.DestChainSelector] <- finalSeqNrReport{ - sourceChainSelector: csPair.SourceChainSelector, - expectedSeqNrRange: ccipocr3.SeqNumRange{ - ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), - }, - } - } - } + close(loadFinished) // after load is finished, wait for a "timeout duration" before considering that messages are timed out timeout := userOverrides.GetTimeoutDuration() diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index 0fa0436d4c1..cec7e8df5dc 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -4,12 +4,8 @@ import ( "context" "errors" "fmt" - "math" - "math/big" "math/rand" - "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "go.uber.org/atomic" @@ -35,7 +31,6 @@ type DestinationGun struct { l logger.Logger env deployment.Environment state *ccipchangeset.CCIPOnChainState - seqNums map[testhelpers.SourceDestPair]SeqNumRange roundNum *atomic.Int32 chainSelector uint64 receiver common.Address @@ -56,21 +51,10 @@ func NewDestinationGun( chainOffset int, metricPipe chan messageData, ) (*DestinationGun, error) { - seqNums := make(map[testhelpers.SourceDestPair]SeqNumRange) - for _, cs := range env.AllChainSelectorsExcluding([]uint64{chainSelector}) { - seqNums[testhelpers.SourceDestPair{ - SourceChainSelector: cs, - DestChainSelector: chainSelector, - }] = SeqNumRange{ - Start: atomic.NewUint64(math.MaxUint64), - End: atomic.NewUint64(0), - } - } dg := DestinationGun{ l: l, env: env, state: state, - seqNums: seqNums, roundNum: &atomic.Int32{}, chainSelector: chainSelector, receiver: receiver, @@ -121,11 +105,6 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { acc := m.messageKeys[src] - csPair := testhelpers.SourceDestPair{ - SourceChainSelector: src, - DestChainSelector: m.chainSelector, - } - r := state.Chains[src].Router msg, err := m.GetMessage(src) @@ -165,60 +144,12 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } - blockNum, err := m.env.Chains[src].Confirm(tx) + _, err = m.env.Chains[src].Confirm(tx) if err != nil { m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err)) return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } - // todo: wasp should not manage confirming the message - // instead, we should manage the sequence number atomically (at a higher level) - it, err := state.Chains[src].OnRamp.FilterCCIPMessageSent(&bind.FilterOpts{ - Start: blockNum, - End: &blockNum, - Context: context.Background(), - }, []uint64{m.chainSelector}, []uint64{}) - if err != nil { - m.l.Errorw("could not find sent message event on src chain", "src", src, "dst", m.chainSelector, "err", err) - return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} - } - if !it.Next() { - m.l.Errorw("Could not find event") - return &wasp.Response{Error: "Could not iterate", Group: waspGroup, Failed: true} - } - - m.l.Infow("Transmitted message with", - "sourceChain", src, - "destChain", m.chainSelector, - "sequence number", it.Event.SequenceNumber) - - // push metric to metric manager for eventual distribution to loki - blockNum = it.Event.Raw.BlockNumber - header, err := m.env.Chains[src].Client.HeaderByNumber(m.env.GetContext(), new(big.Int).SetUint64(blockNum)) - if err != nil { - return &wasp.Response{Error: "Could not get timestamp of block number", Group: waspGroup, Failed: true} - } - m.metricPipe <- messageData{ - eventType: transmitted, - srcDstSeqNum: srcDstSeqNum{ - src: src, - dst: m.chainSelector, - seqNum: it.Event.SequenceNumber, - }, - timestamp: header.Time, - round: int(requestedRound), - } - - // always store the lowest seen number as the start seq num - if it.Event.SequenceNumber < m.seqNums[csPair].Start.Load() { - m.seqNums[csPair].Start.Store(it.Event.SequenceNumber) - } - - // always store the greatest sequence number we have seen as the maximum - if it.Event.SequenceNumber > m.seqNums[csPair].End.Load() { - m.seqNums[csPair].End.Store(it.Event.SequenceNumber) - } - return &wasp.Response{Failed: false, Group: waspGroup} } diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index aadaf50f80a..7631e674b7f 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" + "go.uber.org/atomic" "math" "slices" "sync" @@ -53,6 +55,7 @@ func subscribeTransmitEvents( ctx context.Context, lggr logger.Logger, onRamp onramp.OnRampInterface, + otherChains []uint64, startBlock *uint64, srcChainSel uint64, loadFinished chan struct{}, @@ -60,6 +63,8 @@ func subscribeTransmitEvents( errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, + finalSeqNrCommitChannels map[uint64]chan finalSeqNrReport, + finalSeqNrExecChannels map[uint64]chan finalSeqNrReport, ) { defer wg.Done() lggr.Infow("starting transmit event subscriber for ", @@ -67,6 +72,17 @@ func subscribeTransmitEvents( "startblock", startBlock, ) + seqNums := make(map[testhelpers.SourceDestPair]SeqNumRange) + for _, cs := range otherChains { + seqNums[testhelpers.SourceDestPair{ + SourceChainSelector: srcChainSel, + DestChainSelector: cs, + }] = SeqNumRange{ + Start: atomic.NewUint64(math.MaxUint64), + End: atomic.NewUint64(0), + } + } + sink := make(chan *onramp.OnRampCCIPMessageSent) subscription := event.Resubscribe(SubscriptionTimeout, func(_ context.Context) (event.Subscription, error) { return onRamp.WatchCCIPMessageSent(&bind.WatchOpts{ @@ -104,6 +120,20 @@ func subscribeTransmitEvents( timestamp: header.Time, } metricPipe <- data + + csPair := testhelpers.SourceDestPair{ + SourceChainSelector: srcChainSel, + DestChainSelector: event.DestChainSelector, + } + // always store the lowest seen number as the start seq num + if event.SequenceNumber < seqNums[csPair].Start.Load() { + seqNums[csPair].Start.Store(event.SequenceNumber) + } + + // always store the greatest sequence number we have seen as the maximum + if event.SequenceNumber > seqNums[csPair].End.Load() { + seqNums[csPair].End.Store(event.SequenceNumber) + } case <-ctx.Done(): lggr.Errorw("received context cancel signal for transmit watcher", "srcChain", srcChainSel) @@ -117,8 +147,28 @@ func subscribeTransmitEvents( close(endChannel) }() case <-endChannel: - lggr.Infow("stopping transmit watcher", + lggr.Infow("sending finalized seqNums and stopping transmit watcher", "srcChain", srcChainSel) + + for csPair, seqNums := range seqNums { + lggr.Debugw("pushing finalized sequence numbers for ", + "srcChainSelector", srcChainSel, + "destChainSelector", csPair.DestChainSelector, + "seqNums", seqNums) + finalSeqNrCommitChannels[csPair.DestChainSelector] <- finalSeqNrReport{ + sourceChainSelector: csPair.SourceChainSelector, + expectedSeqNrRange: ccipocr3.SeqNumRange{ + ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), + }, + } + + finalSeqNrExecChannels[csPair.DestChainSelector] <- finalSeqNrReport{ + sourceChainSelector: csPair.SourceChainSelector, + expectedSeqNrRange: ccipocr3.SeqNumRange{ + ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), + }, + } + } return } From 040fbd017cc1c46200e146f65f8420fe25d67138 Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Fri, 21 Feb 2025 13:33:11 +0800 Subject: [PATCH 05/10] allow load test to fire and forget --- integration-tests/load/ccip/ccip_test.go | 60 ++++++++++--------- .../load/ccip/destination_gun.go | 19 +++++- integration-tests/load/ccip/helpers.go | 27 +++++---- integration-tests/load/ccip/metrics.go | 19 ++++-- 4 files changed, 80 insertions(+), 45 deletions(-) diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go index 10491780348..f15739684eb 100644 --- a/integration-tests/load/ccip/ccip_test.go +++ b/integration-tests/load/ccip/ccip_test.go @@ -35,13 +35,13 @@ var ( const simChainTestKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" // step 1: setup -// Parse the test config, initialize CRIB with configurations defined +// Parse the test config // step 2: subscribe -// Create event subscribers on the offramp +// Create event subscribers in src and dest // step 3: load // Use wasp to initiate load // step 4: teardown -// Stop the chains, cleanup the environment +// wait for ccip to finish, push remaining data func TestCCIPLoad_RPS(t *testing.T) { // comment out when executing the test // t.Skip("Skipping test as this test should not be auto triggered") @@ -81,18 +81,38 @@ func TestCCIPLoad_RPS(t *testing.T) { mm := NewMetricsManager(t, env.Logger) go mm.Start(ctx) - defer mm.Stop() // gunMap holds a destinationGun for every enabled destination chain gunMap := make(map[uint64]*DestinationGun) p := wasp.NewProfile() - // Only create a destination gun if we have decided to send traffic to this chain - for ind := range *userOverrides.NumDestinationChains { - cs := env.AllChainSelectors()[ind] + + // potential source chains need a subscription + for _, cs := range env.AllChainSelectors() { latesthdr, err := env.Chains[cs].Client.HeaderByNumber(ctx, nil) require.NoError(t, err) block := latesthdr.Number.Uint64() startBlocks[cs] = &block + other := env.AllChainSelectorsExcluding([]uint64{cs}) + wg.Add(1) + go subscribeTransmitEvents( + ctx, + lggr, + state.Chains[cs].OnRamp, + other, + startBlocks[cs], + cs, + loadFinished, + env.Chains[cs].Client, + errChan, + &wg, + mm.InputChan, + finalSeqNrCommitChannels, + finalSeqNrExecChannels) + } + + // confirmed dest chains need a subscription + for ind := range *userOverrides.NumDestinationChains { + cs := env.AllChainSelectors()[ind] messageKeys := make(map[uint64]*bind.TransactOpts) other := env.AllChainSelectorsExcluding([]uint64{cs}) @@ -133,31 +153,16 @@ func TestCCIPLoad_RPS(t *testing.T) { t.Fatal(err) } - otherChains := env.AllChainSelectorsExcluding([]uint64{cs}) finalSeqNrCommitChannels[cs] = make(chan finalSeqNrReport) finalSeqNrExecChannels[cs] = make(chan finalSeqNrReport) - wg.Add(3) - go subscribeTransmitEvents( - ctx, - lggr, - state.Chains[cs].OnRamp, - otherChains, - &block, - cs, - loadFinished, - env.Chains[cs].Client, - errChan, - &wg, - mm.InputChan, - finalSeqNrCommitChannels, - finalSeqNrExecChannels) + wg.Add(2) go subscribeCommitEvents( ctx, lggr, state.Chains[cs].OffRamp, - otherChains, - &block, + other, + startBlocks[cs], cs, env.Chains[cs].Client, finalSeqNrCommitChannels[cs], @@ -168,8 +173,8 @@ func TestCCIPLoad_RPS(t *testing.T) { ctx, lggr, state.Chains[cs].OffRamp, - otherChains, - &block, + other, + startBlocks[cs], cs, env.Chains[cs].Client, finalSeqNrExecChannels[cs], @@ -212,7 +217,6 @@ func TestCCIPLoad_RPS(t *testing.T) { testTimer := time.NewTimer(timeout) go func() { <-testTimer.C - mm.Stop() cancel() }() } diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index cec7e8df5dc..08408ac2b92 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "math/big" "math/rand" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -122,8 +124,9 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { "err", deployment.MaybeDataErr(err)) return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } + z := big.NewInt(0) if msg.FeeToken == common.HexToAddress("0x0") { - acc.Value = fee + acc.Value = z defer func() { acc.Value = nil }() } m.l.Debugw("sending message ", @@ -141,6 +144,20 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { "sourceChain", src, "destchain", m.chainSelector, "err", deployment.MaybeDataErr(err)) + + // in the event of an error, still push a metric + // sequence numbers start at 1 so using 0 as a sentinel value + data := messageData{ + eventType: transmitted, + srcDstSeqNum: srcDstSeqNum{ + src: src, + dst: m.chainSelector, + seqNum: 0, + }, + timestamp: uint64(time.Now().Unix()), + } + m.metricPipe <- data + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 7631e674b7f..71cf59d05aa 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -38,7 +38,7 @@ const ( transmitted = iota committed executed - tickerDuration = 3 * time.Minute + tickerDuration = 30 * time.Second SubscriptionTimeout = 1 * time.Minute ) @@ -93,6 +93,21 @@ func subscribeTransmitEvents( defer subscription.Unsubscribe() endChannel := make(chan struct{}) + // wait for load to finish + timeout duration to allow any stragglers to + go func() { + for { + select { + case <-loadFinished: + lggr.Infow("load finished, waiting before stopping transmit watcher", + "srcChain", srcChainSel) + go func() { + time.Sleep(tickerDuration) + close(endChannel) + }() + return + } + } + }() for { select { @@ -139,17 +154,7 @@ func subscribeTransmitEvents( "srcChain", srcChainSel) errChan <- errors.New("timed out waiting for transmits") return - case <-loadFinished: - lggr.Infow("load finished, waiting before stopping transmit watcher", - "srcChain", srcChainSel) - go func() { - time.Sleep(tickerDuration) - close(endChannel) - }() case <-endChannel: - lggr.Infow("sending finalized seqNums and stopping transmit watcher", - "srcChain", srcChainSel) - for csPair, seqNums := range seqNums { lggr.Debugw("pushing finalized sequence numbers for ", "srcChainSelector", srcChainSel, diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index bdf543c2960..aa42af18ace 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -66,10 +66,6 @@ func NewMetricsManager(t *testing.T, l logger.Logger) *MetricManager { } } -func (mm *MetricManager) Stop() { - close(mm.InputChan) -} - func (mm *MetricManager) Start(ctx context.Context) { for { select { @@ -99,7 +95,6 @@ func (mm *MetricManager) Start(ctx context.Context) { }) } close(mm.InputChan) - mm.loki.Stop() return case data := <-mm.InputChan: if _, ok := mm.state[data.srcDstSeqNum]; !ok { @@ -108,6 +103,20 @@ func (mm *MetricManager) Start(ctx context.Context) { } } + if data.seqNum == 0 { + // seqNum of 0 indicates an error. Push nil values to loki + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.state[data.srcDstSeqNum].round) + if err != nil { + mm.lggr.Error("error setting loki labels", "error", err) + } + SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: data.timestamp, + ExecDuration: 0, + CommitDuration: 0, + SequenceNumber: 0, + }) + continue + } state := mm.state[data.srcDstSeqNum] state.timestamps[data.eventType] = data.timestamp if data.eventType == transmitted && data.round != -1 { From 9276b87cfcf1473b5edef966ef877b6c972e164a Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Fri, 21 Feb 2025 13:56:32 +0800 Subject: [PATCH 06/10] cleanup unused channels, better channel closing --- integration-tests/load/ccip/ccip_test.go | 7 +---- .../load/ccip/destination_gun.go | 4 +-- integration-tests/load/ccip/helpers.go | 22 +++++---------- integration-tests/load/ccip/metrics.go | 27 +++++++++---------- 4 files changed, 21 insertions(+), 39 deletions(-) diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go index f15739684eb..7c38f52d66e 100644 --- a/integration-tests/load/ccip/ccip_test.go +++ b/integration-tests/load/ccip/ccip_test.go @@ -73,13 +73,11 @@ func TestCCIPLoad_RPS(t *testing.T) { state, err := ccipchangeset.LoadOnchainState(*env) require.NoError(t, err) - errChan := make(chan error) - defer close(errChan) finalSeqNrCommitChannels := make(map[uint64]chan finalSeqNrReport) finalSeqNrExecChannels := make(map[uint64]chan finalSeqNrReport) loadFinished := make(chan struct{}) - mm := NewMetricsManager(t, env.Logger) + mm := NewMetricsManager(t, env.Logger, userOverrides) go mm.Start(ctx) // gunMap holds a destinationGun for every enabled destination chain @@ -103,7 +101,6 @@ func TestCCIPLoad_RPS(t *testing.T) { cs, loadFinished, env.Chains[cs].Client, - errChan, &wg, mm.InputChan, finalSeqNrCommitChannels, @@ -166,7 +163,6 @@ func TestCCIPLoad_RPS(t *testing.T) { cs, env.Chains[cs].Client, finalSeqNrCommitChannels[cs], - errChan, &wg, mm.InputChan) go subscribeExecutionEvents( @@ -178,7 +174,6 @@ func TestCCIPLoad_RPS(t *testing.T) { cs, env.Chains[cs].Client, finalSeqNrExecChannels[cs], - errChan, &wg, mm.InputChan) } diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index 08408ac2b92..0ac1803487d 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "math/big" "math/rand" "time" @@ -124,9 +123,8 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { "err", deployment.MaybeDataErr(err)) return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } - z := big.NewInt(0) if msg.FeeToken == common.HexToAddress("0x0") { - acc.Value = z + acc.Value = fee defer func() { acc.Value = nil }() } m.l.Debugw("sending message ", diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 71cf59d05aa..17ea66a21d7 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -2,7 +2,6 @@ package ccip import ( "context" - "errors" "fmt" "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" @@ -60,7 +59,6 @@ func subscribeTransmitEvents( srcChainSel uint64, loadFinished chan struct{}, client deployment.OnchainClient, - errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, finalSeqNrCommitChannels map[uint64]chan finalSeqNrReport, @@ -111,8 +109,7 @@ func subscribeTransmitEvents( for { select { - case subErr := <-subscription.Err(): - errChan <- subErr + case <-subscription.Err(): return case event := <-sink: lggr.Debugw("received transmit event for", @@ -123,7 +120,7 @@ func subscribeTransmitEvents( blockNum := event.Raw.BlockNumber header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { - errChan <- err + lggr.Errorw("error getting header by number") } data := messageData{ eventType: transmitted, @@ -152,7 +149,6 @@ func subscribeTransmitEvents( case <-ctx.Done(): lggr.Errorw("received context cancel signal for transmit watcher", "srcChain", srcChainSel) - errChan <- errors.New("timed out waiting for transmits") return case <-endChannel: for csPair, seqNums := range seqNums { @@ -189,11 +185,11 @@ func subscribeCommitEvents( chainSelector uint64, client deployment.OnchainClient, finalSeqNrs chan finalSeqNrReport, - errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, ) { defer wg.Done() + defer close(finalSeqNrs) lggr.Infow("starting commit event subscriber for ", "destChain", chainSelector, @@ -221,8 +217,7 @@ func subscribeCommitEvents( for { select { - case subErr := <-subscription.Err(): - errChan <- subErr + case <-subscription.Err(): return case report := <-sink: if len(report.BlessedMerkleRoots)+len(report.UnblessedMerkleRoots) > 0 { @@ -238,7 +233,7 @@ func subscribeCommitEvents( blockNum := report.Raw.BlockNumber header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { - errChan <- err + lggr.Errorw("error getting header by number") } data := messageData{ eventType: committed, @@ -259,7 +254,6 @@ func subscribeCommitEvents( "destChain", chainSelector, "sourceChains", srcChains, "expectedSeqNumbers", expectedRange) - errChan <- errors.New("timed out waiting for commit report") return case finalSeqNrUpdate, ok := <-finalSeqNrs: @@ -318,11 +312,11 @@ func subscribeExecutionEvents( chainSelector uint64, client deployment.OnchainClient, finalSeqNrs chan finalSeqNrReport, - errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, ) { defer wg.Done() + defer close(finalSeqNrs) lggr.Infow("starting execution event subscriber for ", "destChain", chainSelector, @@ -352,7 +346,6 @@ func subscribeExecutionEvents( case subErr := <-subscription.Err(): lggr.Errorw("error in execution subscription", "err", subErr) - errChan <- subErr return case event := <-sink: lggr.Debugw("received execution event for", @@ -364,7 +357,7 @@ func subscribeExecutionEvents( blockNum := event.Raw.BlockNumber header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { - errChan <- err + lggr.Errorw("error getting header by number") } data := messageData{ eventType: executed, @@ -385,7 +378,6 @@ func subscribeExecutionEvents( "expectedSeqNumbers", expectedRange, "seenMessages", seenMessages, "completedSrcChains", completedSrcChains) - errChan <- errors.New("timed out waiting for execution event") return case finalSeqNrUpdate := <-finalSeqNrs: diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index aa42af18ace..30d623d49af 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -2,9 +2,8 @@ package ccip import ( "context" - "strconv" - chainselectors "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -33,11 +32,11 @@ type MetricManager struct { loki *wasp.LokiClient InputChan chan messageData state map[srcDstSeqNum]metricState + overrides *ccip.LoadConfig } type metricState struct { timestamps [3]uint64 - round int } type srcDstSeqNum struct { @@ -50,10 +49,9 @@ type messageData struct { eventType int srcDstSeqNum timestamp uint64 - round int } -func NewMetricsManager(t *testing.T, l logger.Logger) *MetricManager { +func NewMetricsManager(t *testing.T, l logger.Logger, overrides *ccip.LoadConfig) *MetricManager { // initialize loki using endpoint from user defined env vars loki, err := wasp.NewLokiClient(wasp.NewEnvLokiConfig()) require.NoError(t, err) @@ -63,10 +61,12 @@ func NewMetricsManager(t *testing.T, l logger.Logger) *MetricManager { loki: loki, InputChan: make(chan messageData), state: make(map[srcDstSeqNum]metricState), + overrides: overrides, } } func (mm *MetricManager) Start(ctx context.Context) { + defer close(mm.InputChan) for { select { case <-ctx.Done(): @@ -82,7 +82,7 @@ func (mm *MetricManager) Start(ctx context.Context) { execDuration = timestamps[executed] - timestamps[committed] } - lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, metricState.round) + lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, mm.overrides) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) // don't return here, we still want to push metrics to loki @@ -94,7 +94,6 @@ func (mm *MetricManager) Start(ctx context.Context) { SequenceNumber: srcDstSeqNum.seqNum, }) } - close(mm.InputChan) return case data := <-mm.InputChan: if _, ok := mm.state[data.srcDstSeqNum]; !ok { @@ -105,7 +104,7 @@ func (mm *MetricManager) Start(ctx context.Context) { if data.seqNum == 0 { // seqNum of 0 indicates an error. Push nil values to loki - lokiLabels, err := setLokiLabels(data.src, data.dst, mm.state[data.srcDstSeqNum].round) + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.overrides) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) } @@ -119,16 +118,14 @@ func (mm *MetricManager) Start(ctx context.Context) { } state := mm.state[data.srcDstSeqNum] state.timestamps[data.eventType] = data.timestamp - if data.eventType == transmitted && data.round != -1 { - state.round = data.round - } + mm.state[data.srcDstSeqNum] = state if data.eventType == executed { - mm.lggr.Infow("new state for received seqNum is ", "dst", data.dst, "seqNum", data.seqNum, "round", state.round, "timestamps", state.timestamps) + mm.lggr.Infow("new state for received seqNum is ", "dst", data.dst, "seqNum", data.seqNum, "timestamps", state.timestamps) } // we have all data needed to push to Loki if state.timestamps[transmitted] != 0 && state.timestamps[committed] != 0 && state.timestamps[executed] != 0 { - lokiLabels, err := setLokiLabels(data.src, data.dst, mm.state[data.srcDstSeqNum].round) + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.overrides) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) } @@ -151,7 +148,7 @@ func SendMetricsToLoki(l logger.Logger, lc *wasp.LokiClient, updatedLabels map[s } } -func setLokiLabels(src, dst uint64, round int) (map[string]string, error) { +func setLokiLabels(src, dst uint64, overrides *ccip.LoadConfig) (map[string]string, error) { srcChainID, err := chainselectors.GetChainIDFromSelector(src) if err != nil { return nil, err @@ -163,7 +160,7 @@ func setLokiLabels(src, dst uint64, round int) (map[string]string, error) { return map[string]string{ "sourceEvmChainId": srcChainID, "destEvmChainId": dstChainID, - "roundNum": strconv.Itoa(round), "testType": LokiLoadLabel, + "testLabel": *overrides.TestLabel, }, nil } From b8e1429c552379f8f34434ce72e62c4c662ebacf Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Fri, 21 Feb 2025 14:02:41 +0800 Subject: [PATCH 07/10] revert temp change --- .../load/ccip/destination_gun.go | 58 ++++++++----------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index 0ac1803487d..874abc5af55 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/big" "math/rand" "time" @@ -187,15 +188,6 @@ func (m *DestinationGun) GetMessage(src uint64) (router.ClientEVM2AnyMessage, er m.l.Error("Error encoding receiver address") return router.ClientEVM2AnyMessage{}, err } - if (*m.testConfig.MessageTypeWeights)[0] == 100 { - return router.ClientEVM2AnyMessage{ - Receiver: rcv, - Data: common.Hex2Bytes("0xabcdefabcdef"), - TokenAmounts: nil, - FeeToken: common.HexToAddress("0x0"), - ExtraArgs: nil, - }, nil - } messages := []router.ClientEVM2AnyMessage{ { @@ -205,30 +197,30 @@ func (m *DestinationGun) GetMessage(src uint64) (router.ClientEVM2AnyMessage, er FeeToken: common.HexToAddress("0x0"), ExtraArgs: nil, }, - //{ - // Receiver: rcv, - // TokenAmounts: []router.ClientEVMTokenAmount{ - // { - // Token: m.state.Chains[src].LinkToken.Address(), - // Amount: big.NewInt(1), - // }, - // }, - // Data: common.Hex2Bytes("0xabcdefabcdef"), - // FeeToken: common.HexToAddress("0x0"), - // ExtraArgs: nil, - //}, - //{ - // Receiver: rcv, - // Data: common.Hex2Bytes("message with token"), - // TokenAmounts: []router.ClientEVMTokenAmount{ - // { - // Token: m.state.Chains[src].LinkToken.Address(), - // Amount: big.NewInt(1), - // }, - // }, - // FeeToken: common.HexToAddress("0x0"), - // ExtraArgs: nil, - //}, + { + Receiver: rcv, + TokenAmounts: []router.ClientEVMTokenAmount{ + { + Token: m.state.Chains[src].LinkToken.Address(), + Amount: big.NewInt(1), + }, + }, + Data: common.Hex2Bytes("0xabcdefabcdef"), + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, + { + Receiver: rcv, + Data: common.Hex2Bytes("message with token"), + TokenAmounts: []router.ClientEVMTokenAmount{ + { + Token: m.state.Chains[src].LinkToken.Address(), + Amount: big.NewInt(1), + }, + }, + FeeToken: common.HexToAddress("0x0"), + ExtraArgs: nil, + }, } // Select a random message randomValue := rand.Intn(100) From 56e6c9b7d154fc0b0f712ed3bf7d297621e863ed Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Fri, 21 Feb 2025 14:24:13 +0800 Subject: [PATCH 08/10] Add test label value --- integration-tests/load/ccip/helpers.go | 2 +- integration-tests/load/ccip/metrics.go | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 17ea66a21d7..082b1ec1d20 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -37,7 +37,7 @@ const ( transmitted = iota committed executed - tickerDuration = 30 * time.Second + tickerDuration = 3 * time.Minute SubscriptionTimeout = 1 * time.Minute ) diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index 30d623d49af..0f535a51fa2 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -32,7 +32,7 @@ type MetricManager struct { loki *wasp.LokiClient InputChan chan messageData state map[srcDstSeqNum]metricState - overrides *ccip.LoadConfig + testLabel string } type metricState struct { @@ -55,13 +55,17 @@ func NewMetricsManager(t *testing.T, l logger.Logger, overrides *ccip.LoadConfig // initialize loki using endpoint from user defined env vars loki, err := wasp.NewLokiClient(wasp.NewEnvLokiConfig()) require.NoError(t, err) + testLabel := "default" + if overrides.TestLabel != nil { + testLabel = *overrides.TestLabel + } return &MetricManager{ lggr: l, loki: loki, InputChan: make(chan messageData), state: make(map[srcDstSeqNum]metricState), - overrides: overrides, + testLabel: testLabel, } } @@ -82,7 +86,7 @@ func (mm *MetricManager) Start(ctx context.Context) { execDuration = timestamps[executed] - timestamps[committed] } - lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, mm.overrides) + lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, mm.testLabel) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) // don't return here, we still want to push metrics to loki @@ -104,7 +108,7 @@ func (mm *MetricManager) Start(ctx context.Context) { if data.seqNum == 0 { // seqNum of 0 indicates an error. Push nil values to loki - lokiLabels, err := setLokiLabels(data.src, data.dst, mm.overrides) + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.testLabel) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) } @@ -125,7 +129,7 @@ func (mm *MetricManager) Start(ctx context.Context) { } // we have all data needed to push to Loki if state.timestamps[transmitted] != 0 && state.timestamps[committed] != 0 && state.timestamps[executed] != 0 { - lokiLabels, err := setLokiLabels(data.src, data.dst, mm.overrides) + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.testLabel) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) } @@ -148,7 +152,7 @@ func SendMetricsToLoki(l logger.Logger, lc *wasp.LokiClient, updatedLabels map[s } } -func setLokiLabels(src, dst uint64, overrides *ccip.LoadConfig) (map[string]string, error) { +func setLokiLabels(src, dst uint64, testLabel string) (map[string]string, error) { srcChainID, err := chainselectors.GetChainIDFromSelector(src) if err != nil { return nil, err @@ -161,6 +165,6 @@ func setLokiLabels(src, dst uint64, overrides *ccip.LoadConfig) (map[string]stri "sourceEvmChainId": srcChainID, "destEvmChainId": dstChainID, "testType": LokiLoadLabel, - "testLabel": *overrides.TestLabel, + "testLabel": testLabel, }, nil } From 60640a0d96a3f210c933eba4fa007b1edf0eacaf Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Mon, 24 Feb 2025 12:59:07 +0800 Subject: [PATCH 09/10] update ocr values, respond to comments --- deployment/ccip/changeset/globals/config.go | 4 ++-- deployment/common/types/types.go | 4 ++-- integration-tests/load/ccip/ccip_test.go | 6 +++++- integration-tests/load/ccip/helpers.go | 23 ++++----------------- integration-tests/testconfig/ccip/ccip.toml | 4 ++-- 5 files changed, 15 insertions(+), 26 deletions(-) diff --git a/deployment/ccip/changeset/globals/config.go b/deployment/ccip/changeset/globals/config.go index cc74a3059d4..c93fdc2a97b 100644 --- a/deployment/ccip/changeset/globals/config.go +++ b/deployment/ccip/changeset/globals/config.go @@ -18,14 +18,14 @@ const ( InflightCacheExpiry = 10 * time.Minute RootSnoozeTime = 30 * time.Minute BatchingStrategyID = 0 - DeltaProgress = 30 * time.Second + DeltaProgress = 10 * time.Second DeltaResend = 10 * time.Second DeltaInitial = 20 * time.Second DeltaRound = 2 * time.Second DeltaGrace = 2 * time.Second DeltaCertifiedCommitRequest = 10 * time.Second DeltaStage = 10 * time.Second - Rmax = 3 + Rmax = 50 MaxDurationQuery = 500 * time.Millisecond MaxDurationObservation = 5 * time.Second MaxDurationShouldAcceptAttestedReport = 10 * time.Second diff --git a/deployment/common/types/types.go b/deployment/common/types/types.go index c82559276e7..cdda003b3f0 100644 --- a/deployment/common/types/types.go +++ b/deployment/common/types/types.go @@ -102,8 +102,8 @@ func (params OCRParameters) Validate() error { if params.DeltaCertifiedCommitRequest <= 0 { return errors.New("deltaCertifiedCommitRequest must be positive") } - if params.DeltaStage <= 0 { - return errors.New("deltaStage must be positive") + if params.DeltaStage < 0 { + return errors.New("deltaStage must be positive or 0 for disabled") } if params.Rmax <= 0 { return errors.New("rmax must be positive") diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go index 7c38f52d66e..c99a17ff00d 100644 --- a/integration-tests/load/ccip/ccip_test.go +++ b/integration-tests/load/ccip/ccip_test.go @@ -204,7 +204,11 @@ func TestCCIPLoad_RPS(t *testing.T) { _, err = p.Run(true) require.NoError(t, err) - close(loadFinished) + // wait some duration so that transmits can happen + go func() { + time.Sleep(tickerDuration) + close(loadFinished) + }() // after load is finished, wait for a "timeout duration" before considering that messages are timed out timeout := userOverrides.GetTimeoutDuration() diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 082b1ec1d20..552d6cc2e6d 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -76,6 +76,7 @@ func subscribeTransmitEvents( SourceChainSelector: srcChainSel, DestChainSelector: cs, }] = SeqNumRange{ + // we use the maxuint as a sentinel value here to ensure we always get the lowest possible seqnum Start: atomic.NewUint64(math.MaxUint64), End: atomic.NewUint64(0), } @@ -90,23 +91,6 @@ func subscribeTransmitEvents( }) defer subscription.Unsubscribe() - endChannel := make(chan struct{}) - // wait for load to finish + timeout duration to allow any stragglers to - go func() { - for { - select { - case <-loadFinished: - lggr.Infow("load finished, waiting before stopping transmit watcher", - "srcChain", srcChainSel) - go func() { - time.Sleep(tickerDuration) - close(endChannel) - }() - return - } - } - }() - for { select { case <-subscription.Err(): @@ -150,9 +134,10 @@ func subscribeTransmitEvents( lggr.Errorw("received context cancel signal for transmit watcher", "srcChain", srcChainSel) return - case <-endChannel: + case <-loadFinished: + lggr.Debugw("load finished, closing transmit watchers", "srcChainSel", srcChainSel) for csPair, seqNums := range seqNums { - lggr.Debugw("pushing finalized sequence numbers for ", + lggr.Infow("pushing finalized sequence numbers for ", "srcChainSelector", srcChainSel, "destChainSelector", csPair.DestChainSelector, "seqNums", seqNums) diff --git a/integration-tests/testconfig/ccip/ccip.toml b/integration-tests/testconfig/ccip/ccip.toml index 9d98f796900..32e21beaff1 100644 --- a/integration-tests/testconfig/ccip/ccip.toml +++ b/integration-tests/testconfig/ccip/ccip.toml @@ -246,9 +246,9 @@ ephemeral_addresses_number = 0 MessageTypeWeights = [100,0,0] # each destination chain will receive 1 incoming request per RequestFrequency for the duration of LoadDuration RequestFrequency = "5s" -LoadDuration = "10m" +LoadDuration = "5h" # destination chain selectors to send messages to NumDestinationChains = 8 # Directory where we receive environment configuration from crib CribEnvDirectory = "../../../../crib/deployments/ccip-v2/.tmp" -TimeoutDuration = "20m" \ No newline at end of file +TimeoutDuration = "30m" \ No newline at end of file From 380782a0af1bf8b4ade95c458691b44151694349 Mon Sep 17 00:00:00 2001 From: 0xAustinWang Date: Mon, 24 Feb 2025 13:03:25 +0800 Subject: [PATCH 10/10] goimports --- integration-tests/load/ccip/helpers.go | 8 +++++--- integration-tests/load/ccip/metrics.go | 4 +++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 552d6cc2e6d..07ab5543fb9 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -3,14 +3,16 @@ package ccip import ( "context" "fmt" - "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" - "go.uber.org/atomic" "math" "slices" "sync" "time" + "go.uber.org/atomic" + + "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" + "github.com/ethereum/go-ethereum/event" "golang.org/x/sync/errgroup" diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index 0f535a51fa2..d18248e5def 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -2,10 +2,12 @@ package ccip import ( "context" + chainselectors "github.com/smartcontractkit/chain-selectors" - "github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-testing-framework/wasp"