Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LLO wide channels integration test #16587

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions core/services/ocr2/plugins/llo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,75 @@ func createBridge(t *testing.T, bridgeName string, resultJSON string, borm bridg
}))
}

// addOCRJobsEVMABIEncode creates the LLO jobs for an EVM-ABI-encode feed on
// every node: one single-decimal stream job per stream, plus one LLO job per
// node.
//
// The stream slice is assumed to be ordered so that index 0 is the native
// token price and index 1 is the LINK price; every subsequent stream is
// treated as a median ("quote") price.
//
// Returns jobIDs keyed as: node index => stream ID => job ID.
//
// NOTE(review): serverPubKey, serverURL and configStoreAddress are currently
// unused in the body; they are kept to preserve the call signature — confirm
// whether they are needed or can be dropped at all call sites.
func addOCRJobsEVMABIEncode(
	t *testing.T,
	streams []Stream,
	serverPubKey ed25519.PublicKey,
	serverURL string,
	configuratorAddress common.Address,
	bootstrapPeerID string,
	bootstrapNodePort int,
	nodes []Node,
	configStoreAddress common.Address,
	clientPubKeys []ed25519.PublicKey,
	pluginConfig,
	relayType,
	relayConfig string) (jobIDs map[int]map[uint32]int32) {
	// node idx => stream id => job id
	jobIDs = make(map[int]map[uint32]int32)
	// Add OCR jobs - one per feed on each node
	for i, node := range nodes {
		if jobIDs[i] == nil {
			jobIDs[i] = make(map[uint32]int32)
		}
		for j, strm := range streams {
			// Assume that streams 0 and 1 are native and link; all
			// additional streams are quote (median) prices. The branches
			// previously duplicated identical bridge/job creation code and
			// differed only in the name prefix, so pick the prefix here.
			var prefix string
			switch j {
			case 0:
				prefix = "nativeprice"
			case 1:
				prefix = "linkprice"
			default:
				prefix = "medianprice"
			}
			name := fmt.Sprintf("%s-%d-%d", prefix, strm.id, j)
			bmBridge := createSingleDecimalBridge(t, name, i, strm.baseBenchmarkPrice, node.App.BridgeORM())
			jobIDs[i][strm.id] = addSingleDecimalStreamJob(
				t,
				node,
				strm.id,
				bmBridge,
			)
		}
		addLLOJob(
			t,
			node,
			configuratorAddress,
			bootstrapPeerID,
			bootstrapNodePort,
			clientPubKeys[i],
			"feed-1",
			pluginConfig,
			relayType,
			relayConfig,
		)
	}
	return jobIDs
}

func addOCRJobsEVMPremiumLegacy(
t *testing.T,
streams []Stream,
Expand Down
233 changes: 233 additions & 0 deletions core/services/ocr2/plugins/llo/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,239 @@
})
}

func TestIntegration_LLO_stress_test_with_wide_channels_and_transmit_errors(t *testing.T) {
t.Parallel()

// logLevel: the log level to use for the nodes
// setting a more verbose log level increases cpu usage significantly
const logLevel = toml.LogLevel(zapcore.ErrorLevel)

// NOTE: Tweak these values to increase or decrease the intensity of the
// stress test
//
// nChannels: the total number of channels
// maxQueueSize: the maximum size of the transmit queue
// nReports: the number of reports to expect per node
// nStreamsPerChannel: the number of streams per channel

// LESS STRESSFUL
const nChannels = 2
const maxQueueSize = 4
const nReports = 10

const nStreamsPerChannel = 3000

clientCSAKeys := make([]csakey.KeyV2, nNodes)
clientPubKeys := make([]ed25519.PublicKey, nNodes)

const salt = 301

for i := 0; i < nNodes; i++ {
k := big.NewInt(int64(salt + i))
key := csakey.MustNewV2XXXTestingOnly(k)
clientCSAKeys[i] = key
clientPubKeys[i] = key.PublicKey
}

steve, backend, configurator, configuratorAddress, _, _, _, _, configStore, configStoreAddress, _, _, _, _ := setupBlockchain(t)
fromBlock := 1

// Setup bootstrap
bootstrapCSAKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 1))
bootstrapNodePort := freeport.GetOne(t)
appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil)
bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb}

t.Run("transmit queue does not grow unbounded", func(t *testing.T) {
packets := make(chan *packet, 100000)
serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2))
serverPubKey := serverKey.PublicKey
srv := NewMercuryServer(t, ed25519.PrivateKey(serverKey.Raw()), packets)

serverURL := startMercuryServer(t, srv, clientPubKeys)

donID := uint32(888333)

streams := make([]Stream, nStreamsPerChannel+2)

streams[0] = ethStream
streams[1] = linkStream

streamMap := make(map[uint32]Stream)
for _, strm := range streams {
streamMap[strm.id] = strm
}

// Setup oracle nodes
oracles, nodes := setupNodes(t, nNodes, backend, clientCSAKeys, streams, func(c *chainlink.Config) {
c.Mercury.Transmitter.Protocol = ptr(config.MercuryTransmitterProtocolGRPC)
c.Mercury.Transmitter.TransmitQueueMaxSize = ptr(uint32(maxQueueSize)) // Test queue overflow
c.Log.Level = ptr(logLevel)
})

