diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go index 568d13accc96..7315b8ba62c8 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -159,23 +159,20 @@ func (t *fileTracker) restoreArchiveIndex() { if !t.archiveEnabled() { return } - archiveIndex := 0 byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey) if err != nil { t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err)) return } - archiveIndex, err = byteToIndex(byteIndex) + t.archiveIndex, err = byteToIndex(byteIndex) if err != nil { t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err)) - return - } else if archiveIndex < 0 || archiveIndex >= t.pollsToArchive { + } else if t.archiveIndex < 0 || t.archiveIndex >= t.pollsToArchive { // safety check. It can happen if `polls_to_archive` was changed. // It's best if we reset the index or else we might end up writing invalid keys t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0") - return + t.archiveIndex = 0 } - t.archiveIndex = archiveIndex } func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) { diff --git a/pkg/stanza/operator/persister.go b/pkg/stanza/operator/persister.go index 3e4fae1c1785..feb912c9e416 100644 --- a/pkg/stanza/operator/persister.go +++ b/pkg/stanza/operator/persister.go @@ -43,5 +43,8 @@ func (p scopedPersister) Delete(ctx context.Context, key string) error { } func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error { + for _, op := range ops { + op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key) + } return p.Persister.Batch(ctx, ops...) }