Skip to content

Commit

Permalink
Async saving offset in the journalctl plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 30, 2023
1 parent f06730e commit bd3dbd5
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 19 deletions.
10 changes: 0 additions & 10 deletions pipeline/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import (
insaneJSON "github.com/vitkovskii/insane-json"
)

// CloneString returns a deep copy of s.
//
// The copy shares no backing memory with s, so the result is safe to retain
// even if s was produced unsafely from a mutable byte slice.
func CloneString(s string) string {
	if s == "" {
		return ""
	}
	b := make([]byte, len(s))
	copy(b, s)
	// Reinterpret the private byte slice as a string without a second copy.
	return *(*string)(unsafe.Pointer(&b))
}

// ByteToStringUnsafe converts byte slice to string without memory copy
// This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)
func ByteToStringUnsafe(b []byte) string {
Expand Down
13 changes: 13 additions & 0 deletions plugin/input/journalctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,18 @@ you can use any additional args.
<br>

**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*

It defines how to save the offsets file:
* `async` – periodically saves the offsets using `async_interval`. The saving operation is skipped if offsets haven't been changed. Suitable for most cases: it guarantees at-least-once delivery and makes almost no overhead.
* `sync` – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.

The save operation takes three steps:
* Write the temporary file with all offsets;
* Call `fsync()` on it;
* Rename the temporary file to the original one.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
27 changes: 27 additions & 0 deletions plugin/input/journalctl/debouncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package journalctl

import (
"sync"
"time"
)

type Debouncer struct {
lastCall time.Time
// interval of time during which only 1 Do can be called
interval time.Duration
mx sync.Mutex
}

func NewDebouncer(interval time.Duration) *Debouncer {
return &Debouncer{interval: interval}
}

func (d *Debouncer) Do(cb func()) {
d.mx.Lock()
defer d.mx.Unlock()

if time.Since(d.lastCall) > d.interval {
cb()
d.lastCall = time.Now()
}
}
59 changes: 52 additions & 7 deletions plugin/input/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
package journalctl

import (
"strings"
"sync/atomic"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/offset"
Expand All @@ -25,17 +28,26 @@ type Plugin struct {
currentOffset int64
logger *zap.Logger

offsetsDebouncer *Debouncer

// plugin metrics

offsetErrorsMetric *prometheus.CounterVec
journalCtlStopErrorMetric *prometheus.CounterVec
readerErrorsMetric *prometheus.CounterVec
}

type Config struct {
// ! config-params
// ^ config-params
// persistenceMode controls when Commit writes the offsets file to disk;
// it is configured via Config.PersistenceMode (`async` or `sync`).
// The `// !` and `// *` comments below are insane-doc annotations used to
// generate the plugin README — keep them in sync with the docs.
type persistenceMode int

const (
	// ! "persistenceMode" #1 /`([a-z]+)`/
	persistenceModeAsync persistenceMode = iota // * `async` – periodically saves the offsets using `async_interval`. The saving operation is skipped if offsets haven't been changed. Suitable for most cases: it guarantees at-least-once delivery and makes almost no overhead.
	persistenceModeSync // * `sync` – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.
)

// ! config-params
// ^ config-params
type Config struct {
// > @3@4@5@6
// >
// > The filename to store offsets of processed messages.
Expand All @@ -51,6 +63,24 @@ type Config struct {

// for testing mostly
MaxLines int `json:"max_lines"`

// > @3@4@5@6
// >
// > It defines how to save the offsets file:
// > @persistenceMode|comment-list
// >
// > Save operation takes three steps:
// > * Write the temporary file with all offsets;
// > * Call `fsync()` on it;
// > * Rename the temporary file to the original one.
PersistenceMode string `json:"persistence_mode" default:"async" options:"async|sync"` // *
PersistenceMode_ persistenceMode

// > @3@4@5@6
// >
// > Offsets saving interval. Only used if `persistence_mode` is set to `async`.
AsyncInterval cfg.Duration `json:"async_interval" default:"1s" parse:"duration"`
AsyncInterval_ time.Duration
}

type offsetInfo struct {
Expand Down Expand Up @@ -93,6 +123,13 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
}
p.offInfo.Store(offInfo)

if p.config.PersistenceMode_ == persistenceModeAsync {
if p.config.AsyncInterval_ < 0 {
p.logger.Fatal("invalid async interval", zap.Duration("interval", p.config.AsyncInterval_))
}
p.offsetsDebouncer = NewDebouncer(p.config.AsyncInterval_)
}

readConfig := &journalReaderConfig{
output: p,
cursor: offInfo.Cursor,
Expand Down Expand Up @@ -128,12 +165,20 @@ func (p *Plugin) Stop() {

func (p *Plugin) Commit(event *pipeline.Event) {
offInfo := *p.offInfo.Load()
offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString()))
offInfo.set(strings.Clone(event.Root.Dig("__CURSOR").AsString()))
p.offInfo.Store(&offInfo)

if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
sync := func() {
if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
}
}

if p.config.PersistenceMode_ == persistenceModeSync {
sync()
} else {
p.offsetsDebouncer.Do(sync)
}
}

Expand Down
27 changes: 27 additions & 0 deletions plugin/input/journalctl/journalctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
package journalctl

import (
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/output/devnull"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
insaneJSON "github.com/vitkovskii/insane-json"
)

func setInput(p *pipeline.Pipeline, config *Config) {
Expand Down Expand Up @@ -107,3 +110,27 @@ func TestOffsets(t *testing.T) {
assert.Equal(t, 1, cnt)
}
}

// BenchmarkName measures Commit throughput in both persistence modes:
// "sync" saves the offsets file on every commit, "async" debounces saves
// to at most one per 200ms.
func BenchmarkName(b *testing.B) {
	p := Plugin{
		config: &Config{
			// filepath.Join (not path.Join) builds an OS-correct path.
			OffsetsFile: filepath.Join(b.TempDir(), "offsets.yaml"),
		},
		offsetsDebouncer: NewDebouncer(time.Millisecond * 200),
	}
	p.offInfo.Store(&offsetInfo{})

	event := &pipeline.Event{Root: insaneJSON.Spawn()}
	defer insaneJSON.Release(event.Root)

	p.config.PersistenceMode_ = persistenceModeSync
	b.Run("sync", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			p.Commit(event)
		}
	})

	p.config.PersistenceMode_ = persistenceModeAsync
	b.Run("async", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			p.Commit(event)
		}
	})
}
2 changes: 1 addition & 1 deletion plugin/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if p.config.UseTopicField {
fieldValue := event.Root.Dig(p.config.TopicField).AsString()
if fieldValue != "" {
topic = pipeline.CloneString(fieldValue)
topic = strings.Clone(fieldValue)
}
}

Expand Down
3 changes: 2 additions & 1 deletion plugin/output/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -339,7 +340,7 @@ func (p *Plugin) Out(event *pipeline.Event) {

// getBucketName decides which s3 bucket shall receive event.
func (p *Plugin) getBucketName(event *pipeline.Event) string {
bucketName := pipeline.CloneString(event.Root.Dig(p.config.BucketEventField).AsString())
bucketName := strings.Clone(event.Root.Dig(p.config.BucketEventField).AsString())

// no BucketEventField in message, it's DefaultBucket, showtime
if bucketName == "" {
Expand Down

0 comments on commit bd3dbd5

Please sign in to comment.