diff --git a/pipeline/util.go b/pipeline/util.go
index 976370ee5..bf5762bf1 100644
--- a/pipeline/util.go
+++ b/pipeline/util.go
@@ -11,16 +11,6 @@ import (
insaneJSON "github.com/vitkovskii/insane-json"
)
-// Clone deeply copies string
-func CloneString(s string) string {
- if s == "" {
- return ""
- }
- b := make([]byte, len(s))
- copy(b, s)
- return *(*string)(unsafe.Pointer(&b))
-}
-
// ByteToStringUnsafe converts byte slice to string without memory copy
// This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)
func ByteToStringUnsafe(b []byte) string {
diff --git a/plugin/input/journalctl/README.md b/plugin/input/journalctl/README.md
index eb884a2c7..574485d6d 100755
--- a/plugin/input/journalctl/README.md
+++ b/plugin/input/journalctl/README.md
@@ -17,5 +17,18 @@ you can use any additional args.
+**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*
+
+It defines how to save the offsets file:
+* `async` – it periodically saves the offsets using `async_interval`. The saving operation is skipped if offsets haven't been changed. Suitable in most cases; it guarantees at-least-once delivery and adds almost no overhead.
+* `sync` – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.
+
+Save operation takes three steps:
+* Write the temporary file with all offsets;
+* Call `fsync()` on it;
+* Rename the temporary file to the original one.
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/input/journalctl/debouncer.go b/plugin/input/journalctl/debouncer.go
new file mode 100644
index 000000000..bf5596589
--- /dev/null
+++ b/plugin/input/journalctl/debouncer.go
@@ -0,0 +1,27 @@
+package journalctl
+
+import (
+ "sync"
+ "time"
+)
+
+type Debouncer struct {
+ lastCall time.Time
+	// minimum interval between successive callback invocations; at most one Do per interval actually runs its callback
+ interval time.Duration
+ mx sync.Mutex
+}
+
+func NewDebouncer(interval time.Duration) *Debouncer {
+ return &Debouncer{interval: interval}
+}
+
+func (d *Debouncer) Do(cb func()) {
+ d.mx.Lock()
+ defer d.mx.Unlock()
+
+ if time.Since(d.lastCall) > d.interval {
+ cb()
+ d.lastCall = time.Now()
+ }
+}
diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go
index 3cfc07400..d50e9f3fe 100644
--- a/plugin/input/journalctl/journalctl.go
+++ b/plugin/input/journalctl/journalctl.go
@@ -3,8 +3,11 @@
package journalctl
import (
+ "strings"
"sync/atomic"
+ "time"
+ "github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/offset"
@@ -25,6 +28,8 @@ type Plugin struct {
currentOffset int64
logger *zap.Logger
+ offsetsDebouncer *Debouncer
+
// plugin metrics
offsetErrorsMetric *prometheus.CounterVec
@@ -32,10 +37,17 @@ type Plugin struct {
readerErrorsMetric *prometheus.CounterVec
}
-type Config struct {
- // ! config-params
- // ^ config-params
+type persistenceMode int
+const (
+ // ! "persistenceMode" #1 /`([a-z]+)`/
+	persistenceModeAsync persistenceMode = iota // * `async` – it periodically saves the offsets using `async_interval`. The saving operation is skipped if offsets haven't been changed. Suitable in most cases; it guarantees at-least-once delivery and adds almost no overhead.
+ persistenceModeSync // * `sync` – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.
+)
+
+// ! config-params
+// ^ config-params
+type Config struct {
// > @3@4@5@6
// >
// > The filename to store offsets of processed messages.
@@ -51,6 +63,24 @@ type Config struct {
// for testing mostly
MaxLines int `json:"max_lines"`
+
+ // > @3@4@5@6
+ // >
+ // > It defines how to save the offsets file:
+ // > @persistenceMode|comment-list
+ // >
+ // > Save operation takes three steps:
+ // > * Write the temporary file with all offsets;
+ // > * Call `fsync()` on it;
+ // > * Rename the temporary file to the original one.
+ PersistenceMode string `json:"persistence_mode" default:"async" options:"async|sync"` // *
+ PersistenceMode_ persistenceMode
+
+ // > @3@4@5@6
+ // >
+ // > Offsets saving interval. Only used if `persistence_mode` is set to `async`.
+ AsyncInterval cfg.Duration `json:"async_interval" default:"1s" parse:"duration"`
+ AsyncInterval_ time.Duration
}
type offsetInfo struct {
@@ -93,6 +123,13 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
}
p.offInfo.Store(offInfo)
+ if p.config.PersistenceMode_ == persistenceModeAsync {
+ if p.config.AsyncInterval_ < 0 {
+ p.logger.Fatal("invalid async interval", zap.Duration("interval", p.config.AsyncInterval_))
+ }
+ p.offsetsDebouncer = NewDebouncer(p.config.AsyncInterval_)
+ }
+
readConfig := &journalReaderConfig{
output: p,
cursor: offInfo.Cursor,
@@ -128,12 +165,20 @@ func (p *Plugin) Stop() {
func (p *Plugin) Commit(event *pipeline.Event) {
offInfo := *p.offInfo.Load()
- offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString()))
+ offInfo.set(strings.Clone(event.Root.Dig("__CURSOR").AsString()))
p.offInfo.Store(&offInfo)
- if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
- p.offsetErrorsMetric.WithLabelValues().Inc()
- p.logger.Error("can't save offset file", zap.Error(err))
+ sync := func() {
+ if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
+ p.offsetErrorsMetric.WithLabelValues().Inc()
+ p.logger.Error("can't save offset file", zap.Error(err))
+ }
+ }
+
+ if p.config.PersistenceMode_ == persistenceModeSync {
+ sync()
+ } else {
+ p.offsetsDebouncer.Do(sync)
}
}
diff --git a/plugin/input/journalctl/journalctl_test.go b/plugin/input/journalctl/journalctl_test.go
index 923d157af..51e0b55e3 100644
--- a/plugin/input/journalctl/journalctl_test.go
+++ b/plugin/input/journalctl/journalctl_test.go
@@ -3,16 +3,19 @@
package journalctl
import (
+ "path"
"path/filepath"
"strings"
"sync"
"testing"
+ "time"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/output/devnull"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
+ insaneJSON "github.com/vitkovskii/insane-json"
)
func setInput(p *pipeline.Pipeline, config *Config) {
@@ -107,3 +110,27 @@ func TestOffsets(t *testing.T) {
assert.Equal(t, 1, cnt)
}
}
+
+func BenchmarkName(b *testing.B) {
+ p := Plugin{config: &Config{
+ OffsetsFile: path.Join(b.TempDir(), "offsets.yaml"),
+ }, offsetsDebouncer: NewDebouncer(time.Millisecond * 200)}
+ p.offInfo.Store(&offsetInfo{})
+
+ event := &pipeline.Event{Root: insaneJSON.Spawn()}
+ defer insaneJSON.Release(event.Root)
+
+ p.config.PersistenceMode_ = persistenceModeSync
+ b.Run("sync", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ p.Commit(event)
+ }
+ })
+
+ p.config.PersistenceMode_ = persistenceModeAsync
+ b.Run("async", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ p.Commit(event)
+ }
+ })
+}
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go
index 30da23fce..b1559ba19 100644
--- a/plugin/output/kafka/kafka.go
+++ b/plugin/output/kafka/kafka.go
@@ -189,7 +189,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if p.config.UseTopicField {
fieldValue := event.Root.Dig(p.config.TopicField).AsString()
if fieldValue != "" {
- topic = pipeline.CloneString(fieldValue)
+ topic = strings.Clone(fieldValue)
}
}
diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go
index fcc03200c..fc0343282 100644
--- a/plugin/output/s3/s3.go
+++ b/plugin/output/s3/s3.go
@@ -10,6 +10,7 @@ import (
"path/filepath"
"sort"
"strconv"
+ "strings"
"sync"
"time"
@@ -339,7 +340,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
// getBucketName decides which s3 bucket shall receive event.
func (p *Plugin) getBucketName(event *pipeline.Event) string {
- bucketName := pipeline.CloneString(event.Root.Dig(p.config.BucketEventField).AsString())
+ bucketName := strings.Clone(event.Root.Dig(p.config.BucketEventField).AsString())
// no BucketEventField in message, it's DefaultBucket, showtime
if bucketName == "" {