Skip to content

Commit

Permalink
Fix the test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Vihas Splunk committed Mar 23, 2023
1 parent 12aa0fa commit 74fe33f
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 95 deletions.
37 changes: 16 additions & 21 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type Manager struct {
knownFilesLock sync.RWMutex
seenPaths map[string]struct{}

currentFiles []*os.File
currentFps []*Fingerprint
currentFps []*Fingerprint

readerChan chan ReaderWrapper
pathHash map[string]*Fingerprint
Expand Down Expand Up @@ -141,15 +140,14 @@ func (m *Manager) poll(ctx context.Context) {
// Get the list of paths on disk
matches := m.finder.FindFiles()
m.consume(ctx, matches)
m.clearCurrentFiles()
m.clearCurrentFingerprints()

// Any new files that appear should be consumed entirely
m.readerFactory.fromBeginning = true
m.syncLastPollFiles(ctx)
}

func (m *Manager) clearCurrentFiles() {
m.currentFiles = make([]*os.File, 0)
func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*Fingerprint, 0)
}

Expand Down Expand Up @@ -184,12 +182,16 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files")
m.handleLostFiles(ctx)
for _, path := range paths {
reader := m.makeReader(path)
reader, fp := m.makeReader(path)
if reader == nil {
fmt.Println("Couldn't create reader for ", path)
continue
}
m.readerChan <- ReaderWrapper{reader: reader, path: path}
// add path and fingerprint as it's not consuming
m.pathHashLock.Lock()
m.pathHash[path] = fp
m.pathHashLock.Unlock()
}
}

Expand All @@ -201,8 +203,6 @@ func (m *Manager) isCurrentlyConsuming(path string, fp *Fingerprint) bool {
return true
}
}
// add path and fingerprint as it's not consuming
m.pathHash[path] = fp
return false
}

Expand Down Expand Up @@ -238,7 +238,7 @@ func (m *Manager) rollReaders(ctx context.Context, readers []*Reader) {
m.clearOldReaders()
}

func (m *Manager) makeReader(filePath string) *Reader {
func (m *Manager) makeReader(filePath string) (*Reader, *Fingerprint) {
if _, ok := m.seenPaths[filePath]; !ok {
if m.readerFactory.fromBeginning {
m.Infow("Started watching file", "path", filePath)
Expand All @@ -250,32 +250,28 @@ func (m *Manager) makeReader(filePath string) *Reader {
file, err := os.Open(filePath) // #nosec - operator must read in files defined by user
if err != nil {
m.Debugf("Failed to open file", zap.Error(err))
return nil
return nil, nil
}
m.currentFiles = append(m.currentFiles, file)
fp, err := m.readerFactory.newFingerprint(file)
if err != nil {
m.Errorw("Failed creating fingerprint", zap.Error(err))
return nil
return nil, nil
}
// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files

if len(fp.FirstBytes) == 0 {
if err = file.Close(); err != nil {
m.Errorf("problem closing file", "file", file.Name())
}
// Empty file, don't read it until we can compare its fingerprint
m.currentFiles = m.currentFiles[:len(m.currentFiles)-1]
return nil
return nil, nil
}

// check if the current file is already being consumed
if m.isCurrentlyConsuming(filePath, fp) {
if err = file.Close(); err != nil {
m.Errorf("problem closing file", "file", file.Name())
}
m.currentFiles = m.currentFiles[:len(m.currentFiles)-1]
return nil
return nil, nil
}
m.currentFps = append(m.currentFps, fp)

Expand All @@ -286,20 +282,19 @@ func (m *Manager) makeReader(filePath string) *Reader {
if err = file.Close(); err != nil {
m.Errorf("problem closing file", "file", file.Name())
}
m.currentFiles = m.currentFiles[:len(m.currentFiles)-1]
m.currentFps = m.currentFps[:len(m.currentFps)-1]
m.removePath(filePath)
return nil
return nil, nil
}
}

reader, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
m.removePath(filePath)
return nil
return nil, nil
}
return reader
return reader, fp
}

// saveCurrent adds the readers from this polling interval to this list of
Expand Down
Loading

0 comments on commit 74fe33f

Please sign in to comment.