Skip to content

Commit

Permalink
Add LLO wide channels intergration test
Browse files Browse the repository at this point in the history
  • Loading branch information
cawthorne committed Feb 26, 2025
1 parent fab3f96 commit 52b8df6
Showing 1 changed file with 223 additions and 0 deletions.
223 changes: 223 additions & 0 deletions core/services/ocr2/plugins/llo/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,229 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec
})
}

func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *testing.T) {
t.Parallel()

// logLevel: the log level to use for the nodes
// setting a more verbose log level increases cpu usage significantly
const logLevel = toml.LogLevel(zapcore.ErrorLevel)

// NOTE: Tweak these values to increase or decrease the intensity of the
// stress test
//
// nChannels: the total number of channels
// maxQueueSize: the maximum size of the transmit queue
// nReports: the number of reports to expect per node
// nStreamsPerChannel: the number of streams per channel

// LESS STRESSFUL
const nChannels = 2
const maxQueueSize = 4
const nReports = 100

const nStreamsPerChannel = 10_000

clientCSAKeys := make([]csakey.KeyV2, nNodes)
clientPubKeys := make([]ed25519.PublicKey, nNodes)

const salt = 301

for i := 0; i < nNodes; i++ {
k := big.NewInt(int64(salt + i))
key := csakey.MustNewV2XXXTestingOnly(k)
clientCSAKeys[i] = key
clientPubKeys[i] = key.PublicKey
}

steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t)
fromBlock := 1

// Setup bootstrap
bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1))
bootstrapNodePort := freeport.GetOne(t)
appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil)
bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb}

t.Run("transmit queue does not grow unbounded", func(t *testing.T) {
packets := make(chan *packet, 100000)
serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2))
serverPubKey := serverKey.PublicKey
srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), packets)

serverURL := startMercuryServer(t, srv, clientPubKeys)

donID := uint32(888333)
streams := []Stream{ethStream, linkStream}
streamMap := make(map[uint32]Stream)
for _, strm := range streams {
streamMap[strm.id] = strm
}

// Setup oracle nodes
oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, streams, func(c *chainlink.Config) {
c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC)
c.Mercury.Transmitter.TransmitQueueMaxSize = ptr(uint32(maxQueueSize)) // Test queue overflow
c.Log.Level = ptr(logLevel)
})

chainID := testutils.SimulatedChainID
relayType := "evm"
relayConfig := fmt.Sprintf(`
chainID = "%s"
fromBlock = %d
lloDonID = %d
lloConfigMode = "bluegreen"
`, chainID, fromBlock, donID)
addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-3", relayType, relayConfig)

mustEncodeOpts := func(opts *lloevm.ReportFormatEVMABIEncodeOpts) []byte {
encoded, err := json.Marshal(opts)
require.NoError(t, err)
return encoded
}

standardMultiplier := ubig.NewI(1e18)
expirationWindow := uint32(3600)

// Channel definitions
// 2,000 channels should produce 2,000 reports per second
channelDefinitions := llotypes.ChannelDefinitions{}
for i := uint32(0); i < nChannels; i++ {
streamDefinitions := make([]llotypes.Stream, nStreamsPerChannel+2)
streamABIDefinitions := make([]lloevm.ABIEncoder, nStreamsPerChannel)

streamDefinitions[0] = llotypes.Stream{
StreamID: ethStreamID,
Aggregator: llotypes.AggregatorMedian,
}
streamDefinitions[1] = llotypes.Stream{
StreamID: linkStreamID,
Aggregator: llotypes.AggregatorMedian,
}

for j := uint32(0); j < nStreamsPerChannel; j++ {
streamDefinitions[j+2] = llotypes.Stream{
StreamID: ethStreamID,
Aggregator: llotypes.AggregatorMedian,
}
streamABIDefinitions[j] = lloevm.ABIEncoder{
StreamID: ethStreamID,
Type: "int192",
Multiplier: standardMultiplier,
}
}

feedID := utils.NewHash()

channelDefinitions[i] = llotypes.ChannelDefinition{
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked,
Streams: streamDefinitions,
Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{
BaseUSDFee: decimal.NewFromFloat32(0.1),
ExpirationWindow: uint32(expirationWindow),

Check failure on line 1173 in core/services/ocr2/plugins/llo/integration_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

unnecessary conversion (unconvert)
FeedID: feedID,
ABI: streamABIDefinitions,
}),
}
}
url, sha := newChannelDefinitionsServer(t, channelDefinitions)

// Set channel definitions
_, err := configStore.SetChannelDefinitions(steve, donID, url, sha)
require.NoError(t, err)
backend.Commit()

// one working and one broken transmission server
pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x", "example.invalid" = "%x" }
donID = %d
channelDefinitionsContractAddress = "0x%x"
channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey, donID, configStoreAddress, fromBlock)
addOCRJobsEVMPremiumLegacy(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig)

var blueDigest ocr2types.ConfigDigest

{
// Set config on configurator
blueDigest = setProductionConfig(
t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles,
)

// NOTE: Wait for 40,000 reports (should take about 5 seconds) - 2,000 reports per second * 4 transmitters * 5 seconds
// count of packets received keyed by transmitter IP
m := map[string]int{}
for pckt := range packets {
pr, ok := peer.FromContext(pckt.ctx)
require.True(t, ok)
addr := pr.Addr
req := pckt.req

assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat)
v := make(map[string]interface{})
err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload)
require.NoError(t, err)
report, exists := v["report"]
if !exists {
t.Fatalf("expected payload %#v to contain 'report'", v)
}
reportCtx, exists := v["reportContext"]
if !exists {
t.Fatalf("expected payload %#v to contain 'reportContext'", v)
}

// Check the report context
assert.Equal(t, [32]byte(blueDigest), reportCtx.([3][32]uint8)[0]) // config digest
assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash

reportElems := make(map[string]interface{})
err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte))
require.NoError(t, err)

// Check payload values
payload := report.([]byte)[192:]

require.Len(t, payload, 32*nStreamsPerChannel)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
})
v2 := make(map[string]interface{})
err2 := args.UnpackIntoMap(v2, payload)
require.NoError(t, err2)

assert.Equal(t, "2976390000000000000000", v2["benchmarkPrice"].(*big.Int).String())

m[addr.String()]++
finished := 0
for _, cnt := range m {
if cnt >= nReports {
finished++
}
}
if finished == 4 {
break
}
}
}

// Shut all nodes down
for i, node := range nodes {
require.NoError(t, node.App.Stop())
// Ensure that the transmit queue was limited
db := node.App.GetDB()
cnt := 0

// The failing server
err := db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = 'example.invalid'")
require.NoError(t, err)
assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for failing server", i)

// The succeeding server
err = db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = $1", serverURL)
require.NoError(t, err)
assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for succeeding server", i)
}
})
}

func TestIntegration_LLO_stress_test_and_transmit_errors(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 52b8df6

Please sign in to comment.