[chore][tracker]: save most recent (archive) write index to disk #36799
@@ -5,9 +5,12 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con
 import (
 	"context"
+	"encoding/binary"
+	"errors"
 	"fmt"
 
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/extension/experimental/storage"
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -52,13 +55,18 @@ type fileTracker struct {
 	archiveIndex int
 }
 
+var errInvalidValue = errors.New("invalid value")
+
+var archiveIndexKey = "knownFiles_ai"
+
 func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
 	knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
 	for i := 0; i < len(knownFiles); i++ {
 		knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
 	}
 	set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
-	return &fileTracker{
+	t := &fileTracker{
 		set:              set,
 		maxBatchFiles:    maxBatchFiles,
 		currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
@@ -68,6 +76,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
 		persister:    persister,
 		archiveIndex: 0,
 	}
+	t.restoreArchiveIndex()
+
+	return t
 }
 
 func (t *fileTracker) Add(reader *reader.Reader) {
@@ -144,6 +155,26 @@ func (t *fileTracker) TotalReaders() int {
 	return total
 }
 
+func (t *fileTracker) restoreArchiveIndex() {
+	if !t.archiveEnabled() {
+		return
+	}
+
+	byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey)
+	if err != nil {
+		t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
+		return
+	}
+	t.archiveIndex, err = byteToIndex(byteIndex)
+	if err != nil {
+		t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
+	} else if t.archiveIndex < 0 || t.archiveIndex >= t.pollsToArchive {
+		// safety check. It can happen if `polls_to_archive` was changed.
+		// It's best if we reset the index or else we might end up writing invalid keys
+		t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
+		t.archiveIndex = 0
+	}
+}
Review thread on restoreArchiveIndex:

djaglowski: Good idea to check for this case. However, I wonder if we can handle it better than restarting from zero. What would it take to search the archive for the most recently updated? I think we could maintain some kind of data structure which notes the time each archive was written. Maybe just […]. For example, if we previously saved 10 archives and find that […]

VihasMakwana: @djaglowski This solution does make sense to me, but it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.

djaglowski: Can you elaborate? If it's more than one at a time then it defeats the point of the archive.

VihasMakwana: Consider this archive: […]. We've rolled over once and the latest data is at index […]. Let's suppose that new […]. We cannot simply rewrite the archive in-place without caching values. It would be much simpler to convert the archive like the following image, and we would delete excess data. Wdyt?

djaglowski: It would always be data stored at […]
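A rough sketch of the timestamp idea floated above, purely illustrative and not part of the PR: it assumes each archive write also persists its wall-clock time under a hypothetical per-slot key ("knownFilesTime%d"), so recovery could scan for the newest slot instead of resetting to 0. It would need "time" added to the imports; the operator.Persister.Get call matches the usage already in restoreArchiveIndex.

// Hypothetical recovery helper, not in the PR. Assumes a timestamp key
// "knownFilesTime%d" is persisted alongside every archive slot write.
func mostRecentArchiveIndex(ctx context.Context, p operator.Persister, pollsToArchive int) int {
	best, bestTime := 0, time.Time{}
	for i := 0; i < pollsToArchive; i++ {
		raw, err := p.Get(ctx, fmt.Sprintf("knownFilesTime%d", i))
		if err != nil || raw == nil {
			continue // slot never written, or unreadable; skip it
		}
		var ts time.Time
		if err := ts.UnmarshalBinary(raw); err != nil {
			continue
		}
		if ts.After(bestTime) {
			best, bestTime = i, ts
		}
	}
	return best
}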
 func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 	// We make use of a ring buffer, where each set of files is stored under a specific index.
 	// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.

@@ -162,13 +193,15 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 	//                    start
 	//                    index
 
-	if t.pollsToArchive <= 0 || t.persister == nil {
+	if !t.archiveEnabled() {
 		return
 	}
-	if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
+	index := t.archiveIndex
+	t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
+	indexOp := storage.SetOperation(archiveIndexKey, intToByte(t.archiveIndex)) // batch the updated index with metadata
+	if err := t.writeArchive(index, metadata, indexOp); err != nil {
 		t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
 	}
-	t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
 }
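To make the rollover concrete, here is a small self-contained illustration of the modular arithmetic above (pollsToArchive = 3 is an arbitrary value chosen for the example, not anything mandated by the PR):

package main

import "fmt"

func main() {
	const pollsToArchive = 3 // example value; configurable in the real tracker
	index := 0
	for poll := 1; poll <= 7; poll++ {
		fmt.Printf("poll %d writes archive key knownFiles%d\n", poll, index)
		index = (index + 1) % pollsToArchive // same rollover rule as fileTracker.archive
	}
	// Output shows poll 4 overwriting knownFiles0, poll 5 overwriting knownFiles1, and so on.
}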
 // readArchive loads data from the archive for a given index and returns a fileset.Fileset.

@@ -184,9 +217,13 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
 }
 
 // writeArchive saves data to the archive for a given index and returns an error, if encountered.
-func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
+func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
 	key := fmt.Sprintf("knownFiles%d", index)
-	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
+	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key, ops...)
 }
 
+func (t *fileTracker) archiveEnabled() bool {
+	return t.pollsToArchive > 0 && t.persister != nil
+}
 // FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.

@@ -295,3 +332,18 @@ func (t *noStateTracker) EndPoll() {}
 func (t *noStateTracker) TotalReaders() int { return 0 }
 
 func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
+
+func intToByte(val int) []byte {
+	return binary.LittleEndian.AppendUint64([]byte{}, uint64(val))
+}
+
+func byteToIndex(buf []byte) (int, error) {
+	if buf == nil {
+		return 0, nil
+	}
+	// The size of a uint64 in binary is 8 bytes.
+	if len(buf) < 8 {
+		return 0, errInvalidValue
+	}
+	return int(binary.LittleEndian.Uint64(buf)), nil
+}
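As a quick sanity check of these helpers, an equivalent standalone round trip (mirroring intToByte and byteToIndex, which are unexported) looks like this:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode index 5 the way intToByte does: 8 little-endian bytes.
	buf := binary.LittleEndian.AppendUint64([]byte{}, uint64(5))
	fmt.Println(buf) // [5 0 0 0 0 0 0 0]

	// Decode it back the way byteToIndex does.
	fmt.Println(int(binary.LittleEndian.Uint64(buf))) // 5
}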
Review comment on the new variadic ops parameter: For existing usage, this will be a no-op.
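The no-op observation follows from Go's variadic semantics: adding a trailing ops ...storage.Operation parameter leaves every existing call site compiling and behaving exactly as before. A minimal sketch, with a placeholder Operation type standing in for storage.Operation:

package main

import "fmt"

// Operation is a stand-in for storage.Operation from
// go.opentelemetry.io/collector/extension/experimental/storage.
type Operation struct{ Key string }

// saveKey previously took only a key; the trailing variadic parameter
// was added later, mirroring the writeArchive/SaveKey change in this PR.
func saveKey(key string, ops ...Operation) error {
	fmt.Printf("saving %q with %d extra batched operations\n", key, len(ops))
	return nil
}

func main() {
	_ = saveKey("knownFiles0")                                  // existing call site: ops is empty, a no-op
	_ = saveKey("knownFiles0", Operation{Key: "knownFiles_ai"}) // new call site batches the index update
}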