Skip to content

Commit

Permalink
fix v1 and add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Mar 12, 2024
1 parent 856c7d6 commit 6e10f71
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewLogBuffer(lggr logger.Logger, size, upkeepLogLimit int) LogBuffer {
l := new(atomic.Int32)
l.Add(int32(upkeepLogLimit))
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV2"),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
maxUpkeepLogs: l,
bufferSize: s,
lastBlockSeen: new(atomic.Int64),
Expand Down Expand Up @@ -236,6 +236,7 @@ func (ub *upkeepLogBuffer) dequeue(start, end int64, limit int) ([]logpoller.Log
ub.q = updatedLogs
}

ub.lggr.Debugf("Dequeued %d logs, remaining %d", len(results), remaining)
prommetrics.AutomationLogsInLogBuffer.Sub(float64(len(results)))

return results, remaining
Expand Down Expand Up @@ -280,10 +281,12 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller.
ub.visited[logid] = log.BlockNumber
}
ub.q = logs
dropped := ub.clean(blockThreshold)

ub.lggr.Debugf("Enqueued %d logs, dropped %d", added, dropped)
prommetrics.AutomationLogsInLogBuffer.Add(float64(added))

return added, ub.clean(blockThreshold)
return added, dropped
}

// clean removes logs that are older than blockThreshold and drops logs if the limit for the
Expand All @@ -310,6 +313,7 @@ func (ub *upkeepLogBuffer) clean(blockThreshold int64) int {
} else {
prommetrics.AutomationLogsInLogBuffer.Dec()
// old logs are ignored and removed from visited
ub.lggr.Debugw("Dropping old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(ub.visited, logid)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestIntegration_LogEventProvider(t *testing.T) {
version string
}{
{"default version", ""},
{"v2", "v2"},
{"v1", "v1"},
}

for _, tc := range tests {
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
bufferVersion string
}{
{"default version", ""},
{"v2", "v2"},
{"v1", "v1"},
}

for _, tc := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers
prommetrics.AutomationLogProviderLatestBlock.Set(float64(latest.BlockNumber))
payloads := p.getPayloadsFromBuffer(latest.BlockNumber)

if len(payloads) > 0 {
p.lggr.Debugw("Fetched payloads from buffer xxx", "latestBlock", latest.BlockNumber, "payloads", len(payloads))
}

return payloads, nil
}

Expand Down Expand Up @@ -195,10 +199,13 @@ func (p *logEventProvider) getPayloadsFromBuffer(latestBlock int64) []ocr2keeper
}

switch p.opts.BufferVersion {
case "v2":
case "v1":
blockRate, upkeepLimit, maxResults := 4, 10, MaxPayloads // TODO: use config
for len(payloads) < MaxPayloads && start < latestBlock {
for len(payloads) < maxResults && start < latestBlock {
logs, _ := p.bufferV2.Dequeue(start, blockRate, upkeepLimit, maxResults-len(payloads), DefaultUpkeepSelector)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs xxx", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
Expand Down Expand Up @@ -428,7 +435,7 @@ func (p *logEventProvider) readLogs(ctx context.Context, latest int64, filters [
}

switch p.opts.BufferVersion {
case "v2":
case "v1":
p.bufferV2.Enqueue(filter.upkeepID, filteredLogs...)
default:
p.buffer.enqueue(filter.upkeepID, filteredLogs...)
Expand Down

0 comments on commit 6e10f71

Please sign in to comment.