[chore][tracker]: save most recent (archive) write index to disk #36799

Open · wants to merge 15 commits into main
8 changes: 5 additions & 3 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -10,6 +10,8 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/extension/experimental/storage"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
@@ -21,7 +23,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...storage.Operation) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

@@ -37,8 +39,8 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M
errs = append(errs, fmt.Errorf("encode metadata: %w", err))
}
}

if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
ops = append(ops, storage.SetOperation(key, buf.Bytes()))
if err := persister.Batch(ctx, ops...); err != nil {
Contributor Author:
For existing usage, this will be a no-op.
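A minimal sketch of that point, assuming the collector's experimental storage API: a caller that passes no extra operations produces a batch containing exactly the one SetOperation that SaveKey appends, which the storage backend executes the same way as the previous single Set call.

```go
import (
	"context"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// Hedged illustration, not code from this PR: an existing caller invokes
// SaveKey without variadic ops, so the only batched operation is the
// SetOperation for the checkpoint key itself.
func saveAsBefore(ctx context.Context, p operator.Persister, rmds []*reader.Metadata) error {
	// Behaves like the old persister.Set(ctx, "knownFiles", buf.Bytes()).
	return checkpoint.SaveKey(ctx, p, rmds, "knownFiles")
}
```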

errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

64 changes: 58 additions & 6 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -5,9 +5,12 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"encoding/binary"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -52,13 +55,18 @@ type fileTracker struct {
archiveIndex int
}

var errInvalidValue = errors.New("invalid value")

var archiveIndexKey = "knownFiles_ai"

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
return &fileTracker{

t := &fileTracker{
set: set,
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
Expand All @@ -68,6 +76,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
persister: persister,
archiveIndex: 0,
}
t.restoreArchiveIndex()

return t
}

func (t *fileTracker) Add(reader *reader.Reader) {
@@ -144,6 +155,26 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) restoreArchiveIndex() {
if !t.archiveEnabled() {
return
}
byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
return
}
t.archiveIndex, err = byteToIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
} else if t.archiveIndex < 0 || t.archiveIndex >= t.pollsToArchive {
// safety check. It can happen if `polls_to_archive` was changed.
// It's best if we reset the index or else we might end up writing invalid keys
t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
t.archiveIndex = 0
}
Member:
Good idea to check for this case.

However, I wonder if we can handle it better than restarting from zero. What would it take to search the archive for the most recently updated?

I think we could maintain some kind of data structure which notes the time each archive was written. Maybe just map[index]time.Time. Then when we first create the tracker, we can load this up and find the most recent timestamp. We can also check for the case where pollsToArchive has changed and then rewrite the storage to align with the new value.

For example, if we previously saved 10 archives and find that pollsToArchive is now 5, we can find the 5 most recent indices based on the timestamp structure, then rewrite the archive files so that these are 0-4. We should probably even delete the extras from storage as well.
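A rough sketch of that bookkeeping, assuming a plain map from archive index to write time (all names here are illustrative, not part of this PR):

```go
import "time"

// archiveWriteTimes records when each archive slot was last written.
type archiveWriteTimes map[int]time.Time

// mostRecentIndex returns the slot with the newest timestamp, or -1 if
// nothing has been archived yet. On startup, the tracker could resume
// writing at the slot after this one, and reconcile a pollsToArchive
// change by keeping only the N most recent slots.
func mostRecentIndex(times archiveWriteTimes) int {
	best, bestTime := -1, time.Time{}
	for idx, ts := range times {
		if ts.After(bestTime) {
			best, bestTime = idx, ts
		}
	}
	return best
}
```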

Contributor Author (@VihasMakwana, Dec 13, 2024):

@djaglowski This solution does makes sense to me, but it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.
We might need to load the filesets in memory.
I'll find a few ways.

Member:

> it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.

Can you elaborate?

> We might need to load the filesets in memory.

If it's more than one at a time, then it defeats the point of the archive.

Contributor Author:

> Can you elaborate?

Consider this archive:

[Screenshot 2024-12-23 at 8 37 38 PM]

We've rolled over once: the latest data is at index 4, and archiveIndex (i.e. where the next data will be written) is at index 5.

Let's suppose the new polls_to_archive is 7.
We now need to construct a new, smaller archive with the 7 most recent elements.
These elements are (from most recent to least recent):

14, 13, 12, 11, 10, 9, 8

We cannot simply rewrite the archive in place without caching values.

It would be much simpler to convert the archive as shown in the following image,

[Screenshot 2024-12-23 at 8 41 43 PM]

and delete the excess data.

Wdyt?
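For illustration only (not code from this PR), the conversion could be planned by walking backwards from the most recent old slot and assigning surviving entries to positions 0..newCap-1, oldest first, so the next write index restarts at 0:

```go
// rebasePlan maps surviving old slots to new slots when the archive ring
// shrinks from oldCap to newCap. oldNextWrite is the old archiveIndex,
// i.e. the slot the next write would have gone to.
func rebasePlan(oldCap, oldNextWrite, newCap int) map[int]int {
	plan := make(map[int]int, newCap) // old slot -> new slot
	for i := 0; i < newCap; i++ {
		// Walk backwards from the most recently written old slot.
		oldIdx := ((oldNextWrite-1-i)%oldCap + oldCap) % oldCap
		plan[oldIdx] = newCap - 1 - i // newest lands at newCap-1
	}
	return plan
}
```

With the example above (oldCap 15, next write at 5, newCap 7), this keeps old slots 4, 3, 2, 1, 0, 14, 13 and maps them to new slots 6 down to 0; everything outside the plan would be deleted from storage.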

Contributor Author:

> What would it take to search the archive for the most recently updated?

It would always be the data stored at index archiveIndex-1. We will store archiveIndex on disk, so on the next collector run we can load that value and find the most recent data.

archiveIndex points at the next location where data will be written.
This can point to either of the following:

  • The least recent data
  • An empty slot (if the archive is only partially filled)
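A small sketch of that invariant (helper name hypothetical): the most recent fileset, when one exists, always sits one slot behind the persisted write index, wrapping around the ring.

```go
// mostRecentSlot returns the slot holding the newest archived data, given
// the persisted next-write index; the double modulo handles archiveIndex 0.
func mostRecentSlot(archiveIndex, pollsToArchive int) int {
	return ((archiveIndex-1)%pollsToArchive + pollsToArchive) % pollsToArchive
}
```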

}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
@@ -162,13 +193,15 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
if !t.archiveEnabled() {
return
}
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
index := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
indexOp := storage.SetOperation(archiveIndexKey, intToByte(t.archiveIndex)) // batch the updated index with metadata
if err := t.writeArchive(index, metadata, indexOp); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
@@ -184,9 +217,13 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key, ops...)
}

func (t *fileTracker) archiveEnabled() bool {
return t.pollsToArchive > 0 && t.persister != nil
}

// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
@@ -295,3 +332,18 @@ func (t *noStateTracker) EndPoll() {}
func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }

func intToByte(val int) []byte {
return binary.LittleEndian.AppendUint64([]byte{}, uint64(val))
}

func byteToIndex(buf []byte) (int, error) {
if buf == nil {
return 0, nil
}
// A binary-encoded uint64 occupies 8 bytes.
if len(buf) < 8 {
return 0, errInvalidValue
}
return int(binary.LittleEndian.Uint64(buf)), nil
}
31 changes: 31 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
@@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -44,6 +45,36 @@ func TestFindFilesOrder(t *testing.T) {
}
}

func TestIndexInBounds(t *testing.T) {
persister := testutil.NewUnscopedMockPersister()
pollsToArchive := 100
tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// no index exists. archiveIndex should be 0
require.Equal(t, 0, tracker.archiveIndex)

// run archiving. Each time, the index should be in bounds.
for i := 0; i < 1099; i++ {
require.Equalf(t, i%pollsToArchive, tracker.archiveIndex, "Index should be %d, but was %d", i%pollsToArchive, tracker.archiveIndex)
tracker.archive(&fileset.Fileset[*reader.Metadata]{})
require.Truef(t, tracker.archiveIndex >= 0 && tracker.archiveIndex < pollsToArchive, "Index should be between 0 and %d, but was %d", pollsToArchive, tracker.archiveIndex)
}
oldIndex := tracker.archiveIndex

// re-create archive
tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// index should exist and new archiveIndex should be equal to oldIndex
require.Equalf(t, oldIndex, tracker.archiveIndex, "New index should be %d, but was %d", oldIndex, tracker.archiveIndex)

// re-create archive, with reduced pollsToArchive
pollsToArchive = 70
tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// index should exist but it is out of bounds. So it should reset to 0
require.Equalf(t, 0, tracker.archiveIndex, "Index should be reset to 0 but was %d", tracker.archiveIndex)
}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
md := make([]*reader.Metadata, 0)

10 changes: 10 additions & 0 deletions pkg/stanza/operator/persister.go
@@ -6,13 +6,16 @@ package operator // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"context"
"fmt"

"go.opentelemetry.io/collector/extension/experimental/storage"
)

// Persister is an interface used to persist data
type Persister interface {
Get(context.Context, string) ([]byte, error)
Set(context.Context, string, []byte) error
Delete(context.Context, string) error
Batch(ctx context.Context, ops ...storage.Operation) error
}

type scopedPersister struct {
@@ -38,3 +41,10 @@ func (p scopedPersister) Set(ctx context.Context, key string, value []byte) erro
func (p scopedPersister) Delete(ctx context.Context, key string) error {
return p.Persister.Delete(ctx, fmt.Sprintf("%s.%s", p.scope, key))
}

func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error {
for _, op := range ops {
op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key)
}
return p.Persister.Batch(ctx, ops...)
}
20 changes: 20 additions & 0 deletions pkg/stanza/testutil/util.go
@@ -8,6 +8,8 @@ import (
"strings"
"sync"

"go.opentelemetry.io/collector/extension/experimental/storage"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

@@ -46,6 +48,24 @@ func (p *mockPersister) Delete(_ context.Context, k string) error {
return nil
}

func (p *mockPersister) Batch(_ context.Context, ops ...storage.Operation) error {
var err error
for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = p.Get(context.Background(), op.Key)
case storage.Set:
err = p.Set(context.Background(), op.Key, op.Value)
case storage.Delete:
err = p.Delete(context.Background(), op.Key)
}
if err != nil {
return err
}
}
return nil
}

// NewUnscopedMockPersister will return a new persister for testing
func NewUnscopedMockPersister() operator.Persister {
data := make(map[string][]byte)