diff --git a/deployment/ccip/changeset/globals/config.go b/deployment/ccip/changeset/globals/config.go index cc74a3059d4..c93fdc2a97b 100644 --- a/deployment/ccip/changeset/globals/config.go +++ b/deployment/ccip/changeset/globals/config.go @@ -18,14 +18,14 @@ const ( InflightCacheExpiry = 10 * time.Minute RootSnoozeTime = 30 * time.Minute BatchingStrategyID = 0 - DeltaProgress = 30 * time.Second + DeltaProgress = 10 * time.Second DeltaResend = 10 * time.Second DeltaInitial = 20 * time.Second DeltaRound = 2 * time.Second DeltaGrace = 2 * time.Second DeltaCertifiedCommitRequest = 10 * time.Second DeltaStage = 10 * time.Second - Rmax = 3 + Rmax = 50 MaxDurationQuery = 500 * time.Millisecond MaxDurationObservation = 5 * time.Second MaxDurationShouldAcceptAttestedReport = 10 * time.Second diff --git a/deployment/common/types/types.go b/deployment/common/types/types.go index c82559276e7..cdda003b3f0 100644 --- a/deployment/common/types/types.go +++ b/deployment/common/types/types.go @@ -102,8 +102,8 @@ func (params OCRParameters) Validate() error { if params.DeltaCertifiedCommitRequest <= 0 { return errors.New("deltaCertifiedCommitRequest must be positive") } - if params.DeltaStage <= 0 { - return errors.New("deltaStage must be positive") + if params.DeltaStage < 0 { + return errors.New("deltaStage must be positive or 0 for disabled") } if params.Rmax <= 0 { return errors.New("rmax must be positive") diff --git a/integration-tests/load/ccip/ccip_test.go b/integration-tests/load/ccip/ccip_test.go index a486742550e..0dbf1490324 100644 --- a/integration-tests/load/ccip/ccip_test.go +++ b/integration-tests/load/ccip/ccip_test.go @@ -15,7 +15,6 @@ import ( "github.com/smartcontractkit/chainlink/deployment" ccipchangeset "github.com/smartcontractkit/chainlink/deployment/ccip/changeset" - "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-testing-framework/wasp" @@ -37,13 +36,13 @@ var ( const simChainTestKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" // step 1: setup -// Parse the test config, initialize CRIB with configurations defined +// Parse the test config // step 2: subscribe -// Create event subscribers on the offramp +// Create event subscribers in src and dest // step 3: load // Use wasp to initiate load // step 4: teardown -// Stop the chains, cleanup the environment +// wait for ccip to finish, push remaining data func TestCCIPLoad_RPS(t *testing.T) { // comment out when executing the test // t.Skip("Skipping test as this test should not be auto triggered") @@ -75,25 +74,43 @@ func TestCCIPLoad_RPS(t *testing.T) { state, err := ccipchangeset.LoadOnchainState(*env) require.NoError(t, err) - errChan := make(chan error) - defer close(errChan) finalSeqNrCommitChannels := make(map[uint64]chan finalSeqNrReport) finalSeqNrExecChannels := make(map[uint64]chan finalSeqNrReport) + loadFinished := make(chan struct{}) - mm := NewMetricsManager(t, env.Logger) + mm := NewMetricsManager(t, env.Logger, userOverrides) go mm.Start(ctx) - defer mm.Stop() // gunMap holds a destinationGun for every enabled destination chain gunMap := make(map[uint64]*DestinationGun) p := wasp.NewProfile() - // Only create a destination gun if we have decided to send traffic to this chain - for ind := range *userOverrides.NumDestinationChains { - cs := env.AllChainSelectors()[ind] + + // potential source chains need a subscription + for _, cs := range env.AllChainSelectors() { latesthdr, err := env.Chains[cs].Client.HeaderByNumber(ctx, nil) require.NoError(t, err) block := latesthdr.Number.Uint64() startBlocks[cs] = &block + other := env.AllChainSelectorsExcluding([]uint64{cs}) + wg.Add(1) + go subscribeTransmitEvents( + ctx, + lggr, + state.Chains[cs].OnRamp, + other, + startBlocks[cs], + cs, + loadFinished, + env.Chains[cs].Client, + &wg, + mm.InputChan, + finalSeqNrCommitChannels, + finalSeqNrExecChannels) + } + + // confirmed dest chains need a subscription + for ind := range *userOverrides.NumDestinationChains { + cs := env.AllChainSelectors()[ind] messageKeys := make(map[uint64]*bind.TransactOpts) other := env.AllChainSelectorsExcluding([]uint64{cs}) @@ -134,7 +151,6 @@ func TestCCIPLoad_RPS(t *testing.T) { t.Fatal(err) } - otherChains := env.AllChainSelectorsExcluding([]uint64{cs}) finalSeqNrCommitChannels[cs] = make(chan finalSeqNrReport) finalSeqNrExecChannels[cs] = make(chan finalSeqNrReport) @@ -143,24 +159,22 @@ func TestCCIPLoad_RPS(t *testing.T) { ctx, lggr, state.Chains[cs].OffRamp, - otherChains, - &block, + other, + startBlocks[cs], cs, env.Chains[cs].Client, finalSeqNrCommitChannels[cs], - errChan, &wg, mm.InputChan) go subscribeExecutionEvents( ctx, lggr, state.Chains[cs].OffRamp, - otherChains, - &block, + other, + startBlocks[cs], cs, env.Chains[cs].Client, finalSeqNrExecChannels[cs], - errChan, &wg, mm.InputChan) } @@ -191,28 +205,11 @@ func TestCCIPLoad_RPS(t *testing.T) { _, err = p.Run(true) require.NoError(t, err) - - for _, gun := range gunMap { - for csPair, seqNums := range gun.seqNums { - lggr.Debugw("pushing finalized sequence numbers for ", - "chainSelector", gun.chainSelector, - "sourceChainSelector", csPair.SourceChainSelector, - "seqNums", seqNums) - finalSeqNrCommitChannels[csPair.DestChainSelector] <- finalSeqNrReport{ - sourceChainSelector: csPair.SourceChainSelector, - expectedSeqNrRange: ccipocr3.SeqNumRange{ - ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), - }, - } - - finalSeqNrExecChannels[csPair.DestChainSelector] <- finalSeqNrReport{ - sourceChainSelector: csPair.SourceChainSelector, - expectedSeqNrRange: ccipocr3.SeqNumRange{ - ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), - }, - } - } - } + // wait some duration so that transmits can happen + go func() { + time.Sleep(tickerDuration) + close(loadFinished) + }() // after load is finished, wait for a "timeout duration" before considering that messages are timed out timeout := userOverrides.GetTimeoutDuration() @@ -220,7 +217,6 @@ func TestCCIPLoad_RPS(t *testing.T) { testTimer := time.NewTimer(timeout) go func() { <-testTimer.C - mm.Stop() cancel() }() } diff --git a/integration-tests/load/ccip/destination_gun.go b/integration-tests/load/ccip/destination_gun.go index df5224b88ec..095e6b68d38 100644 --- a/integration-tests/load/ccip/destination_gun.go +++ b/integration-tests/load/ccip/destination_gun.go @@ -4,12 +4,11 @@ import ( "context" "errors" "fmt" - "math" "math/big" "math/rand" + "time" ccipchangeset "github.com/smartcontractkit/chainlink/deployment/ccip/changeset" - "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -35,7 +34,6 @@ type DestinationGun struct { l logger.Logger env deployment.Environment state *ccipchangeset.CCIPOnChainState - seqNums map[testhelpers.SourceDestPair]SeqNumRange roundNum *atomic.Int32 chainSelector uint64 receiver common.Address @@ -56,21 +54,10 @@ func NewDestinationGun( chainOffset int, metricPipe chan messageData, ) (*DestinationGun, error) { - seqNums := make(map[testhelpers.SourceDestPair]SeqNumRange) - for _, cs := range env.AllChainSelectorsExcluding([]uint64{chainSelector}) { - seqNums[testhelpers.SourceDestPair{ - SourceChainSelector: cs, - DestChainSelector: chainSelector, - }] = SeqNumRange{ - Start: atomic.NewUint64(math.MaxUint64), - End: atomic.NewUint64(0), - } - } dg := DestinationGun{ l: l, env: env, state: state, - seqNums: seqNums, roundNum: &atomic.Int32{}, chainSelector: chainSelector, receiver: receiver, @@ -121,11 +108,6 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { acc := m.messageKeys[src] - csPair := testhelpers.SourceDestPair{ - SourceChainSelector: src, - DestChainSelector: m.chainSelector, - } - r := state.Chains[src].Router msg, err := m.GetMessage(src) @@ -162,61 +144,27 @@ func (m *DestinationGun) Call(_ *wasp.Generator) *wasp.Response { "sourceChain", src, "destchain", m.chainSelector, "err", deployment.MaybeDataErr(err)) - return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} - } - blockNum, err := m.env.Chains[src].Confirm(tx) - if err != nil { - m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err)) - return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} - } + // in the event of an error, still push a metric + // sequence numbers start at 1 so using 0 as a sentinel value + data := messageData{ + eventType: transmitted, + srcDstSeqNum: srcDstSeqNum{ + src: src, + dst: m.chainSelector, + seqNum: 0, + }, + timestamp: uint64(time.Now().Unix()), + } + m.metricPipe <- data - // todo: wasp should not manage confirming the message - // instead, we should manage the sequence number atomically (at a higher level) - it, err := state.Chains[src].OnRamp.FilterCCIPMessageSent(&bind.FilterOpts{ - Start: blockNum, - End: &blockNum, - Context: context.Background(), - }, []uint64{m.chainSelector}, []uint64{}) - if err != nil { - m.l.Errorw("could not find sent message event on src chain", "src", src, "dst", m.chainSelector, "err", err) return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } - if !it.Next() { - m.l.Errorw("Could not find event") - return &wasp.Response{Error: "Could not iterate", Group: waspGroup, Failed: true} - } - - m.l.Infow("Transmitted message with", - "sourceChain", src, - "destChain", m.chainSelector, - "sequence number", it.Event.SequenceNumber) - // push metric to metric manager for eventual distribution to loki - blockNum = it.Event.Raw.BlockNumber - header, err := m.env.Chains[src].Client.HeaderByNumber(m.env.GetContext(), new(big.Int).SetUint64(blockNum)) + _, err = m.env.Chains[src].Confirm(tx) if err != nil { - return &wasp.Response{Error: "Could not get timestamp of block number", Group: waspGroup, Failed: true} - } - m.metricPipe <- messageData{ - eventType: transmitted, - srcDstSeqNum: srcDstSeqNum{ - src: src, - dst: m.chainSelector, - seqNum: it.Event.SequenceNumber, - }, - timestamp: header.Time, - round: int(requestedRound), - } - - // always store the lowest seen number as the start seq num - if it.Event.SequenceNumber < m.seqNums[csPair].Start.Load() { - m.seqNums[csPair].Start.Store(it.Event.SequenceNumber) - } - - // always store the greatest sequence number we have seen as the maximum - if it.Event.SequenceNumber > m.seqNums[csPair].End.Load() { - m.seqNums[csPair].End.Store(it.Event.SequenceNumber) + m.l.Errorw("could not confirm tx on source", "tx", tx, "err", deployment.MaybeDataErr(err)) + return &wasp.Response{Error: err.Error(), Group: waspGroup, Failed: true} } return &wasp.Response{Failed: false, Group: waspGroup} diff --git a/integration-tests/load/ccip/helpers.go b/integration-tests/load/ccip/helpers.go index 0a217f9e87b..07ab5543fb9 100644 --- a/integration-tests/load/ccip/helpers.go +++ b/integration-tests/load/ccip/helpers.go @@ -2,13 +2,17 @@ package ccip import ( "context" - "errors" "fmt" "math" "slices" "sync" "time" + "go.uber.org/atomic" + + "github.com/smartcontractkit/chainlink/deployment/ccip/changeset/testhelpers" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/v1_6_0/onramp" + "github.com/ethereum/go-ethereum/event" "golang.org/x/sync/errgroup" @@ -43,18 +47,122 @@ var ( fundingAmount = new(big.Int).Mul(deployment.UBigInt(100), deployment.UBigInt(1e18)) // 100 eth ) -// todo: Have a different struct for commit/exec? -type LokiMetric struct { - SequenceNumber uint64 `json:"sequence_number"` - CommitDuration uint64 `json:"commit_duration"` - ExecDuration uint64 `json:"exec_duration"` -} - type finalSeqNrReport struct { sourceChainSelector uint64 expectedSeqNrRange ccipocr3.SeqNumRange } +func subscribeTransmitEvents( + ctx context.Context, + lggr logger.Logger, + onRamp onramp.OnRampInterface, + otherChains []uint64, + startBlock *uint64, + srcChainSel uint64, + loadFinished chan struct{}, + client deployment.OnchainClient, + wg *sync.WaitGroup, + metricPipe chan messageData, + finalSeqNrCommitChannels map[uint64]chan finalSeqNrReport, + finalSeqNrExecChannels map[uint64]chan finalSeqNrReport, +) { + defer wg.Done() + lggr.Infow("starting transmit event subscriber for ", + "srcChain", srcChainSel, + "startblock", startBlock, + ) + + seqNums := make(map[testhelpers.SourceDestPair]SeqNumRange) + for _, cs := range otherChains { + seqNums[testhelpers.SourceDestPair{ + SourceChainSelector: srcChainSel, + DestChainSelector: cs, + }] = SeqNumRange{ + // we use the maxuint as a sentinel value here to ensure we always get the lowest possible seqnum + Start: atomic.NewUint64(math.MaxUint64), + End: atomic.NewUint64(0), + } + } + + sink := make(chan *onramp.OnRampCCIPMessageSent) + subscription := event.Resubscribe(SubscriptionTimeout, func(_ context.Context) (event.Subscription, error) { + return onRamp.WatchCCIPMessageSent(&bind.WatchOpts{ + Context: ctx, + Start: startBlock, + }, sink, nil, nil) + }) + defer subscription.Unsubscribe() + + for { + select { + case <-subscription.Err(): + return + case event := <-sink: + lggr.Debugw("received transmit event for", + "srcChain", srcChainSel, + "destChain", event.DestChainSelector, + "sequenceNumber", event.SequenceNumber) + + blockNum := event.Raw.BlockNumber + header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err != nil { + lggr.Errorw("error getting header by number") + } + data := messageData{ + eventType: transmitted, + srcDstSeqNum: srcDstSeqNum{ + src: srcChainSel, + dst: event.DestChainSelector, + seqNum: event.SequenceNumber, + }, + timestamp: header.Time, + } + metricPipe <- data + + csPair := testhelpers.SourceDestPair{ + SourceChainSelector: srcChainSel, + DestChainSelector: event.DestChainSelector, + } + // always store the lowest seen number as the start seq num + if event.SequenceNumber < seqNums[csPair].Start.Load() { + seqNums[csPair].Start.Store(event.SequenceNumber) + } + + // always store the greatest sequence number we have seen as the maximum + if event.SequenceNumber > seqNums[csPair].End.Load() { + seqNums[csPair].End.Store(event.SequenceNumber) + } + case <-ctx.Done(): + lggr.Errorw("received context cancel signal for transmit watcher", + "srcChain", srcChainSel) + return + case <-loadFinished: + lggr.Debugw("load finished, closing transmit watchers", "srcChainSel", srcChainSel) + for csPair, seqNums := range seqNums { + lggr.Infow("pushing finalized sequence numbers for ", + "srcChainSelector", srcChainSel, + "destChainSelector", csPair.DestChainSelector, + "seqNums", seqNums) + finalSeqNrCommitChannels[csPair.DestChainSelector] <- finalSeqNrReport{ + sourceChainSelector: csPair.SourceChainSelector, + expectedSeqNrRange: ccipocr3.SeqNumRange{ + ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), + }, + } + + finalSeqNrExecChannels[csPair.DestChainSelector] <- finalSeqNrReport{ + sourceChainSelector: csPair.SourceChainSelector, + expectedSeqNrRange: ccipocr3.SeqNumRange{ + ccipocr3.SeqNum(seqNums.Start.Load()), ccipocr3.SeqNum(seqNums.End.Load()), + }, + } + } + return + } + + } +} + func subscribeCommitEvents( ctx context.Context, lggr logger.Logger, @@ -64,11 +172,11 @@ func subscribeCommitEvents( chainSelector uint64, client deployment.OnchainClient, finalSeqNrs chan finalSeqNrReport, - errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, ) { defer wg.Done() + defer close(finalSeqNrs) lggr.Infow("starting commit event subscriber for ", "destChain", chainSelector, @@ -96,8 +204,7 @@ func subscribeCommitEvents( for { select { - case subErr := <-subscription.Err(): - errChan <- subErr + case <-subscription.Err(): return case report := <-sink: if len(report.BlessedMerkleRoots)+len(report.UnblessedMerkleRoots) > 0 { @@ -113,7 +220,7 @@ func subscribeCommitEvents( blockNum := report.Raw.BlockNumber header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { - errChan <- err + lggr.Errorw("error getting header by number") } data := messageData{ eventType: committed, @@ -134,7 +241,6 @@ func subscribeCommitEvents( "destChain", chainSelector, "sourceChains", srcChains, "expectedSeqNumbers", expectedRange) - errChan <- errors.New("timed out waiting for commit report") return case finalSeqNrUpdate, ok := <-finalSeqNrs: @@ -193,11 +299,11 @@ func subscribeExecutionEvents( chainSelector uint64, client deployment.OnchainClient, finalSeqNrs chan finalSeqNrReport, - errChan chan error, wg *sync.WaitGroup, metricPipe chan messageData, ) { defer wg.Done() + defer close(finalSeqNrs) lggr.Infow("starting execution event subscriber for ", "destChain", chainSelector, @@ -227,7 +333,6 @@ func subscribeExecutionEvents( case subErr := <-subscription.Err(): lggr.Errorw("error in execution subscription", "err", subErr) - errChan <- subErr return case event := <-sink: lggr.Debugw("received execution event for", @@ -239,7 +344,7 @@ func subscribeExecutionEvents( blockNum := event.Raw.BlockNumber header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) if err != nil { - errChan <- err + lggr.Errorw("error getting header by number") } data := messageData{ eventType: executed, @@ -260,7 +365,6 @@ func subscribeExecutionEvents( "expectedSeqNumbers", expectedRange, "seenMessages", seenMessages, "completedSrcChains", completedSrcChains) - errChan <- errors.New("timed out waiting for execution event") return case finalSeqNrUpdate := <-finalSeqNrs: diff --git a/integration-tests/load/ccip/metrics.go b/integration-tests/load/ccip/metrics.go index 8c64747a10e..d18248e5def 100644 --- a/integration-tests/load/ccip/metrics.go +++ b/integration-tests/load/ccip/metrics.go @@ -2,11 +2,12 @@ package ccip import ( "context" - "strconv" chainselectors "github.com/smartcontractkit/chain-selectors" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-testing-framework/wasp" @@ -19,6 +20,13 @@ const ( ErrLokiPush = "failed to push metrics to Loki" ) +type LokiMetric struct { + TransmitTime uint64 `json:"transmit_time"` + SequenceNumber uint64 `json:"sequence_number"` + CommitDuration uint64 `json:"commit_duration"` + ExecDuration uint64 `json:"exec_duration"` +} + // MetricsManager is used for maintaining state of different sequence numbers // Once we've received all expected timestamps, it pushes the metrics to Loki type MetricManager struct { @@ -26,11 +34,11 @@ type MetricManager struct { loki *wasp.LokiClient InputChan chan messageData state map[srcDstSeqNum]metricState + testLabel string } type metricState struct { timestamps [3]uint64 - round int } type srcDstSeqNum struct { @@ -43,27 +51,28 @@ type messageData struct { eventType int srcDstSeqNum timestamp uint64 - round int } -func NewMetricsManager(t *testing.T, l logger.Logger) *MetricManager { +func NewMetricsManager(t *testing.T, l logger.Logger, overrides *ccip.LoadConfig) *MetricManager { // initialize loki using endpoint from user defined env vars loki, err := wasp.NewLokiClient(wasp.NewEnvLokiConfig()) require.NoError(t, err) + testLabel := "default" + if overrides.TestLabel != nil { + testLabel = *overrides.TestLabel + } return &MetricManager{ lggr: l, loki: loki, InputChan: make(chan messageData), state: make(map[srcDstSeqNum]metricState), + testLabel: testLabel, } } -func (mm *MetricManager) Stop() { - close(mm.InputChan) -} - func (mm *MetricManager) Start(ctx context.Context) { + defer close(mm.InputChan) for { select { case <-ctx.Done(): @@ -79,19 +88,18 @@ func (mm *MetricManager) Start(ctx context.Context) { execDuration = timestamps[executed] - timestamps[committed] } - lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, metricState.round) + lokiLabels, err := setLokiLabels(srcDstSeqNum.src, srcDstSeqNum.dst, mm.testLabel) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) // don't return here, we still want to push metrics to loki } SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: timestamps[transmitted], ExecDuration: execDuration, CommitDuration: commitDuration, SequenceNumber: srcDstSeqNum.seqNum, }) } - close(mm.InputChan) - mm.loki.Stop() return case data := <-mm.InputChan: if _, ok := mm.state[data.srcDstSeqNum]; !ok { @@ -100,22 +108,35 @@ func (mm *MetricManager) Start(ctx context.Context) { } } + if data.seqNum == 0 { + // seqNum of 0 indicates an error. Push nil values to loki + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.testLabel) + if err != nil { + mm.lggr.Error("error setting loki labels", "error", err) + } + SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: data.timestamp, + ExecDuration: 0, + CommitDuration: 0, + SequenceNumber: 0, + }) + continue + } state := mm.state[data.srcDstSeqNum] state.timestamps[data.eventType] = data.timestamp - if data.eventType == transmitted && data.round != -1 { - state.round = data.round - } + mm.state[data.srcDstSeqNum] = state if data.eventType == executed { - mm.lggr.Infow("new state for received seqNum is ", "dst", data.dst, "seqNum", data.seqNum, "round", state.round, "timestamps", state.timestamps) + mm.lggr.Infow("new state for received seqNum is ", "dst", data.dst, "seqNum", data.seqNum, "timestamps", state.timestamps) } // we have all data needed to push to Loki if state.timestamps[transmitted] != 0 && state.timestamps[committed] != 0 && state.timestamps[executed] != 0 { - lokiLabels, err := setLokiLabels(data.src, data.dst, mm.state[data.srcDstSeqNum].round) + lokiLabels, err := setLokiLabels(data.src, data.dst, mm.testLabel) if err != nil { mm.lggr.Error("error setting loki labels", "error", err) } SendMetricsToLoki(mm.lggr, mm.loki, lokiLabels, &LokiMetric{ + TransmitTime: state.timestamps[transmitted], ExecDuration: state.timestamps[executed] - state.timestamps[committed], CommitDuration: state.timestamps[committed] - state.timestamps[transmitted], SequenceNumber: data.seqNum, @@ -133,7 +154,7 @@ func SendMetricsToLoki(l logger.Logger, lc *wasp.LokiClient, updatedLabels map[s } } -func setLokiLabels(src, dst uint64, round int) (map[string]string, error) { +func setLokiLabels(src, dst uint64, testLabel string) (map[string]string, error) { srcChainID, err := chainselectors.GetChainIDFromSelector(src) if err != nil { return nil, err @@ -145,7 +166,7 @@ func setLokiLabels(src, dst uint64, round int) (map[string]string, error) { return map[string]string{ "sourceEvmChainId": srcChainID, "destEvmChainId": dstChainID, - "roundNum": strconv.Itoa(round), "testType": LokiLoadLabel, + "testLabel": testLabel, }, nil } diff --git a/integration-tests/testconfig/ccip/ccip.toml b/integration-tests/testconfig/ccip/ccip.toml index 9d98f796900..32e21beaff1 100644 --- a/integration-tests/testconfig/ccip/ccip.toml +++ b/integration-tests/testconfig/ccip/ccip.toml @@ -246,9 +246,9 @@ ephemeral_addresses_number = 0 MessageTypeWeights = [100,0,0] # each destination chain will receive 1 incoming request per RequestFrequency for the duration of LoadDuration RequestFrequency = "5s" -LoadDuration = "10m" +LoadDuration = "5h" # destination chain selectors to send messages to NumDestinationChains = 8 # Directory where we receive environment configuration from crib CribEnvDirectory = "../../../../crib/deployments/ccip-v2/.tmp" -TimeoutDuration = "20m" \ No newline at end of file +TimeoutDuration = "30m" \ No newline at end of file diff --git a/integration-tests/testconfig/ccip/load.go b/integration-tests/testconfig/ccip/load.go index ddea267e8f0..5378ecf1412 100644 --- a/integration-tests/testconfig/ccip/load.go +++ b/integration-tests/testconfig/ccip/load.go @@ -16,6 +16,7 @@ type LoadConfig struct { CribEnvDirectory *string NumDestinationChains *int TimeoutDuration *string + TestLabel *string } func (l *LoadConfig) Validate(t *testing.T, e *deployment.Environment) {