Skip to content

Commit

Permalink
Various fixes - code cleanup - performance - bugfixes (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkouv authored Feb 26, 2025
1 parent f794997 commit fdcfd13
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 103 deletions.
18 changes: 7 additions & 11 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,16 @@ func (o observerImpl) ObserveLatestOnRampSeqNums(ctx context.Context) []pluginty

mu := &sync.Mutex{}
latestOnRampSeqNums := make([]plugintypes.SeqNumChain, 0, len(sourceChains))
eg := &errgroup.Group{}

wg := &sync.WaitGroup{}
wg.Add(len(sourceChains))
for _, sourceChain := range sourceChains {
eg.Go(func() error {
go func() {
defer wg.Done()
latestOnRampSeqNum, err := o.ccipReader.LatestMsgSeqNum(ctx, sourceChain)
if err != nil {
lggr.Errorf("failed to get latest msg seq num for source chain %d: %s", sourceChain, err)
return nil
return
}

mu.Lock()
Expand All @@ -571,15 +573,9 @@ func (o observerImpl) ObserveLatestOnRampSeqNums(ctx context.Context) []pluginty
plugintypes.NewSeqNumChain(sourceChain, latestOnRampSeqNum),
)
mu.Unlock()

return nil
})
}

if err := eg.Wait(); err != nil {
lggr.Warnw("call to GetExpectedNextSequenceNumber failed", "err", err)
return nil
}()
}
wg.Wait()

