diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go index 65451a49e1..c14bccba65 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go @@ -178,15 +178,24 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit } var finalizedRoots []ccip.CommitStoreReportWithTxMeta + var rootsToDelete []string + messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval) for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() { - // Evict stale items + // Mark items as stale if they are older than the messageVisibilityInterval + // SortedMap doesn't allow to iterate and delete, so we mark roots for deletion and remove them in a separate loop if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(messageVisibilityWindow) { - r.finalizedRoots.Delete(pair.Key) + rootsToDelete = append(rootsToDelete, pair.Key) continue } finalizedRoots = append(finalizedRoots, pair.Value) } + + // Remove stale items + for _, root := range rootsToDelete { + r.finalizedRoots.Delete(root) + } + return finalizedRoots, unfinalizedReports } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go index 55f9df7fe8..5b0e5130c6 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go @@ -55,6 +55,45 @@ func Test_CacheExpiration(t *testing.T) { }, 5*time.Second, 1*time.Second) } +func Test_CacheFullEviction(t *testing.T) { + commitStoreReader := mocks.NewCommitStoreReader(t) + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, 2*time.Second, 1*time.Second, time.Second, time.Second) + + maxElements := 10000 + commitRoots := make([]ccip.CommitStoreReportWithTxMeta, maxElements) + for i := 0; i < maxElements; i++ { + finalized := i >= maxElements/2 + commitRoots[i] = createCommitStoreEntry(utils.RandomBytes32(), time.Now(), finalized) + } + mockCommitStoreReader(commitStoreReader, time.Time{}, commitRoots) + + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + require.Len(t, roots, maxElements) + + // Marks some of them as exeucted and some of them as snoozed + for i := 0; i < maxElements; i++ { + if i%3 == 0 { + cache.MarkAsExecuted(commitRoots[i].MerkleRoot) + } + if i%3 == 1 { + cache.Snooze(commitRoots[i].MerkleRoot) + } + } + + require.Eventually(t, func() bool { + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{}) + roots1, err1 := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err1) + + return len(roots1) == 0 && + cache.finalizedRoots.Len() == 0 && + len(cache.snoozedRoots.Items()) == 0 && + len(cache.executedRoots.Items()) == 0 + + }, 10*time.Second, time.Second) +} + func Test_CacheProgression_Internal(t *testing.T) { ts1 := time.Now().Add(-5 * time.Hour).Truncate(time.Millisecond) ts2 := time.Now().Add(-3 * time.Hour).Truncate(time.Millisecond) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index cfdf4d031b..b5a1d73975 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -330,6 +330,11 @@ func (c *CommitStore) GetCommitReportMatchingSeqNum(ctx context.Context, seqNr u } func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]cciptypes.CommitStoreReportWithTxMeta, error) { + latestBlock, err := c.lp.LatestBlock(ctx) + if err != nil { + return nil, err + } + logs, err := c.lp.LogsCreatedAfter( ctx, c.reportAcceptedSig, @@ -349,7 +354,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, parsedReports := make([]cciptypes.CommitStoreReportWithTxMeta, 0, len(parsedLogs)) for _, log := range parsedLogs { parsedReports = append(parsedReports, cciptypes.CommitStoreReportWithTxMeta{ - TxMeta: log.TxMeta, + TxMeta: log.TxMeta.UpdateFinalityStatus(uint64(latestBlock.FinalizedBlockNumber)), CommitStoreReport: log.Data, }) }