Skip to content

Commit

Permalink
Async commit for the journalctl plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 31, 2023
1 parent bd3dbd5 commit 248c3cb
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 36 deletions.
3 changes: 2 additions & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,9 @@ func (p *Pipeline) expandProcs() {
p.logger.Warn("too many processors", zap.Int32("new", to))
}

lastProc := p.Procs[from-1]
for x := 0; x < int(to-from); x++ {
proc := p.newProc(p.Procs[from-1].id + x)
proc := p.newProc(lastProc.id + x) // create new proc with last+1 id
p.Procs = append(p.Procs, proc)
proc.start(p.actionParams, p.logger.Sugar())
}
Expand Down
79 changes: 79 additions & 0 deletions plugin/input/journalctl/commiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package journalctl

import (
"strings"
"sync"
"sync/atomic"

"github.com/ozontech/file.d/pipeline"
)

// SaveOffsetsFunc is the callback a commiter invokes to persist the given
// offset info. The commiters in this file pass it a copy of their current
// offset state.
type SaveOffsetsFunc func(info offsetInfo)

// Commiter tracks the offset of committed events and decides when that
// offset is handed to the persistence callback.
type Commiter interface {
// Commit records the offset carried by the given event.
Commit(event *pipeline.Event)
// Shutdown gives the implementation a chance to flush its state;
// presumably called once when the owning plugin stops.
Shutdown()
}

// SyncCommiter is a Commiter that calls the save callback on every Commit.
type SyncCommiter struct {
// offset is the offset state updated by Commit and passed to save.
offset offsetInfo
// save persists the offset state.
save SaveOffsetsFunc
}

// NewSyncCommiter builds a SyncCommiter that persists offsets through save.
// The offset state starts at its zero value.
func NewSyncCommiter(save SaveOffsetsFunc) *SyncCommiter {
	c := new(SyncCommiter)
	c.save = save
	return c
}

// Compile-time check that *SyncCommiter implements Commiter.
var _ Commiter = (*SyncCommiter)(nil)

// Commit stores the event's "__CURSOR" field in the offset state and
// immediately passes that state to the save callback.
func (a *SyncCommiter) Commit(event *pipeline.Event) {
	// Clone the cursor so the stored string does not share memory with
	// the event's buffer.
	cursor := strings.Clone(event.Root.Dig("__CURSOR").AsString())
	a.offset.set(cursor)
	a.save(a.offset)
}

// Shutdown is a no-op for SyncCommiter.
func (a *SyncCommiter) Shutdown() {
// nothing to flush: Commit already saves the offsets on every call
}

// AsyncCommiter is a Commiter that updates the in-memory offset on every
// Commit but calls the save callback only when the debouncer allows it.
type AsyncCommiter struct {
// The embedded mutex is held around every use of debouncer.
sync.Mutex

// offset points to the most recently committed offset state.
offset atomic.Pointer[offsetInfo]
// debouncer limits how often save is invoked from Commit.
debouncer Debouncer
// save persists the offset state.
save SaveOffsetsFunc
}

// NewAsyncCommiter builds an AsyncCommiter that persists offsets through
// save, rate-limited by debouncer. The offset pointer is initialised with
// an empty offsetInfo so Commit and Shutdown can always dereference it.
func NewAsyncCommiter(debouncer Debouncer, save SaveOffsetsFunc) *AsyncCommiter {
	c := &AsyncCommiter{
		debouncer: debouncer,
		save:      save,
	}
	c.offset.Store(new(offsetInfo))
	return c
}

// Compile-time check that *AsyncCommiter implements Commiter.
var _ Commiter = (*AsyncCommiter)(nil)

// Commit publishes a new offset state carrying the event's "__CURSOR"
// field and, if the debouncer is ready, passes that state to the save
// callback.
func (a *AsyncCommiter) Commit(event *pipeline.Event) {
	// Work on a copy of the current state and publish it as a new value,
	// so a previously stored offsetInfo is never mutated in place.
	snapshot := *a.offset.Load()
	cursor := strings.Clone(event.Root.Dig("__CURSOR").AsString())
	snapshot.set(cursor)
	a.offset.Store(&snapshot)

	// The debouncer is only touched while holding the lock.
	a.Lock()
	defer a.Unlock()

	if a.debouncer.Ready() {
		a.debouncer.Do(func() {
			a.save(snapshot)
		})
	}
}

