Commit 97eb3e2

samsondav committed Feb 17, 2025
1 parent 64550df commit 97eb3e2
Showing 5 changed files with 211 additions and 92 deletions.
2 changes: 1 addition & 1 deletion core/services/llo/evm/report_codec_premium_legacy.go
@@ -27,7 +27,7 @@ import (
)

var (
_ llo.ReportCodec = ReportCodecPremiumLegacy{}
_ llo.ReportEncoder = ReportCodecPremiumLegacy{}
)

type ReportCodecPremiumLegacy struct {
Empty file.
2 changes: 1 addition & 1 deletion core/services/ocr2/delegate.go
@@ -1013,7 +1013,7 @@ func (d *Delegate) newServicesLLO(
// Also re-use EVM keys for signing the retirement report. This isn't
// required, just seems easiest since it's the only key type available for
// now.
for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} {
for _, rf := range []llotypes.ReportFormat{llotypes.ReportFormatJSON, llotypes.ReportFormatProto, llotypes.ReportFormatEVMPremiumLegacy, llotypes.ReportFormatRetirement, llotypes.ReportFormatEVMABIEncodeUnpacked} {
if _, exists := kbm[rf]; !exists {
// Use the first if unspecified
kbs, err3 := d.ks.GetAllOfType("evm")
297 changes: 208 additions & 89 deletions core/services/ocr2/plugins/llo/integration_test.go
@@ -566,7 +566,7 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi
})
}

func TestIntegration_LLO_evm_abi_encode_unpacked(t *testing.T) {
func TestIntegration_LLO_multi_report_formats(t *testing.T) {
t.Parallel()

testStartTimeStamp := time.Now()
@@ -592,7 +592,7 @@ func TestIntegration_LLO_evm_abi_encode_unpacked(t *testing.T) {
appBootstrap, bootstrapPeerID, _, bootstrapKb, _ := setupNode(t, bootstrapNodePort, "bootstrap_llo", backend, bootstrapCSAKey, nil)
bootstrapNode := Node{App: appBootstrap, KeyBundle: bootstrapKb}

t.Run("generates reports using go ReportFormatEVMABIEncodeUnpacked format", func(t *testing.T) {
t.Run("generates reports using ReportFormatEVMABIEncodeUnpacked and ReportFormatProto", func(t *testing.T) {
packetCh := make(chan *packet, 100000)
serverKey := csakey.MustNewV2XXXTestingOnly(big.NewInt(salt - 2))
serverPubKey := serverKey.PublicKey
@@ -822,6 +822,58 @@ lloConfigMode = "bluegreen"
},
}),
},
// Sample proto schema (one stream)
5: {
ReportFormat: llotypes.ReportFormatProto,
Streams: []llotypes.Stream{
{
StreamID: benchmarkPriceStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: benchmarkPriceStreamID,
Aggregator: llotypes.AggregatorMode,
},
},
},
// Sample proto schema (many streams)
6: {
ReportFormat: llotypes.ReportFormatProto,
Streams: []llotypes.Stream{
{
StreamID: benchmarkPriceStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: binanceFundingRateStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: binanceFundingTimeStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: binanceFundingIntervalHoursStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: deribitFundingRateStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: deribitFundingTimeStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: deribitFundingIntervalHoursStreamID,
Aggregator: llotypes.AggregatorMedian,
},
{
StreamID: marketStatusStreamID,
Aggregator: llotypes.AggregatorMode,
},
},
},
}
url, sha := newChannelDefinitionsServer(t, channelDefinitions)

@@ -847,9 +899,24 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi
"binanceFundingIntervalHours": "8",
"deribitFundingRate": "5432.2345",
"deribitFundingTime": "1630000000",
"deribitFundingIntervalHours": "8"
"deribitFundingIntervalHours": "8",
"ethPrice": "3976.39",
"linkPrice": "23.45"
}`

pricePipeline := fmt.Sprintf(`
dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"];
eth_parse [type=jsonparse path="result,ethPrice"];
eth_decimal [type=multiply times=1 streamID=%d];
link_parse [type=jsonparse path="result,linkPrice"];
link_decimal [type=multiply times=1 streamID=%d];
dp -> eth_parse -> eth_decimal;
dp -> link_parse -> link_decimal;
`, bridgeName, ethStreamID, linkStreamID)

dexBasedAssetPipeline := fmt.Sprintf(`
dp [type=bridge name="%s" requestData="{\\"data\\":{\\"data\\":\\"foo\\"}}"];
@@ -917,6 +984,7 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec
// superBridge returns a JSON with everything you want in it,
// stream specs can just pick the individual fields they need
createBridge(t, bridgeName, resultJSON, node.App.BridgeORM())
addStreamSpec(t, node, "pricePipeline", nil, pricePipeline)
addStreamSpec(t, node, "dexBasedAssetPipeline", nil, dexBasedAssetPipeline)
addStreamSpec(t, node, "rwaPipeline", nil, rwaPipeline)
addStreamSpec(t, node, "benchmarkPricePipeline", nil, benchmarkPricePipeline)
@@ -928,7 +996,7 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec
bootstrapPeerID,
bootstrapNodePort,
clientPubKeys[i],
"llo-evm-abi-encode-unpacked-test",
"llo-multi-report-formats-test",
pluginConfig,
relayType,
relayConfig,
@@ -941,110 +1009,161 @@ dp -> deribit_funding_interval_hours_parse -> deribit_funding_interval_hours_dec
)

// NOTE: Wait for one of each type of report
// FeedIDs for ReportFormatEVMABIEncodeUnpacked
feedIDs := map[[32]byte]struct{}{
dexBasedAssetFeedID: {},
rwaFeedID: {},
benchmarkPriceFeedID: {},
fundingRateFeedID: {},
}

// ChannelIDs for ReportFormatProto
channelIDs := map[uint32]struct{}{
5: {},
6: {},
}

for pckt := range packetCh {
req := pckt.req
assert.Equal(t, uint32(llotypes.ReportFormatEVMABIEncodeUnpacked), req.ReportFormat)
v := make(map[string]interface{})
err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload)
require.NoError(t, err)
report, exists := v["report"]
if !exists {
t.Fatalf("expected payload %#v to contain 'report'", v)
}
reportCtx, exists := v["reportContext"]
if !exists {
t.Fatalf("expected payload %#v to contain 'reportContext'", v)
}

// Check the report context
assert.Equal(t, [32]byte(digest), reportCtx.([3][32]uint8)[0]) // config digest
assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash

reportElems := make(map[string]interface{})
err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte))
require.NoError(t, err)

feedID := reportElems["feedId"].([32]uint8)
delete(feedIDs, feedID)

// Check headers
assert.GreaterOrEqual(t, reportElems["validFromTimestamp"].(uint32), uint32(testStartTimeStamp.Unix())) //nolint:gosec // G115
assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp.Unix()))
// Zero fees since both eth/link stream specs are missing, don't
// care about billing for purposes of this test
assert.Equal(t, "0", reportElems["nativeFee"].(*big.Int).String())
assert.Equal(t, "0", reportElems["linkFee"].(*big.Int).String())
assert.Equal(t, reportElems["observationsTimestamp"].(uint32)+expirationWindow, reportElems["expiresAt"].(uint32))

// Check payload values
payload := report.([]byte)[192:]
switch hex.EncodeToString(feedID[:]) {
case hex.EncodeToString(dexBasedAssetFeedID[:]):
require.Len(t, payload, 96)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
{Name: "baseMarketDepth", Type: mustNewType("int192")},
{Name: "quoteMarketDepth", Type: mustNewType("int192")},
})
switch req.ReportFormat {
case uint32(llotypes.ReportFormatEVMABIEncodeUnpacked):
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
err := mercury.PayloadTypes.UnpackIntoMap(v, req.Payload)
require.NoError(t, err)
report, exists := v["report"]
if !exists {
t.Fatalf("expected payload %#v to contain 'report'", v)
}
reportCtx, exists := v["reportContext"]
if !exists {
t.Fatalf("expected payload %#v to contain 'reportContext'", v)
}

assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String())
assert.Equal(t, "1000", v["baseMarketDepth"].(*big.Int).String())
assert.Equal(t, "998", v["quoteMarketDepth"].(*big.Int).String())
case hex.EncodeToString(rwaFeedID[:]):
require.Len(t, payload, 32)
args := abi.Arguments([]abi.Argument{
{Name: "marketStatus", Type: mustNewType("uint32")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
require.NoError(t, err)
// Check the report context
assert.Equal(t, [32]byte(digest), reportCtx.([3][32]uint8)[0]) // config digest
assert.Equal(t, "000000000000000000000000000000000000000000000000000d8e0d00000001", fmt.Sprintf("%x", reportCtx.([3][32]uint8)[2])) // extra hash

assert.Equal(t, uint32(1), v["marketStatus"].(uint32))
case hex.EncodeToString(benchmarkPriceFeedID[:]):
require.Len(t, payload, 32)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
reportElems := make(map[string]interface{})
err = lloevm.BaseSchema.UnpackIntoMap(reportElems, report.([]byte))
require.NoError(t, err)

assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String())
case hex.EncodeToString(fundingRateFeedID[:]):
require.Len(t, payload, 192)
args := abi.Arguments([]abi.Argument{
{Name: "binanceFundingRate", Type: mustNewType("int192")},
{Name: "binanceFundingTime", Type: mustNewType("int192")},
{Name: "binanceFundingIntervalHours", Type: mustNewType("int192")},
{Name: "deribitFundingRate", Type: mustNewType("int192")},
{Name: "deribitFundingTime", Type: mustNewType("int192")},
{Name: "deribitFundingIntervalHours", Type: mustNewType("int192")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
feedID := reportElems["feedId"].([32]uint8)
delete(feedIDs, feedID)

// Check headers
assert.GreaterOrEqual(t, reportElems["validFromTimestamp"].(uint32), uint32(testStartTimeStamp.Unix())) //nolint:gosec // G115
assert.GreaterOrEqual(t, int(reportElems["observationsTimestamp"].(uint32)), int(testStartTimeStamp.Unix()))
assert.Equal(t, "25148438659186", reportElems["nativeFee"].(*big.Int).String())
assert.Equal(t, "4264392324093817", reportElems["linkFee"].(*big.Int).String())
assert.Equal(t, reportElems["observationsTimestamp"].(uint32)+expirationWindow, reportElems["expiresAt"].(uint32))

// Check payload values
payload := report.([]byte)[192:]
switch hex.EncodeToString(feedID[:]) {
case hex.EncodeToString(dexBasedAssetFeedID[:]):
require.Len(t, payload, 96)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
{Name: "baseMarketDepth", Type: mustNewType("int192")},
{Name: "quoteMarketDepth", Type: mustNewType("int192")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
require.NoError(t, err)

assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String())
assert.Equal(t, "1000", v["baseMarketDepth"].(*big.Int).String())
assert.Equal(t, "998", v["quoteMarketDepth"].(*big.Int).String())
case hex.EncodeToString(rwaFeedID[:]):
require.Len(t, payload, 32)
args := abi.Arguments([]abi.Argument{
{Name: "marketStatus", Type: mustNewType("uint32")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
require.NoError(t, err)

assert.Equal(t, uint32(1), v["marketStatus"].(uint32))
case hex.EncodeToString(benchmarkPriceFeedID[:]):
require.Len(t, payload, 32)
args := abi.Arguments([]abi.Argument{
{Name: "benchmarkPrice", Type: mustNewType("int192")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
require.NoError(t, err)

assert.Equal(t, "2976390000000000000000", v["benchmarkPrice"].(*big.Int).String())
case hex.EncodeToString(fundingRateFeedID[:]):
require.Len(t, payload, 192)
args := abi.Arguments([]abi.Argument{
{Name: "binanceFundingRate", Type: mustNewType("int192")},
{Name: "binanceFundingTime", Type: mustNewType("int192")},
{Name: "binanceFundingIntervalHours", Type: mustNewType("int192")},
{Name: "deribitFundingRate", Type: mustNewType("int192")},
{Name: "deribitFundingTime", Type: mustNewType("int192")},
{Name: "deribitFundingIntervalHours", Type: mustNewType("int192")},
})
v := make(map[string]interface{})
err := args.UnpackIntoMap(v, payload)
require.NoError(t, err)

assert.Equal(t, "1234", v["binanceFundingRate"].(*big.Int).String())
assert.Equal(t, "1630000000", v["binanceFundingTime"].(*big.Int).String())
assert.Equal(t, "8", v["binanceFundingIntervalHours"].(*big.Int).String())
assert.Equal(t, "5432", v["deribitFundingRate"].(*big.Int).String())
assert.Equal(t, "1630000000", v["deribitFundingTime"].(*big.Int).String())
assert.Equal(t, "8", v["deribitFundingIntervalHours"].(*big.Int).String())
default:
t.Fatalf("unexpected feedID: %x", feedID)
}

case uint32(llotypes.ReportFormatProto):
p, r, err := datastreamsllo.ProtoReportCodec{}.UnpackDecode(req.Payload)
require.NoError(t, err)
// Check the report context
assert.Equal(t, digest[:], p.ConfigDigest)
assert.Positive(t, p.SeqNr)
assert.GreaterOrEqual(t, len(p.Sigs), fNodes+1)
// Check the report
assert.GreaterOrEqual(t, int(r.ObservationTimestampSeconds), int(testStartTimeStamp.Unix()))
assert.LessOrEqual(t, int(r.ValidAfterSeconds), int(r.ObservationTimestampSeconds))

switch r.ChannelId {
case 5:
assert.Len(t, r.StreamValues, 1)
assert.Contains(t, r.StreamValues, benchmarkPriceStreamID)
assert.Equal(t, "foo", r.StreamValues[benchmarkPriceStreamID])
case 6:
assert.Len(t, r.StreamValues, 8)
assert.Contains(t, r.StreamValues, benchmarkPriceStreamID)
datastreamsllo.UnmarshalProtoStreamValue(r.StreamValues[benchmarkPriceStreamID])
assert.Equal(t, "foo", r.StreamValues[benchmarkPriceStreamID])
assert.Contains(t, r.StreamValues, binanceFundingRateStreamID)
assert.Equal(t, "foo", r.StreamValues[binanceFundingRateStreamID])
assert.Contains(t, r.StreamValues, binanceFundingTimeStreamID)
assert.Equal(t, "foo", r.StreamValues[binanceFundingTimeStreamID])
assert.Contains(t, r.StreamValues, binanceFundingIntervalHoursStreamID)
assert.Equal(t, "foo", r.StreamValues[binanceFundingIntervalHoursStreamID])
assert.Contains(t, r.StreamValues, deribitFundingRateStreamID)
assert.Equal(t, "foo", r.StreamValues[deribitFundingRateStreamID])
assert.Contains(t, r.StreamValues, deribitFundingTimeStreamID)
assert.Equal(t, "foo", r.StreamValues[deribitFundingTimeStreamID])
assert.Contains(t, r.StreamValues, deribitFundingIntervalHoursStreamID)
assert.Equal(t, "foo", r.StreamValues[deribitFundingIntervalHoursStreamID])
assert.Contains(t, r.StreamValues, marketStatusStreamID)
assert.Equal(t, "foo", r.StreamValues[marketStatusStreamID])
}

assert.Equal(t, "1234", v["binanceFundingRate"].(*big.Int).String())
assert.Equal(t, "1630000000", v["binanceFundingTime"].(*big.Int).String())
assert.Equal(t, "8", v["binanceFundingIntervalHours"].(*big.Int).String())
assert.Equal(t, "5432", v["deribitFundingRate"].(*big.Int).String())
assert.Equal(t, "1630000000", v["deribitFundingTime"].(*big.Int).String())
assert.Equal(t, "8", v["deribitFundingIntervalHours"].(*big.Int).String())
if _, exists := channelIDs[r.ChannelId]; !exists {
t.Fatalf("unexpected channelID: %d", r.ChannelId)
}
delete(channelIDs, r.ChannelId)
default:
t.Fatalf("unexpected feedID: %x", feedID)
t.Fatalf("unexpected report format: %d", req.ReportFormat)
}

if len(feedIDs) == 0 {
if len(feedIDs) == 0 && len(channelIDs) == 0 {
// SUCCESS!
break
}
}
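
The ReportFormatProto branch above decodes incoming packets with datastreamsllo.ProtoReportCodec. A minimal standalone sketch of that decode step, kept to the fields the test actually reads; the import path, helper name and log output are assumptions and not part of this commit:

package example

import (
	"fmt"

	datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo" // assumed import path
)

// decodeProtoPayload is a hypothetical helper mirroring the test's ReportFormatProto branch.
func decodeProtoPayload(payload []byte) error {
	// UnpackDecode returns the signed envelope (p) and the decoded report (r).
	p, r, err := datastreamsllo.ProtoReportCodec{}.UnpackDecode(payload)
	if err != nil {
		return fmt.Errorf("unpack/decode proto report: %w", err)
	}
	// Envelope metadata the test asserts on: config digest, sequence number, signature count.
	fmt.Printf("configDigest=%x seqNr=%d sigs=%d\n", p.ConfigDigest, p.SeqNr, len(p.Sigs))
	// Report body: channel ID, timestamps and one encoded value per stream in the channel.
	fmt.Printf("channel=%d observed=%d validAfter=%d streamValues=%d\n",
		r.ChannelId, r.ObservationTimestampSeconds, r.ValidAfterSeconds, len(r.StreamValues))
	return nil
}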
2 changes: 1 addition & 1 deletion core/services/relay/evm/llo_config_provider.go
@@ -39,7 +39,7 @@ func (l *lloConfigProvider) OffchainConfigDigester() ocrtypes.OffchainConfigDige
func (l *lloConfigProvider) ContractConfigTracker() ocrtypes.ContractConfigTracker {
// FIXME: Only return Blue for now. This is a hack to make the bootstrap
// job work, needs to support multiple config trackers here
// MERC-5954
// MERC-6839
return l.cps[0]
}