chainID := testutils.SimulatedChainID
relayType := "evm"
relayConfig := fmt.Sprintf(`
chainID = "%s"
fromBlock = %d
lloDonID = %d
lloConfigMode = "bluegreen"
`, chainID, fromBlock, donID)
addBootstrapJob(t, bootstrapNode, configuratorAddress, "job-3", relayType, relayConfig)

mustEncodeOpts := func(opts *lloevm.ReportFormatEVMABIEncodeOpts) []byte {
encoded, err := json.Marshal(opts)
require.NoError(t, err)
return encoded
}

standardMultiplier := ubig.NewI(1e18)
expirationWindow := uint32(3600)

// Channel definitions
// 2,000 channels should produce 2,000 reports per second
channelDefinitions := llotypes.ChannelDefinitions{}
for i := uint32(0); i < nChannels; i++ {
streamDefinitions := make([]llotypes.Stream, nStreamsPerChannel+2)
streamABIDefinitions := make([]lloevm.ABIEncoder, nStreamsPerChannel)

streamDefinitions[0] = llotypes.Stream{
StreamID: ethStreamID,
Aggregator: llotypes.AggregatorMedian,
}
streamDefinitions[1] = llotypes.Stream{
StreamID: linkStreamID,
Aggregator: llotypes.AggregatorMedian,
}

for j := uint32(0); j < nStreamsPerChannel; j++ {
newStreamID := ethStreamID + 2 + j
streams[j+2] = Stream{
id: newStreamID,
baseBenchmarkPrice: decimal.NewFromFloat32(2_976.39),
}
streamDefinitions[j+2] = llotypes.Stream{
StreamID: ethStreamID,
Aggregator: llotypes.AggregatorMedian,
}
streamABIDefinitions[j] = lloevm.ABIEncoder{
StreamID: ethStreamID,
Type: "int192",
Multiplier: standardMultiplier,
}
}

feedID := utils.NewHash()

channelDefinitions[i] = llotypes.ChannelDefinition{
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpacked,
Streams: streamDefinitions,
Opts: mustEncodeOpts(&lloevm.ReportFormatEVMABIEncodeOpts{
BaseUSDFee: decimal.NewFromFloat32(0.1),
ExpirationWindow: uint32(expirationWindow),

Check failure on line 1183 in core/services/ocr2/plugins/llo/integration_test.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

unnecessary conversion (unconvert)
FeedID: feedID,
ABI: streamABIDefinitions,
}),
}
}
url, sha := newChannelDefinitionsServer(t, channelDefinitions)

// Set channel definitions
_, err := configStore.SetChannelDefinitions(steve, donID, url, sha)
require.NoError(t, err)
backend.Commit()

// one working and one broken transmission server
pluginConfig := fmt.Sprintf(`servers = { "%s" = "%x", "example.invalid" = "%x" }
donID = %d
channelDefinitionsContractAddress = "0x%x"
channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey, donID, configStoreAddress, fromBlock)
addOCRJobsEVMABIEncode(t, streams, serverPubKey, serverURL, configuratorAddress, bootstrapPeerID, bootstrapNodePort, nodes, configStoreAddress, clientPubKeys, pluginConfig, relayType, relayConfig)

var blueDigest ocr2types.ConfigDigest

{
// Set config on configurator
blueDigest = setProductionConfig(
t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles,
)

// NOTE: Wait for 40,000 reports (should take about 5 seconds) - 2,000 reports per second * 4 transmitters * 5 seconds
// count of packets received keyed by transmitter IP
m := map[string]int{}
for pckt := range packets {
pr, ok := peer.FromContext(pckt.ctx)
require.True(t, ok)
addr := pr.Addr
req := pckt.req

assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat)
v := make(map[string]interface{})
err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload)
require.NoError(t, err)
report, exists := v["report"]
if !exists {
t.Fatalf("expected payload %#v to contain 'report'", v)
}
reportCtx, exists := v["reportContext"]
if !exists {
t.Fatalf("expected payload %#v to contain 'reportContext'", v)
}

// Check the report context
assert.Equal(t, [32]byte(blueDigest), reportCtx.([3][32]uint8)[0]) // config digest
assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash

reportElems := make(map[string]interface{})
err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte))
require.NoError(t, err)

// Check payload values
payload := report.([]byte)[192:]

require.Len(t, payload, 32*nStreamsPerChannel)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
})
v2 := make(map[string]interface{})
err2 := args.UnpackIntoMap(v2, payload)
require.NoError(t, err2)

assert.Equal(t, "2976390000000000000000", v2["benchmarkPrice"].(*big.Int).String())

m[addr.String()]++
finished := 0
for _, cnt := range m {
if cnt >= nReports {
finished++
}
}
if finished == 4 {
break
}
}
}

// Shut all nodes down
for i, node := range nodes {
require.NoError(t, node.App.Stop())
// Ensure that the transmit queue was limited
db := node.App.GetDB()
cnt := 0

// The failing server
err := db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = 'example.invalid'")
require.NoError(t, err)
assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for failing server", i)

// The succeeding server
err = db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = $1", serverURL)
require.NoError(t, err)
assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for succeeding server", i)
}
})
}

func TestIntegration_LLO_stress_test_and_transmit_errors(t *testing.T) {
t.Parallel()

Expand Down
Loading