Skip to content

Commit

Permalink
Add trie.go
Browse files Browse the repository at this point in the history
  • Loading branch information
Vihas Splunk committed Apr 1, 2023
1 parent 74fe33f commit 1c27447
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 20 deletions.
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Config struct {

type ReaderWrapper struct {
reader *Reader
path string
fp *Fingerprint
}

// Build will build a file input operator from the supplied configuration
Expand Down Expand Up @@ -171,6 +171,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
encodingConfig: c.Splitter.EncodingConfig,
headerSettings: hs,
},
trie: NewTrie(),
finder: c.Finder,
roller: newRoller(),
pollInterval: c.PollInterval,
Expand Down
32 changes: 14 additions & 18 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)

type Manager struct {
*zap.SugaredLogger
wg sync.WaitGroup
pollerWg sync.WaitGroup
workerWg sync.WaitGroup
cancel context.CancelFunc
ctx context.Context
Expand All @@ -59,6 +59,9 @@ type Manager struct {
// readers[] store the readers of previous poll cycles and are used to keep track of lost files
readerLock sync.Mutex
readers []*Reader

// TRIE - this data structure stores the fingerprint of the files which are currently being consumed
trie *Trie
}

func (m *Manager) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -93,7 +96,7 @@ func (m *Manager) Start(persister operator.Persister) error {
// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
m.pollerWg.Wait()
close(m.readerChan)
m.workerWg.Wait()
m.roller.cleanup()
Expand All @@ -109,9 +112,9 @@ func (m *Manager) Stop() error {
// startPoller kicks off a goroutine that will poll the filesystem periodically,
// checking if there are new files or new logs in the watched files
func (m *Manager) startPoller(ctx context.Context) {
m.wg.Add(1)
m.pollerWg.Add(1)
go func() {
defer m.wg.Done()
defer m.pollerWg.Done()
globTicker := time.NewTicker(m.pollInterval)
defer globTicker.Stop()

Expand Down Expand Up @@ -159,7 +162,7 @@ func (m *Manager) worker(ctx context.Context) {
if !ok {
return
}
r, path := chanData.reader, chanData.path
r, fp := chanData.reader, chanData.fp
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
if m.deleteAfterRead && r.eof {
Expand All @@ -174,7 +177,7 @@ func (m *Manager) worker(ctx context.Context) {
m.readers = append(m.readers, r)
m.readerLock.Unlock()
}
m.removePath(path)
m.removePath(fp)
}
}

Expand All @@ -187,28 +190,23 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
fmt.Println("Couldn't create reader for ", path)
continue
}
m.readerChan <- ReaderWrapper{reader: reader, path: path}
// add path and fingerprint as it's not consuming
m.pathHashLock.Lock()
m.pathHash[path] = fp
m.trie.Put(fp.FirstBytes, true)
m.pathHashLock.Unlock()
m.readerChan <- ReaderWrapper{reader: reader, fp: fp}
}
}

func (m *Manager) isCurrentlyConsuming(path string, fp *Fingerprint) bool {
m.pathHashLock.Lock()
defer m.pathHashLock.Unlock()
if fp2, ok := m.pathHash[path]; ok {
if fp2.StartsWith(fp) || fp.StartsWith(fp2) {
return true
}
}
return false
return m.trie.Get(fp.FirstBytes) != nil
}

func (m *Manager) removePath(path string) {
func (m *Manager) removePath(fp *Fingerprint) {
m.pathHashLock.Lock()
delete(m.pathHash, path)
m.trie.Delete(fp.FirstBytes)
m.pathHashLock.Unlock()
}

Expand Down Expand Up @@ -283,15 +281,13 @@ func (m *Manager) makeReader(filePath string) (*Reader, *Fingerprint) {
m.Errorf("problem closing file", "file", file.Name())
}
m.currentFps = m.currentFps[:len(m.currentFps)-1]
m.removePath(filePath)
return nil, nil
}
}

reader, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
m.removePath(filePath)
return nil, nil
}
return reader, fp
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {

// Stop consuming before long file has been fully consumed
operator.cancel()
operator.wg.Wait()
operator.pollerWg.Wait()
close(operator.readerChan)
operator.workerWg.Wait()

Expand Down
110 changes: 110 additions & 0 deletions pkg/stanza/fileconsumer/trie.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Trie data structure inspired by https://github.com/dghubble/trie
// This differs from the original trie.

package fileconsumer

// Trie is a node of a byte-keyed prefix tree; the root node represents the
// whole trie. Put stores the value on every node along an inserted key, so a
// lookup can match keys where one is a prefix of the other.
type Trie struct {
	value    interface{}
	children map[byte]*Trie
}

// nodeTrie pairs a trie node with the key byte of the child that the path
// descends into. Delete uses it to remember the ancestors of a node.
type nodeTrie struct {
	node *Trie
	b    byte
}

// NewTrie allocates and returns a new, empty *Trie.
func NewTrie() *Trie {
	return &Trie{}
}

// Get looks up key and returns the value stored for it, or nil when there is
// no match.
//
// A match is found in two situations:
//   - the walk consumes the whole key and ends on an existing node (key equals
//     a stored key or is a prefix of one), in which case that node's value is
//     returned;
//   - the walk reaches a non-root leaf before the key is exhausted (a stored
//     key is a prefix of key), in which case the leaf's value is returned.
//
// An empty key returns the value held by the root node.
func (t *Trie) Get(key []byte) interface{} {
	node := t
	for _, b := range key {
		// A non-root leaf marks the end of a stored key: every byte so far
		// matched, so the stored key is a prefix of the requested one.
		// Return the stored value, not the next byte of the lookup key.
		if node != t && node.isLeaf() {
			return node.value
		}
		// Indexing a nil children map is safe and yields nil.
		node = node.children[b]
		if node == nil {
			return nil
		}
	}
	return node.value
}

// Put inserts value into the trie at the given key, creating any missing
// nodes along the way.
func (t *Trie) Put(key []byte, value interface{}) {
	node := t
	for _, b := range key {
		child := node.children[b]
		if child == nil {
			if node.children == nil {
				node.children = map[byte]*Trie{}
			}
			child = NewTrie()
			node.children[b] = child

			// Assigning the value to the node that gains a new child lets us
			// detect partial matches: every prefix of a newly created path
			// carries a value. For example `123451` and `123456789` share the
			// prefix `12345`, whose node holds a value.
			node.value = value
		}
		node = child
	}
	node.value = value
}

// Delete removes the value associated with the given key. It returns true if
// a node was found for the given key and false otherwise. If the node is a
// leaf it is removed from its parent, and the removal is repeated for every
// ancestor that becomes childless as a result.
//
// NOTE: pruning stops only at an ancestor that still has other children. The
// loop does not check whether a childless ancestor is itself the end of a
// shorter stored key, so such an ancestor is pruned as well.
func (t *Trie) Delete(key []byte) bool {
	// Record the ancestors so they can be pruned afterwards.
	path := make([]nodeTrie, 0, len(key))
	node := t
	for _, b := range key {
		path = append(path, nodeTrie{b: b, node: node})
		node = node.children[b]
		if node == nil {
			// node does not exist
			return false
		}
	}
	// delete the node value
	node.value = nil
	// If leaf, remove it from its parent's children map. Repeat for the
	// ancestor path, walking it backwards.
	if node.isLeaf() {
		for i := len(path) - 1; i >= 0; i-- {
			parent := path[i].node
			delete(parent.children, path[i].b)
			if !parent.isLeaf() {
				// parent has other children, stop
				break
			}
			parent.children = nil
			parent.value = nil
		}
	}
	return true // node (internal or not) existed and its value was nil'd
}

// isLeaf reports whether the node has no children.
func (t *Trie) isLeaf() bool {
	return len(t.children) == 0
}

0 comments on commit 1c27447

Please sign in to comment.