// Shutdown unconditionally passes the most recently committed offset state
// to the save callback.
func (a *AsyncCommiter) Shutdown() {
	a.Lock()
	defer a.Unlock()

	// Deliberately bypass the debouncer: Debouncer.Do skips its callback
	// when the interval since the last call has not elapsed, which would
	// silently drop the offsets committed since the last save.
	a.save(*a.offset.Load())
}
15 changes: 7 additions & 8 deletions plugin/input/journalctl/debouncer.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package journalctl

import (
"sync"
"time"
)

type Debouncer struct {
lastCall time.Time
// interval of time during which only 1 Do can be called
interval time.Duration
mx sync.Mutex
}

func NewDebouncer(interval time.Duration) *Debouncer {
return &Debouncer{interval: interval}
func NewDebouncer(interval time.Duration) Debouncer {
return Debouncer{interval: interval}
}

func (d *Debouncer) Do(cb func()) {
d.mx.Lock()
defer d.mx.Unlock()

if time.Since(d.lastCall) > d.interval {
if d.Ready() {
cb()
d.lastCall = time.Now()
}
}

// Ready reports whether more than interval has elapsed since lastCall.
func (d *Debouncer) Ready() bool {
	elapsed := time.Since(d.lastCall)
	return elapsed > d.interval
}
36 changes: 11 additions & 25 deletions plugin/input/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package journalctl

import (
"strings"
"sync/atomic"
"time"

"github.com/ozontech/file.d/cfg"
Expand All @@ -24,11 +22,10 @@ type Plugin struct {
params *pipeline.InputPluginParams
config *Config
reader *journalReader
offInfo atomic.Pointer[offsetInfo]
currentOffset int64
logger *zap.Logger

offsetsDebouncer *Debouncer
commiter Commiter

// plugin metrics

Expand Down Expand Up @@ -121,13 +118,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't load offset file", zap.Error(err))
}
p.offInfo.Store(offInfo)

if p.config.PersistenceMode_ == persistenceModeAsync {
if p.config.AsyncInterval_ < 0 {
p.logger.Fatal("invalid async interval", zap.Duration("interval", p.config.AsyncInterval_))
}
p.offsetsDebouncer = NewDebouncer(p.config.AsyncInterval_)
p.commiter = NewAsyncCommiter(NewDebouncer(p.config.AsyncInterval_), p.sync)
} else {
p.commiter = NewSyncCommiter(p.sync)
}

readConfig := &journalReaderConfig{
Expand Down Expand Up @@ -156,29 +154,17 @@ func (p *Plugin) Stop() {
p.logger.Error("can't stop journalctl cmd", zap.Error(err))
}

offsets := *p.offInfo.Load()
if err := offset.SaveYAML(p.config.OffsetsFile, offsets); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
}
p.commiter.Shutdown()
}

func (p *Plugin) Commit(event *pipeline.Event) {
offInfo := *p.offInfo.Load()
offInfo.set(strings.Clone(event.Root.Dig("__CURSOR").AsString()))
p.offInfo.Store(&offInfo)

sync := func() {
if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
}
}
p.commiter.Commit(event)
}

if p.config.PersistenceMode_ == persistenceModeSync {
sync()
} else {
p.offsetsDebouncer.Do(sync)
// sync writes offInfo to the configured offsets file with offset.SaveYAML.
// On failure it increments the offset-errors metric and logs the error;
// the error is not returned to the caller.
func (p *Plugin) sync(offInfo offsetInfo) {
	err := offset.SaveYAML(p.config.OffsetsFile, offInfo)
	if err == nil {
		return
	}

	p.offsetErrorsMetric.WithLabelValues().Inc()
	p.logger.Error("can't save offset file", zap.Error(err))
}

Expand Down
5 changes: 3 additions & 2 deletions plugin/input/journalctl/journalctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func TestOffsets(t *testing.T) {
func BenchmarkName(b *testing.B) {
p := Plugin{config: &Config{
OffsetsFile: path.Join(b.TempDir(), "offsets.yaml"),
}, offsetsDebouncer: NewDebouncer(time.Millisecond * 200)}
p.offInfo.Store(&offsetInfo{})
}}
p.commiter = NewSyncCommiter(p.sync)

event := &pipeline.Event{Root: insaneJSON.Spawn()}
defer insaneJSON.Release(event.Root)
Expand All @@ -127,6 +127,7 @@ func BenchmarkName(b *testing.B) {
}
})

p.commiter = NewAsyncCommiter(NewDebouncer(time.Second), p.sync)
p.config.PersistenceMode_ = persistenceModeAsync
b.Run("async", func(b *testing.B) {
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 248c3cb

Please sign in to comment.