From 52b8df6b65f2677cfb8fa9710c317a1fa4edcc92 Mon Sep 17 00:00:00 2001
From: Gregory Cawthorne
Date: Wed, 26 Feb 2025 13:40:58 +0000
Subject: [PATCH] Add LLO wide channels integration test

---
 .../ocr2/plugins/llo/integration_test.go      | 223 ++++++++++++++++++
 1 file changed, 223 insertions(+)

diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go
index 634ab0be170..f43c94b9357 100644
--- a/core/services/ocr2/plugins/llo/integration_test.go
+++ b/core/services/ocr2/plugins/llo/integration_test.go
@@ -1051,6 +1051,229 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec
 	})
 }
 
+func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *testing.T) {
+	t.Parallel()
+
+	// logLevel: the log level to use for the nodes
+	// setting a more verbose log level increases CPU usage significantly
+	const logLevel = toml.LogLevel(zapcore.ErrorLevel)
+
+	// NOTE: Tweak these values to increase or decrease the intensity of the
+	// stress test
+	//
+	// nChannels: the total number of channels
+	// maxQueueSize: the maximum size of the transmit queue
+	// nReports: the number of reports to expect per node
+	// nStreamsPerChannel: the number of streams per channel
+
+	// LESS STRESSFUL
+	const nChannels = 2
+	const maxQueueSize = 4
+	const nReports = 100
+	const nStreamsPerChannel = 10_000
+
+	clientCSAKeys := make([]csakey.KeyV2, nNodes)
+	clientPubKeys := make([]ed25519.PublicKey, nNodes)
+
+	const salt = 301
+
+	for i := 0; i < nNodes; i++ {
+		k := big.NewInt(int64(salt + i))
+		key := csakey.MustNewV2XXXTestingOnly(k)
+		clientCSAKeys[i] = key
+		clientPubKeys[i] = key.PublicKey
+	}
+
+	steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t)
+	fromBlock := 1
+
+	// Setup bootstrap
+	bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1))
+	bootstrapNodePort := freeport.GetOne(t)
+	appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil)
+	bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb}
+
+	t.Run("transmit queue does not grow unbounded", func(t *testing.T) {
+		packets := make(chan *packet, 100000)
+		serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2))
+		serverPubKey := serverKey.PublicKey
+		srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), packets)
+
+		serverURL := startMercuryServer(t, srv, clientPubKeys)
+
+		donID := uint32(888333)
+		streams := []Stream{ethStream, linkStream}
+		streamMap := make(map[uint32]Stream)
+		for _, strm := range streams {
+			streamMap[strm.id] = strm
+		}
+
+		// Setup oracle nodes
+		oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, streams, func(c *chainlink.Config) {
+			c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC)
+			c.Mercury.Transmitter.TransmitQueueMaxSize = ptr(uint32(maxQueueSize)) // Test queue overflow
+			c.Log.Level = ptr(logLevel)
+		})
+
+		chainID := testutils.SimulatedChainID
+		relayType := "evm"
+		relayConfig := fmt.Sprintf(`
+chainID = "%s"
+fromBlock = %d
+lloDonID = %d
+lloConfigMode = "bluegreen"
+`, chainID, fromBlock, donID)
+		addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-3", relayType, relayConfig)
+
+		mustEncodeOpts := func(opts *lloevm.ReportFormatEVMABIEncodeOpts) []byte {
+			encoded, err := json.Marshal(opts)
+			require.NoError(t, err)
+			return encoded
+		}
+
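+		// Every payload stream below is declared as an int192 with a 1e18
+		// multiplier, i.e. the usual 18-decimal fixed-point convention.
+		// Assuming the ethStream fixture reports a price of 2976.39 (as the
+		// benchmarkPrice assertion further down implies), the wire value is
+		// 2976.39 * 1e18 = 2976390000000000000000.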
+		standardMultiplier := ubig.NewI(1e18)
+		expirationWindow := uint32(3600)
+
+		// Channel definitions: each channel yields one report per protocol
+		// round, so report volume scales linearly with nChannels
+		channelDefinitions := llotypes.ChannelDefinitions{}
+		for i := uint32(0); i < nChannels; i++ {
+			streamDefinitions := make([]llotypes.Stream, nStreamsPerChannel+2)
+			streamABIDefinitions := make([]lloevm.ABIEncoder, nStreamsPerChannel)
+
+			streamDefinitions[0] = llotypes.Stream{
+				StreamID:   ethStreamID,
+				Aggregator: llotypes.AggregatorMedian,
+			}
+			streamDefinitions[1] = llotypes.Stream{
+				StreamID:   linkStreamID,
+				Aggregator: llotypes.AggregatorMedian,
+			}
+
+			for j := uint32(0); j < nStreamsPerChannel; j++ {
+				streamDefinitions[j+2] = llotypes.Stream{
+					StreamID:   ethStreamID,
+					Aggregator: llotypes.AggregatorMedian,
+				}
+				streamABIDefinitions[j] = lloevm.ABIEncoder{
+					StreamID:   ethStreamID,
+					Type:       "int192",
+					Multiplier: standardMultiplier,
+				}
+			}
+
+			feedID := utils.NewHash()
+
+			channelDefinitions[i] = llotypes.ChannelDefinition{
+				ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked,
+				Streams:      streamDefinitions,
+				Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{
+					BaseUSDFee:       decimal.NewFromFloat32(0.1),
+					ExpirationWindow: expirationWindow,
+					FeedID:           feedID,
+					ABI:              streamABIDefinitions,
+				}),
+			}
+		}
+		url, sha := newChannelDefinitionsServer(t, channelDefinitions)
+
+		// Set channel definitions
+		_, err := configStore.SetChannelDefinitions(steve, donID, url, sha)
+		require.NoError(t, err)
+		backend.Commit()
+
+		// one working and one broken transmission server
+		pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x", "example.invalid" = "%x" }
+donID = %d
+channelDefinitionsContractAddress = "0x%x"
+channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey, donID, configStoreAddress, fromBlock)
+		addOCRJobsEVMPremiumLegacy(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig)
+
+		var blueDigest ocr2types.ConfigDigest
+
+		{
+			// Set config on configurator
+			blueDigest = setProductionConfig(
+				t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles,
+			)
+
+			// Wait until each of the 4 transmitters has delivered at least
+			// nReports reports
+			// count of packets received keyed by transmitter IP
+			m := map[string]int{}
+			for pckt := range packets {
+				pr, ok := peer.FromContext(pckt.ctx)
+				require.True(t, ok)
+				addr := pr.Addr
+				req := pckt.req
+
+				assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat)
+				v := make(map[string]interface{})
+				err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload)
+				require.NoError(t, err)
+				report, exists := v["report"]
+				if !exists {
+					t.Fatalf("expected payload %#v to contain 'report'", v)
+				}
+				reportCtx, exists := v["reportContext"]
+				if !exists {
+					t.Fatalf("expected payload %#v to contain 'reportContext'", v)
+				}
+
+				// Check the report context
+				// config digest
+				assert.Equal(t, [32]byte(blueDigest), reportCtx.([3][32]uint8)[0])
+				// extra hash (note 0xd8e0d == 888333 == donID)
+				assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2]))
+
+				reportElems := make(map[string]interface{})
+				err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte))
+				require.NoError(t, err)
+
+				// Check payload values
+				payload := report.([]byte)[192:]
+				require.Len(t, payload, 32*nStreamsPerChannel)
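+				// Layout, per the BaseSchema unpack and the slice above: the
+				// first 192 bytes of the report are the six 32-byte header
+				// words; everything after is one 32-byte word per ABI-encoded
+				// stream, hence the 32*nStreamsPerChannel length check.
+				// Unpacking a single int192 argument below reads only the
+				// first of those words, i.e. the first stream's value.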
+ {Name: "benchmarkPrice", Type: mustNewType("int192")}, + }) + v2 := make(map[string]interface{}) + err2 := args.UnpackIntoMap(v2, payload) + require.NoError(t, err2) + + assert.Equal(t, "2976390000000000000000", v2["benchmarkPrice"].(*big.Int).String()) + + m[addr.String()]++ + finished := 0 + for _, cnt := range m { + if cnt >= nReports { + finished++ + } + } + if finished == 4 { + break + } + } + } + + // Shut all nodes down + for i, node := range nodes { + require.NoError(t, node.App.Stop()) + // Ensure that the transmit queue was limited + db := node.App.GetDB() + cnt := 0 + + // The failing server + err := db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = 'example.invalid'") + require.NoError(t, err) + assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for failing server", i) + + // The succeeding server + err = db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = $1", serverURL) + require.NoError(t, err) + assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for succeeding server", i) + } + }) +} + func TestIntegration_LLO_stress_test_and_transmit_errors(t *testing.T) { t.Parallel()