sort.Slice(latestOnRampSeqNums, func(i, j int) bool {
return latestOnRampSeqNums[i].ChainSel < latestOnRampSeqNums[j].ChainSel
Expand Down
5 changes: 5 additions & 0 deletions commit/merkleroot/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func reportRangesOutcome(
rmnRemoteConfig = observedRMNRemoteConfig[dstChain]
}

if len(rangesToReport) == 0 {
lggr.Info("No ranges to report, outcomeType is ReportEmpty")
return Outcome{OutcomeType: ReportEmpty}
}

outcome := Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: rangesToReport,
Expand Down
9 changes: 2 additions & 7 deletions commit/merkleroot/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,13 +647,8 @@ func Test_reportRangesOutcome(t *testing.T) {
expectedOutcome Outcome
}{
{
name: "base empty outcome",
expectedOutcome: Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: []plugintypes.ChainRange{},
OffRampNextSeqNums: []plugintypes.SeqNumChain{},
RMNRemoteCfg: rmntypes.RemoteConfig{},
},
name: "base empty outcome",
expectedOutcome: Outcome{OutcomeType: ReportEmpty},
},
{
name: "simple scenario with one chain",
Expand Down
2 changes: 1 addition & 1 deletion commit/merkleroot/rmn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func transformAndSortObservations(
return attrSigObservations
}

// selectsRoots selects the roots from the signed observations.
// selectRoots selects the roots from the signed observations.
// If there are more than one valid roots based on the provided F it returns an error.
func selectRoots(
observations []rmnSignedObservationWithMeta,
Expand Down
10 changes: 3 additions & 7 deletions commit/merkleroot/validate_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func validateObservedMerkleRoots(
return fmt.Errorf("%s invalid: chain already appears in another observed root", root)
}

if len(root.OnRampAddress) == 0 {
if root.OnRampAddress.IsZeroOrEmpty() {
return fmt.Errorf("%s invalid: empty OnRampAddress", root)
}

Expand Down Expand Up @@ -123,10 +123,6 @@ func validateObservedOnRampMaxSeqNums(
return fmt.Errorf("duplicate onRampMaxSeqNum for chain %d", seqNumChain.ChainSel)
}

if seqNumChain.ChainSel == 0 {
return fmt.Errorf("onRampMaxSeqNum for chain %d has chain selector 0", seqNumChain.ChainSel)
}

seenChains.Add(seqNumChain.ChainSel)
}

Expand Down Expand Up @@ -192,8 +188,8 @@ func validateRMNRemoteConfig(
return fmt.Errorf("not enough signers to cover F+1 threshold")
}

if len(rmnRemoteConfig.ContractAddress) == 0 {
return fmt.Errorf("empty ContractAddress")
if rmnRemoteConfig.ContractAddress.IsZeroOrEmpty() {
return fmt.Errorf("empty ContractAddress: %s", rmnRemoteConfig.ContractAddress)
}

seenNodeIndexes := mapset.NewSet[uint64]()
Expand Down
26 changes: 9 additions & 17 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {

nodes := make([]ocr3types.ReportingPlugin[[]byte], len(oracleIDs))

merkleOutcome := baseMerkleOutcome(params.rmnReportCfg)
merkleOutcome := reportEmptyMerkleRootOutcome()

testCases := []struct {
name string
Expand Down Expand Up @@ -321,6 +321,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {
},
BlessedMerkleRoots: make([]ccipocr3.MerkleRootChain, 0),
UnblessedMerkleRoots: make([]ccipocr3.MerkleRootChain, 0),
RMNSignatures: make([]ccipocr3.RMNECDSASignature, 0),
},
},
},
Expand Down Expand Up @@ -422,6 +423,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {
},
BlessedMerkleRoots: make([]ccipocr3.MerkleRootChain, 0),
UnblessedMerkleRoots: make([]ccipocr3.MerkleRootChain, 0),
RMNSignatures: make([]ccipocr3.RMNECDSASignature, 0),
},
},
},
Expand Down Expand Up @@ -465,7 +467,7 @@ func TestPlugin_E2E_AllNodesAgree_TokenPrices(t *testing.T) {

func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
params := defaultNodeParams(t)
merkleOutcome := baseMerkleOutcome(params.rmnReportCfg)
merkleOutcome := reportEmptyMerkleRootOutcome()
nodes := make([]ocr3types.ReportingPlugin[[]byte], len(oracleIDs))

newFeeComponents, newNativePrice, packedGasPrice := newRandomFees()
Expand Down Expand Up @@ -572,7 +574,7 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
ChainFeeOutcome: expectedChain1FeeOutcome,
},
expOutcome: committypes.Outcome{
MerkleRootOutcome: noReportMerkleOutcome(params.rmnReportCfg),
MerkleRootOutcome: merkleOutcome,
ChainFeeOutcome: expectedChain1FeeOutcome,
MainOutcome: committypes.MainOutcome{InflightPriceOcrSequenceNumber: 1, RemainingPriceChecks: 10},
},
Expand All @@ -599,7 +601,7 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
ChainFeeOutcome: expectedChain1FeeOutcome,
},
expOutcome: committypes.Outcome{
MerkleRootOutcome: noReportMerkleOutcome(params.rmnReportCfg),
MerkleRootOutcome: reportEmptyMerkleRootOutcome(),
ChainFeeOutcome: chainfee.Outcome{
GasPrices: []ccipocr3.GasPriceChain{
{
Expand Down Expand Up @@ -633,7 +635,7 @@ func TestPlugin_E2E_AllNodesAgree_ChainFee(t *testing.T) {
ChainFeeOutcome: expectedChain1FeeOutcome,
},
expOutcome: committypes.Outcome{
MerkleRootOutcome: noReportMerkleOutcome(params.rmnReportCfg),
MerkleRootOutcome: reportEmptyMerkleRootOutcome(),
ChainFeeOutcome: chainfee.Outcome{
GasPrices: []ccipocr3.GasPriceChain{
{
Expand Down Expand Up @@ -1005,18 +1007,8 @@ var merkleRoot1 = ccipocr3.Bytes32{0x4a, 0x44, 0xdc, 0x15, 0x36, 0x42, 0x4, 0xa8
0x90, 0x39, 0x45, 0x5c, 0xc1, 0x60, 0x82, 0x81, 0x82, 0xf, 0xe2, 0xb2, 0x4f, 0x1e, 0x52,
0x33, 0xad, 0xe6, 0xaf, 0x1d, 0xd5}

func baseMerkleOutcome(r rmntypes.RemoteConfig) merkleroot.Outcome {
return merkleroot.Outcome{
OutcomeType: merkleroot.ReportIntervalsSelected,
RMNRemoteCfg: r,
}
}

func noReportMerkleOutcome(r rmntypes.RemoteConfig) merkleroot.Outcome {
return merkleroot.Outcome{
OutcomeType: merkleroot.ReportEmpty,
RMNRemoteCfg: r,
}
func reportEmptyMerkleRootOutcome() merkleroot.Outcome {
return merkleroot.Outcome{OutcomeType: merkleroot.ReportEmpty}
}

func newRandomFees() (components types.ChainFeeComponents, nativePrice ccipocr3.BigInt, usdPrices ccipocr3.BigInt) {
Expand Down
2 changes: 1 addition & 1 deletion execute/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (p *Plugin) getCommitReportsObservation(
}

// Get pending exec reports.
groupedCommits, fullyExecutedFinalized, fullyExecutedUnfinalized, err := getPendingExecutedReports(
groupedCommits, fullyExecutedFinalized, fullyExecutedUnfinalized, err := getPendingReportsForExecution(
ctx,
p.ccipReader,
p.commitRootsCache.CanExecute,
Expand Down
12 changes: 6 additions & 6 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *Plugin) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ty

type CanExecuteHandle = func(sel cciptypes.ChainSelector, merkleRoot cciptypes.Bytes32) bool

// getPendingExecutedReports is used to find commit reports which need to be executed.
// getPendingReportsForExecution is used to find commit reports which need to be executed.
//
// The function checks execution status at two levels:
// 1. Gets all executed messages (both finalized and unfinalized) via primitives.Unconfirmed
Expand All @@ -159,7 +159,7 @@ type CanExecuteHandle = func(sel cciptypes.ChainSelector, merkleRoot cciptypes.B
// - fullyExecutedFinalized: All messages executed with finality (mark as executed)
// - fullyExecutedUnfinalized: All messages executed but not finalized (snooze)
// - groupedCommits: Reports with unexecuted messages (available for execution)
func getPendingExecutedReports(
func getPendingReportsForExecution(
ctx context.Context,
ccipReader readerpkg.CCIPReader,
canExecute CanExecuteHandle,
Expand Down Expand Up @@ -286,8 +286,8 @@ func (p *Plugin) ValidateObservation(
return fmt.Errorf("error finding supported chains by node: %w", err)
}

state := previousOutcome.State.Next()
if state == exectypes.Initialized || state == exectypes.GetCommitReports {
nextState := previousOutcome.State.Next()
if nextState == exectypes.GetCommitReports {
err = validateNoMessageRelatedObservations(
decodedObservation.Messages,
decodedObservation.TokenData,
Expand All @@ -303,7 +303,7 @@ func (p *Plugin) ValidateObservation(
}

// check message related validations when states can contain messages
if state == exectypes.GetMessages || state == exectypes.Filter {
if nextState == exectypes.GetMessages || nextState == exectypes.Filter {
if err = validateMsgsReadingEligibility(supportedChains, decodedObservation.Messages); err != nil {
return fmt.Errorf("validate observer reading eligibility: %w", err)
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func validateCommonStateObservations(
return fmt.Errorf("validate commit reports reading eligibility: %w", err)
}

if err := validateObservedSequenceNumbers(decodedObservation.CommitReports); err != nil {
if err := validateObservedSequenceNumbers(supportedChains, decodedObservation.CommitReports); err != nil {
return fmt.Errorf("validate observed sequence numbers: %w", err)
}

Expand Down
28 changes: 24 additions & 4 deletions execute/plugin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func validateCommitReportsReadingEligibility(
}
for _, data := range observedData[chainSel] {
if data.SourceChain != chainSel {
return fmt.Errorf("observer not allowed to read from chain %d", data.SourceChain)
return fmt.Errorf("invalid observed data, key=%d but data chain=%d",
chainSel, data.SourceChain)
}
}
}
Expand Down Expand Up @@ -101,15 +102,24 @@ func validateHashesExist(
}

for chain, msgs := range observedMsgs {
_, ok := hashes[chain]
hashesForChain, ok := hashes[chain]
if !ok {
return fmt.Errorf("hash not found for chain %d", chain)
}

if len(msgs) != len(hashesForChain) {
return fmt.Errorf("unexpected number of message hashes for chain %d: expected %d, got %d",
chain, len(msgs), len(hashesForChain))
}

for seq, msg := range msgs {
if _, ok := hashes[chain][seq]; !ok {
h, exists := hashes[chain][seq]
if !exists {
return fmt.Errorf("hash not found for message %s", msg)
}
if h.IsEmpty() {
return fmt.Errorf("hash is empty for message %s", msg)
}
}
}

Expand All @@ -123,6 +133,11 @@ func validateMessagesConformToCommitReports(
observedData exectypes.CommitObservations,
observedMsgs exectypes.MessageObservations,
) error {
if len(observedData) != len(observedMsgs) {
return fmt.Errorf("count of observed data=%d and observed msgs=%d do not match",
len(observedData), len(observedMsgs))
}

msgsCount := 0
for chain, report := range observedData {
for _, data := range report {
Expand Down Expand Up @@ -154,9 +169,14 @@ func validateMessagesConformToCommitReports(
// validateObservedSequenceNumbers checks if the sequence numbers of the provided messages are unique for each chain
// and that they match the observed max sequence numbers.
func validateObservedSequenceNumbers(
supportedChains mapset.Set[cciptypes.ChainSelector],
observedData map[cciptypes.ChainSelector][]exectypes.CommitData,
) error {
for _, commitData := range observedData {
for chainSel, commitData := range observedData {
if !supportedChains.Contains(chainSel) {
return fmt.Errorf("observed a non-supported chain %d", chainSel)
}

// observed commitData must not contain duplicates

observedMerkleRoots := mapset.NewSet[string]()
Expand Down
Loading

0 comments on commit fdcfd13

Please sign in to comment.