diff --git a/core/services/llo/evm/report_codec_premium_legacy.go b/core/services/llo/evm/report_codec_premium_legacy.go index 94028943664..6a262bef43f 100644 --- a/core/services/llo/evm/report_codec_premium_legacy.go +++ b/core/services/llo/evm/report_codec_premium_legacy.go @@ -27,7 +27,7 @@ import ( ) var ( - _ llo.ReportCodec = ReportCodecPremiumLegacy{} + _ llo.ReportEncoder = ReportCodecPremiumLegacy{} ) type ReportCodecPremiumLegacy struct { diff --git a/core/services/llo/triggers/payload.proto b/core/services/llo/triggers/payload.proto new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 3b011121687..60902c9366c 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -1013,7 +1013,7 @@ func (d *Delegate) newServicesLLO( // Also re-use EVM keys for signing the retirement report. This isn't // required, just seems easiest since it's the only key type available for // now. - for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} { + for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatProto, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} { if _, exists := kbm[rf]; !exists { // Use the first if unspecified kbs, err3 := d.ks.GetAllOfType("evm") diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index 634ab0be170..2754474094b 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -566,7 +566,7 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi }) } -func TestIntegration_LLO_evm_abi_encode_unpacked(t *testing.T) { +func TestIntegration_LLO_multi_report_formats(t *testing.T) { t.Parallel() testStartTimeStamp := time.Now() @@ -592,7 +592,7 @@ func TestIntegration_LLO_evm_abi_encode_unpacked(t *testing.T) { appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil) bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb} - t.Run("generates reports using go ReportFormatEVMABIEncodeUnpacked format", func(t *testing.T) { + t.Run("generates reports using ReportFormatEVMABIEncodeUnpacked and ReportFormatProto", func(t *testing.T) { packetCh := make(chan *packet, 100000) serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2)) serverPubKey := serverKey.PublicKey @@ -822,6 +822,58 @@ lloConfigMode = "bluegreen" }, }), }, + // Sample proto schema (one stream) + 5: { + ReportFormat: llotypes.ReportFormatProto, + Streams: []llotypes.Stream{ + { + StreamID: benchmarkPriceStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: benchmarkPriceStreamID, + Aggregator: llotypes.AggregatorMode, + }, + }, + }, + // Sample proto schema (many streams) + 6: { + ReportFormat: llotypes.ReportFormatProto, + Streams: []llotypes.Stream{ + { + StreamID: benchmarkPriceStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingRateStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingTimeStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: binanceFundingIntervalHoursStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingRateStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingTimeStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: deribitFundingIntervalHoursStreamID, + Aggregator: llotypes.AggregatorMedian, + }, + { + StreamID: marketStatusStreamID, + Aggregator: llotypes.AggregatorMode, + }, + }, + }, } url, sha := newChannelDefinitionsServer(t, channelDefinitions) @@ -847,9 +899,24 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi "binanceFundingIntervalHours": "8", "deribitFundingRate": "5432.2345", "deribitFundingTime": "1630000000", - "deribitFundingIntervalHours": "8" + "deribitFundingIntervalHours": "8", + "ethPrice": "3976.39", + "linkPrice": "23.45" }` + pricePipeline := fmt.Sprintf(` +dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; + +eth_parse [type=jsonparse path="result,ethPrice"]; +eth_decimal [type=multiply times=1 streamID=%d]; + +link_parse [type=jsonparse path="result,linkPrice"]; +link_decimal [type=multiply times=1 streamID=%d]; + +dp -> eth_parse -> eth_decimal; +dp -> link_parse -> link_decimal; +`, bridgeName, ethStreamID, linkStreamID) + dexBasedAssetPipeline := fmt.Sprintf(` dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"]; @@ -917,6 +984,7 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec // superBridge returns a JSON with everything you want in it, // stream specs can just pick the individual fields they need createBridge(t, bridgeName, resultJSON, node.App.BridgeORM()) + addStreamSpec(t, node, "pricePipeline", nil, pricePipeline) addStreamSpec(t, node, "dexBasedAssetPipeline", nil, dexBasedAssetPipeline) addStreamSpec(t, node, "rwaPipeline", nil, rwaPipeline) addStreamSpec(t, node, "benchmarkPricePipeline", nil, benchmarkPricePipeline) @@ -928,7 +996,7 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec bootstrapPeerID, bootstrapNodePort, clientPubKeys[i], - "llo-evm-abi-encode-unpacked-test", + "llo-multi-report-formats-test", pluginConfig, relayType, relayConfig, @@ -941,6 +1009,7 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec ) // NOTE: Wait for one of each type of report + // FeedIDs for ReportFormatEVMABIEncodeUnpacked feedIDs := map[[32]byte]struct{}{ dexBasedAssetFeedID: {}, rwaFeedID: {}, @@ -948,103 +1017,153 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec fundingRateFeedID: {}, } + // ChannelIDs for ReportFormatProto + channelIDs := map[uint32]struct{}{ + 5: {}, + 6: {}, + } + for pckt := range packetCh { req := pckt.req - assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat) - v := make(map[string]interface{}) - err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload) - require.NoError(t, err) - report, exists := v["report"] - if !exists { - t.Fatalf("expected payload %#v to contain 'report'", v) - } - reportCtx, exists := v["reportContext"] - if !exists { - t.Fatalf("expected payload %#v to contain 'reportContext'", v) - } - - // Check the report context - assert.Equal(t, [32]byte(digest), reportCtx.([3][32]uint8)[0]) // config digest - assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash - - reportElems := make(map[string]interface{}) - err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte)) - require.NoError(t, err) - - feedID := reportElems["feedId"].([32]uint8) - delete(feedIDs, feedID) - - // Check headers - assert.GreaterOrEqual(t, reportElems["validFromTimestamp"].(uint32), uint32(testStartTimeStamp.Unix())) //nolint:gosec // G115 - assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp.Unix())) - // Zero fees since both eth/link stream specs are missing, don't - // care about billing for purposes of this test - assert.Equal(t, "0", reportElems["nativeFee"].(*big.Int).String()) - assert.Equal(t, "0", reportElems["linkFee"].(*big.Int).String()) - assert.Equal(t, reportElems["observationsTimestamp"].(uint32)+expirationWindow, reportElems["expiresAt"].(uint32)) - - // Check payload values - payload := report.([]byte)[192:] - switch hex.EncodeToString(feedID[:]) { - case hex.EncodeToString(dexBasedAssetFeedID[:]): - require.Len(t, payload, 96) - args := abi.Arguments([]abi.Argument{ - {Name: "benchmarkPrice", Type: mustNewType("int192")}, - {Name: "baseMarketDepth", Type: mustNewType("int192")}, - {Name: "quoteMarketDepth", Type: mustNewType("int192")}, - }) + switch req.ReportFormat { + case uint32(llotypes.ReportFormatEVMABIEncodeUnpacked): v := make(map[string]interface{}) - err := args.UnpackIntoMap(v, payload) + err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload) require.NoError(t, err) + report, exists := v["report"] + if !exists { + t.Fatalf("expected payload %#v to contain 'report'", v) + } + reportCtx, exists := v["reportContext"] + if !exists { + t.Fatalf("expected payload %#v to contain 'reportContext'", v) + } - assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) - assert.Equal(t, "1000", v["baseMarketDepth"].(*big.Int).String()) - assert.Equal(t, "998", v["quoteMarketDepth"].(*big.Int).String()) - case hex.EncodeToString(rwaFeedID[:]): - require.Len(t, payload, 32) - args := abi.Arguments([]abi.Argument{ - {Name: "marketStatus", Type: mustNewType("uint32")}, - }) - v := make(map[string]interface{}) - err := args.UnpackIntoMap(v, payload) - require.NoError(t, err) + // Check the report context + assert.Equal(t, [32]byte(digest), reportCtx.([3][32]uint8)[0]) // config digest + assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash - assert.Equal(t, uint32(1), v["marketStatus"].(uint32)) - case hex.EncodeToString(benchmarkPriceFeedID[:]): - require.Len(t, payload, 32) - args := abi.Arguments([]abi.Argument{ - {Name: "benchmarkPrice", Type: mustNewType("int192")}, - }) - v := make(map[string]interface{}) - err := args.UnpackIntoMap(v, payload) + reportElems := make(map[string]interface{}) + err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte)) require.NoError(t, err) - assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) - case hex.EncodeToString(fundingRateFeedID[:]): - require.Len(t, payload, 192) - args := abi.Arguments([]abi.Argument{ - {Name: "binanceFundingRate", Type: mustNewType("int192")}, - {Name: "binanceFundingTime", Type: mustNewType("int192")}, - {Name: "binanceFundingIntervalHours", Type: mustNewType("int192")}, - {Name: "deribitFundingRate", Type: mustNewType("int192")}, - {Name: "deribitFundingTime", Type: mustNewType("int192")}, - {Name: "deribitFundingIntervalHours", Type: mustNewType("int192")}, - }) - v := make(map[string]interface{}) - err := args.UnpackIntoMap(v, payload) + feedID := reportElems["feedId"].([32]uint8) + delete(feedIDs, feedID) + + // Check headers + assert.GreaterOrEqual(t, reportElems["validFromTimestamp"].(uint32), uint32(testStartTimeStamp.Unix())) //nolint:gosec // G115 + assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp.Unix())) + assert.Equal(t, "25148438659186", reportElems["nativeFee"].(*big.Int).String()) + assert.Equal(t, "4264392324093817", reportElems["linkFee"].(*big.Int).String()) + assert.Equal(t, reportElems["observationsTimestamp"].(uint32)+expirationWindow, reportElems["expiresAt"].(uint32)) + + // Check payload values + payload := report.([]byte)[192:] + switch hex.EncodeToString(feedID[:]) { + case hex.EncodeToString(dexBasedAssetFeedID[:]): + require.Len(t, payload, 96) + args := abi.Arguments([]abi.Argument{ + {Name: "benchmarkPrice", Type: mustNewType("int192")}, + {Name: "baseMarketDepth", Type: mustNewType("int192")}, + {Name: "quoteMarketDepth", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) + assert.Equal(t, "1000", v["baseMarketDepth"].(*big.Int).String()) + assert.Equal(t, "998", v["quoteMarketDepth"].(*big.Int).String()) + case hex.EncodeToString(rwaFeedID[:]): + require.Len(t, payload, 32) + args := abi.Arguments([]abi.Argument{ + {Name: "marketStatus", Type: mustNewType("uint32")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, uint32(1), v["marketStatus"].(uint32)) + case hex.EncodeToString(benchmarkPriceFeedID[:]): + require.Len(t, payload, 32) + args := abi.Arguments([]abi.Argument{ + {Name: "benchmarkPrice", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String()) + case hex.EncodeToString(fundingRateFeedID[:]): + require.Len(t, payload, 192) + args := abi.Arguments([]abi.Argument{ + {Name: "binanceFundingRate", Type: mustNewType("int192")}, + {Name: "binanceFundingTime", Type: mustNewType("int192")}, + {Name: "binanceFundingIntervalHours", Type: mustNewType("int192")}, + {Name: "deribitFundingRate", Type: mustNewType("int192")}, + {Name: "deribitFundingTime", Type: mustNewType("int192")}, + {Name: "deribitFundingIntervalHours", Type: mustNewType("int192")}, + }) + v := make(map[string]interface{}) + err := args.UnpackIntoMap(v, payload) + require.NoError(t, err) + + assert.Equal(t, "1234", v["binanceFundingRate"].(*big.Int).String()) + assert.Equal(t, "1630000000", v["binanceFundingTime"].(*big.Int).String()) + assert.Equal(t, "8", v["binanceFundingIntervalHours"].(*big.Int).String()) + assert.Equal(t, "5432", v["deribitFundingRate"].(*big.Int).String()) + assert.Equal(t, "1630000000", v["deribitFundingTime"].(*big.Int).String()) + assert.Equal(t, "8", v["deribitFundingIntervalHours"].(*big.Int).String()) + default: + t.Fatalf("unexpected feedID: %x", feedID) + } + + case uint32(llotypes.ReportFormatProto): + p, r, err := datastreamsllo.ProtoReportCodec{}.UnpackDecode(req.Payload) require.NoError(t, err) + // Check the report context + assert.Equal(t, digest[:], p.ConfigDigest) + assert.Positive(t, p.SeqNr) + assert.GreaterOrEqual(t, len(p.Sigs), fNodes+1) + // Check the report + assert.GreaterOrEqual(t, int(r.ObservationTimestampSeconds), int(testStartTimeStamp.Unix())) + assert.LessOrEqual(t, int(r.ValidAfterSeconds), int(r.ObservationTimestampSeconds)) + + switch r.ChannelId { + case 5: + assert.Len(t, r.StreamValues, 1) + assert.Contains(t, r.StreamValues, benchmarkPriceStreamID) + assert.Equal(t, "foo", r.StreamValues[benchmarkPriceStreamID]) + case 6: + assert.Len(t, r.StreamValues, 8) + assert.Contains(t, r.StreamValues, benchmarkPriceStreamID) + datastreamsllo.UnmarshalProtoStreamValue(r.StreamValues[benchmarkPriceStreamID]) + assert.Equal(t, "foo", r.StreamValues[benchmarkPriceStreamID]) + assert.Contains(t, r.StreamValues, binanceFundingRateStreamID) + assert.Equal(t, "foo", r.StreamValues[binanceFundingRateStreamID]) + assert.Contains(t, r.StreamValues, binanceFundingTimeStreamID) + assert.Equal(t, "foo", r.StreamValues[binanceFundingTimeStreamID]) + assert.Contains(t, r.StreamValues, binanceFundingIntervalHoursStreamID) + assert.Equal(t, "foo", r.StreamValues[binanceFundingIntervalHoursStreamID]) + assert.Contains(t, r.StreamValues, deribitFundingRateStreamID) + assert.Equal(t, "foo", r.StreamValues[deribitFundingRateStreamID]) + assert.Contains(t, r.StreamValues, deribitFundingTimeStreamID) + assert.Equal(t, "foo", r.StreamValues[deribitFundingTimeStreamID]) + assert.Contains(t, r.StreamValues, deribitFundingIntervalHoursStreamID) + assert.Equal(t, "foo", r.StreamValues[deribitFundingIntervalHoursStreamID]) + assert.Contains(t, r.StreamValues, marketStatusStreamID) + assert.Equal(t, "foo", r.StreamValues[marketStatusStreamID]) + } - assert.Equal(t, "1234", v["binanceFundingRate"].(*big.Int).String()) - assert.Equal(t, "1630000000", v["binanceFundingTime"].(*big.Int).String()) - assert.Equal(t, "8", v["binanceFundingIntervalHours"].(*big.Int).String()) - assert.Equal(t, "5432", v["deribitFundingRate"].(*big.Int).String()) - assert.Equal(t, "1630000000", v["deribitFundingTime"].(*big.Int).String()) - assert.Equal(t, "8", v["deribitFundingIntervalHours"].(*big.Int).String()) + if _, exists := channelIDs[r.ChannelId]; !exists { + t.Fatalf("unexpected channelID: %d", r.ChannelId) + } + delete(channelIDs, r.ChannelId) default: - t.Fatalf("unexpected feedID: %x", feedID) + t.Fatalf("unexpected report format: %d", req.ReportFormat) } - - if len(feedIDs) == 0 { + if len(feedIDs) == 0 && len(channelIDs) == 0 { + // SUCCESS! break } } diff --git a/core/services/relay/evm/llo_config_provider.go b/core/services/relay/evm/llo_config_provider.go index 99f2824a5c9..7a665a26bd1 100644 --- a/core/services/relay/evm/llo_config_provider.go +++ b/core/services/relay/evm/llo_config_provider.go @@ -39,7 +39,7 @@ func (l *lloConfigProvider) OffchainConfigDigester() ocrtypes.OffchainConfigDige func (l *lloConfigProvider) ContractConfigTracker() ocrtypes.ContractConfigTracker { // FIXME: Only return Blue for now. This is a hack to make the bootstrap // job work, needs to support multiple config trackers here - // MERC-5954 + // MERC-6839 return l.cps[0] }