From 6e10f71687838fc075128682e4b3f8825a8bd1bb Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 12 Mar 2024 19:20:35 +0200 Subject: [PATCH] fix v1 and add logs --- .../evmregistry/v21/logprovider/buffer_v1.go | 8 ++++++-- .../evmregistry/v21/logprovider/integration_test.go | 4 ++-- .../evmregistry/v21/logprovider/provider.go | 13 ++++++++++--- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 6f0075da2a3..dbd131af919 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -55,7 +55,7 @@ func NewLogBuffer(lggr logger.Logger, size, upkeepLogLimit int) LogBuffer { l := new(atomic.Int32) l.Add(int32(upkeepLogLimit)) return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV2"), + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), maxUpkeepLogs: l, bufferSize: s, lastBlockSeen: new(atomic.Int64), @@ -236,6 +236,7 @@ func (ub *upkeepLogBuffer) dequeue(start, end int64, limit int) ([]logpoller.Log ub.q = updatedLogs } + ub.lggr.Debugf("Dequeued %d logs, remaining %d", len(results), remaining) prommetrics.AutomationLogsInLogBuffer.Sub(float64(len(results))) return results, remaining @@ -280,10 +281,12 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller. ub.visited[logid] = log.BlockNumber } ub.q = logs + dropped := ub.clean(blockThreshold) + ub.lggr.Debugf("Enqueued %d logs, dropped %d", added, dropped) prommetrics.AutomationLogsInLogBuffer.Add(float64(added)) - return added, ub.clean(blockThreshold) + return added, dropped } // clean removes logs that are older than blockThreshold and drops logs if the limit for the @@ -310,6 +313,7 @@ func (ub *upkeepLogBuffer) clean(blockThreshold int64) int { } else { prommetrics.AutomationLogsInLogBuffer.Dec() // old logs are ignored and removed from visited + ub.lggr.Debugw("Dropping old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) logid := logID(l) delete(ub.visited, logid) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index 038ca460c5a..c1c153a0997 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -44,7 +44,7 @@ func TestIntegration_LogEventProvider(t *testing.T) { version string }{ {"default version", ""}, - {"v2", "v2"}, + {"v1", "v1"}, } for _, tc := range tests { @@ -220,7 +220,7 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) { bufferVersion string }{ {"default version", ""}, - {"v2", "v2"}, + {"v1", "v1"}, } for _, tc := range tests { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 942434547c9..b7ca44fc3b8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -168,6 +168,10 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers prommetrics.AutomationLogProviderLatestBlock.Set(float64(latest.BlockNumber)) payloads := p.getPayloadsFromBuffer(latest.BlockNumber) + if len(payloads) > 0 { + p.lggr.Debugw("Fetched payloads from buffer xxx", "latestBlock", latest.BlockNumber, "payloads", len(payloads)) + } + return payloads, nil } @@ -195,10 +199,13 @@ func (p *logEventProvider) getPayloadsFromBuffer(latestBlock int64) []ocr2keeper } switch p.opts.BufferVersion { - case "v2": + case "v1": blockRate, upkeepLimit, maxResults := 4, 10, MaxPayloads // TODO: use config - for len(payloads) < MaxPayloads && start < latestBlock { + for len(payloads) < maxResults && start < latestBlock { logs, _ := p.bufferV2.Dequeue(start, blockRate, upkeepLimit, maxResults-len(payloads), DefaultUpkeepSelector) + if len(logs) > 0 { + p.lggr.Debugw("Dequeued logs xxx", "start", start, "latestBlock", latestBlock, "logs", len(logs)) + } for _, l := range logs { payload, err := p.createPayload(l.ID, l.Log) if err == nil { @@ -428,7 +435,7 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [ } switch p.opts.BufferVersion { - case "v2": + case "v1": p.bufferV2.Enqueue(filter.upkeepID, filteredLogs...) default: p.buffer.enqueue(filter.upkeepID, filteredLogs...)