From 52b8df6b65f2677cfb8fa9710c317a1fa4edcc92 Mon Sep 17 00:00:00 2001 From: Gregory Cawthorne Date: Wed, 26 Feb 2025 13:40:58 +0000 Subject: [PATCH 1/2] Add LLO wide channels intergration test --- .../ocr2/plugins/llo/integration_test.go | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index 634ab0be170..f43c94b9357 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -1051,6 +1051,229 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec }) } +func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *testing.T) { + t.Parallel() + + // logLevel: the log level to use for the nodes + // setting a more verbose log level increases cpu usage significantly + const logLevel = toml.LogLevel(zapcore.ErrorLevel) + + // NOTE: Tweak these values to increase or decrease the intensity of the + // stress test + // + // nChannels: the total number of channels + // maxQueueSize: the maximum size of the transmit queue + // nReports: the number of reports to expect per node + // nStreamsPerChannel: the number of streams per channel + + // LESS STRESSFUL + const nChannels = 2 + const maxQueueSize = 4 + const nReports = 100 + + const nStreamsPerChannel = 10_000 + + clientCSAKeys := make([]csakey.KeyV2, nNodes) + clientPubKeys := make([]ed25519.PublicKey, nNodes) + + const salt = 301 + + for i := 0; i < nNodes; i++ { + k := big.NewInt(int64(salt + i)) + key := csakey.MustNewV2XXXTestingOnly(k) + clientCSAKeys[i] = key + clientPubKeys[i] = key.PublicKey + } + + steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t) + fromBlock := 1 + + // Setup bootstrap + bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1)) + bootstrapNodePort := freeport.GetOne(t) + appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil) + bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} + + t.Run("transmit queue does not grow unbounded", func(t *testing.T) { + packets := make(chan *packet, 100000) + serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2)) + serverPubKey := serverKey.PublicKey + srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), packets) + + serverURL := startMercuryServer(t, srv, clientPubKeys) + + donID := uint32(888333) + streams := []Stream{ethStream, linkStream} + streamMap := make(map[uint32]Stream) + for _, strm := range streams { + streamMap[strm.id] = strm + } + + // Setup oracle nodes + oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, streams, func(c *chainlink.Config) { + c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC) + c.Mercury.Transmitter.TransmitQueueMaxSize = ptr(uint32(maxQueueSize)) // Test queue overflow + c.Log.Level = ptr(logLevel) + }) + + chainID := testutils.SimulatedChainID + relayType := "evm" + relayConfig := fmt.Sprintf(` +chainID = "%s" +fromBlock = %d +lloDonID = %d +lloConfigMode = "bluegreen" +`, chainID, fromBlock, donID) + addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-3", relayType, relayConfig) + + mustEncodeOpts := func(opts *lloevm.ReportFormatEVMABIEncodeOpts) []byte { + encoded, err := json.Marshal(opts) + require.NoError(t, err) + return encoded + } + + standardMultiplier := ubig.NewI(1e18) + expirationWindow := uint32(3600) + + // Channel definitions + // 2,000 channels should produce 2,000 reports per second + channelDefinitions := llotypes.ChannelDefinitions{} + for i := uint32(0); i < nChannels; i++ { + streamDefinitions := make([]llotypes.Stream, nStreamsPerChannel+2) + streamABIDefinitions := make([]lloevm.ABIEncoder, nStreamsPerChannel) + + streamDefinitions[0] = llotypes.Stream{ + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + } + streamDefinitions[1] = llotypes.Stream{ + StreamID: linkStreamID, + Aggregator: llotypes.AggregatorMedian, + } + + for j := uint32(0); j < nStreamsPerChannel; j++ { + streamDefinitions[j+2] = llotypes.Stream{ + StreamID: ethStreamID, + Aggregator: llotypes.AggregatorMedian, + } + streamABIDefinitions[j] = lloevm.ABIEncoder{ + StreamID: ethStreamID, + Type: "int192", + Multiplier: standardMultiplier, + } + } + + feedID := utils.NewHash() + + channelDefinitions[i] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked, + Streams: streamDefinitions, + Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{ + BaseUSDFee: decimal.NewFromFloat32(0.1), + ExpirationWindow: uint32(expirationWindow), + FeedID: feedID, + ABI: streamABIDefinitions, + }), + } + } + url, sha := newChannelDefinitionsServer(t, channelDefinitions) + + // Set channel definitions + _, err := configStore.SetChannelDefinitions(steve, donID, url, sha) + require.NoError(t, err) + backend.Commit() + + // one working and one broken transmission server + pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x", "example.invalid" = "%x" } +donID = %d +channelDefinitionsContractAddress = "0x%x" +channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey, donID, configStoreAddress, fromBlock) + addOCRJobsEVMPremiumLegacy(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig) + + var blueDigest ocr2types.ConfigDigest + + { + // Set config on configurator + blueDigest = setProductionConfig( + t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, + ) + + // NOTE: Wait for 40,000 reports (should take about 5 seconds) - 2,000 reports per second * 4 transmitters * 5 seconds + // count of packets received keyed by transmitter IP + m := map[string]int{} + for pckt := range packets { + pr, ok := peer.FromContext(pckt.ctx) + require.True(t, ok) + addr := pr.Addr + req := pckt.req + + assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat) + v := make(map[string]interface{}) + err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload) + require.NoError(t, err) + report, exists := v["report"] + if !exists { + t.Fatalf("expected payload %#v to contain 'report'", v) + } + reportCtx, exists := v["reportContext"] + if !exists { + t.Fatalf("expected payload %#v to contain 'reportContext'", v) + } + + // Check the report context + assert.Equal(t, [32]byte(blueDigest), reportCtx.([3][32]uint8)[0]) // config digest + assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash + + reportElems := make(map[string]interface{}) + err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte)) + require.NoError(t, err) + + // Check payload values + payload := report.([]byte)[192:] + + require.Len(t, payload, 32*nStreamsPerChannel) + args := abi.Arguments([]abi.Argument{ + {Name: "benchmarkPrice", Type: mustNewType("int192")}, + }) + v2 := make(map[string]interface{}) + err2 := args.UnpackIntoMap(v2, payload) + require.NoError(t, err2) + + assert.Equal(t, "2976390000000000000000", v2["benchmarkPrice"].(*big.Int).String()) + + m[addr.String()]++ + finished := 0 + for _, cnt := range m { + if cnt >= nReports { + finished++ + } + } + if finished == 4 { + break + } + } + } + + // Shut all nodes down + for i, node := range nodes { + require.NoError(t, node.App.Stop()) + // Ensure that the transmit queue was limited + db := node.App.GetDB() + cnt := 0 + + // The failing server + err := db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = 'example.invalid'") + require.NoError(t, err) + assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for failing server", i) + + // The succeeding server + err = db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = $1", serverURL) + require.NoError(t, err) + assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for succeeding server", i) + } + }) +} + func TestIntegration_LLO_stress_test_and_transmit_errors(t *testing.T) { t.Parallel() From 7296c325c6becd1986c37eb9cbec0841e9b50a7e Mon Sep 17 00:00:00 2001 From: Gregory Cawthorne Date: Wed, 26 Feb 2025 14:42:26 +0000 Subject: [PATCH 2/2] Add unique stream ids --- .../services/ocr2/plugins/llo/helpers_test.go | 69 +++++++++++++++++++ .../ocr2/plugins/llo/integration_test.go | 18 +++-- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/core/services/ocr2/plugins/llo/helpers_test.go b/core/services/ocr2/plugins/llo/helpers_test.go index cdcd77fc97d..980ada796db 100644 --- a/core/services/ocr2/plugins/llo/helpers_test.go +++ b/core/services/ocr2/plugins/llo/helpers_test.go @@ -466,6 +466,75 @@ func createBridge(t *testing.T, bridgeName string, resultJSON string, borm bridg })) } +func addOCRJobsEVMABIEncode( + t *testing.T, + streams []Stream, + serverPubKey ed25519.PublicKey, + serverURL string, + configuratorAddress common.Address, + bootstrapPeerID string, + bootstrapNodePort int, + nodes []Node, + configStoreAddress common.Address, + clientPubKeys []ed25519.PublicKey, + pluginConfig, + relayType, + relayConfig string) (jobIDs map[int]map[uint32]int32) { + // node idx => stream id => job id + jobIDs = make(map[int]map[uint32]int32) + // Add OCR jobs - one per feed on each node + for i, node := range nodes { + if jobIDs[i] == nil { + jobIDs[i] = make(map[uint32]int32) + } + for j, strm := range streams { + // assume that streams are native, link and additionals are quote + if j < 2 { + var name string + if j == 0 { + name = "nativeprice" + } else { + name = "linkprice" + } + name = fmt.Sprintf("%s-%d-%d", name, strm.id, j) + bmBridge := createSingleDecimalBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM()) + jobID := addSingleDecimalStreamJob( + t, + node, + strm.id, + bmBridge, + ) + jobIDs[i][strm.id] = jobID + } else { + name := "medianprice" + + name = fmt.Sprintf("%s-%d-%d", name, strm.id, j) + bmBridge := createSingleDecimalBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM()) + jobID := addSingleDecimalStreamJob( + t, + node, + strm.id, + bmBridge, + ) + jobIDs[i][strm.id] = jobID + } + } + addLLOJob( + t, + node, + configuratorAddress, + bootstrapPeerID, + bootstrapNodePort, + clientPubKeys[i], + "feed-1", + pluginConfig, + relayType, + relayConfig, + ) + } + return jobIDs +} + func addOCRJobsEVMPremiumLegacy( t *testing.T, streams []Stream, diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index f43c94b9357..408262a51b5 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -1069,9 +1069,9 @@ func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *t // LESS STRESSFUL const nChannels = 2 const maxQueueSize = 4 - const nReports = 100 + const nReports = 10 - const nStreamsPerChannel = 10_000 + const nStreamsPerChannel = 3000 clientCSAKeys := make([]csakey.KeyV2, nNodes) clientPubKeys := make([]ed25519.PublicKey, nNodes) @@ -1103,7 +1103,12 @@ func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *t serverURL := startMercuryServer(t, srv, clientPubKeys) donID := uint32(888333) - streams := []Stream{ethStream, linkStream} + + streams := make([]Stream, nStreamsPerChannel+2) + + streams[0] = ethStream + streams[1] = linkStream + streamMap := make(map[uint32]Stream) for _, strm := range streams { streamMap[strm.id] = strm @@ -1152,6 +1157,11 @@ lloConfigMode = "bluegreen" } for j := uint32(0); j < nStreamsPerChannel; j++ { + newStreamID := ethStreamID + 2 + j + streams[j+2] = Stream{ + id: newStreamID, + baseBenchmarkPrice: decimal.NewFromFloat32(2_976.39), + } streamDefinitions[j+2] = llotypes.Stream{ StreamID: ethStreamID, Aggregator: llotypes.AggregatorMedian, @@ -1188,7 +1198,7 @@ lloConfigMode = "bluegreen" donID = %d channelDefinitionsContractAddress = "0x%x" channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey, donID, configStoreAddress, fromBlock) - addOCRJobsEVMPremiumLegacy(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig) + addOCRJobsEVMABIEncode(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig) var blueDigest ocr2types.ConfigDigest