From 28feecb05ccf0f741f0c49e6d044712ab33a23e5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 31 Jul 2023 10:19:34 +0300 Subject: [PATCH 1/2] Bump github.com/bitly/go-simplejson from 0.5.0 to 0.5.1 (#439) Bumps [github.com/bitly/go-simplejson](https://github.com/bitly/go-simplejson) from 0.5.0 to 0.5.1. - [Release notes](https://github.com/bitly/go-simplejson/releases) - [Commits](https://github.com/bitly/go-simplejson/compare/v0.5.0...v0.5.1) --- updated-dependencies: - dependency-name: github.com/bitly/go-simplejson dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 3 +-- go.sum | 6 ++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index e0e76ef60..b2bbf74fa 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/alicebob/miniredis/v2 v2.30.4 - github.com/bitly/go-simplejson v0.5.0 + github.com/bitly/go-simplejson v0.5.1 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/golang/mock v1.6.0 @@ -44,7 +44,6 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/benbjohnson/clock v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/cilium/ebpf v0.9.1 // indirect diff --git a/go.sum b/go.sum index 4ee442b36..d085dc58e 100644 --- a/go.sum +++ b/go.sum @@ -33,10 +33,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= -github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= -github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= -github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= +github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= From 634eeff1098aa85b97c6dcd400be12714c373124 Mon Sep 17 00:00:00 2001 From: Vadim Alekseev Date: Mon, 31 Jul 2023 18:32:58 +0300 Subject: [PATCH 2/2] CI: Run tests with race flag (#437) * Run tests with race flag * Setup benchmarks * Fix race in the test * Fix getting by index * Fix race in the test * Fix race in the discard plugin tests * Fix races in the journalctl tests * More logs in the unstable test * Fix races & refactor journalctl plugin * Try to fix file's tests for the CI runner * Fix race in the test * Increase capacity when perf mode is enabled --- .github/workflows/ci.yml | 14 +++++- pipeline/pipeline.go | 2 +- .../add_file_name/add_file_name_test.go | 9 ++-- plugin/action/discard/discard_test.go | 8 +-- plugin/input/file/file_test.go | 9 ++-- plugin/input/journalctl/journalctl.go | 50 +++++++++++-------- plugin/input/journalctl/journalctl_test.go | 8 ++- plugin/input/journalctl/reader.go | 17 ++----- plugin/input/k8s/k8s_test.go | 44 ++++++++++------ test/test.go | 8 +-- 10 files changed, 99 insertions(+), 70 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 090c37c48..6e97ff8ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,9 +16,14 @@ jobs: strategy: fail-fast: false matrix: + flags: [ '' ] go: [ '1.20' ] arch: [ amd64 ] runner: [ ubuntu-latest ] + include: + - arch: amd64 + runner: ubuntu-latest + flags: -race steps: - name: Checkout code uses: actions/checkout@v3 @@ -40,7 +45,7 @@ jobs: GOARCH: ${{ matrix.arch }} GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error - run: go test -coverprofile=profile.out -covermode=atomic -coverpkg=./... ./... + run: go test -coverprofile=profile.out -covermode=atomic -v -coverpkg=./... ./... - name: Upload artifact uses: actions/upload-artifact@v3 @@ -50,6 +55,13 @@ jobs: if-no-files-found: error retention-days: 1 + - name: Run benchmarks + if: ${{ !contains(matrix.flags, '-race') }} + env: + GOARCH: ${{ matrix.arch }} + GOFLAGS: ${{ matrix.flags }} + run: go test --timeout=15m --benchtime=3x --benchmem --bench="BenchmarkLightJsonReadPar" --run=$^ ./plugin/input/file/... + e2e_test: runs-on: ubuntu-latest steps: diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index f1749d442..5bc58842a 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -612,7 +612,7 @@ func (p *Pipeline) expandProcs() { } for x := 0; x < int(to-from); x++ { - proc := p.newProc(p.Procs[from].id + x) + proc := p.newProc(p.Procs[from-1].id + x) p.Procs = append(p.Procs, proc) proc.start(p.actionParams, p.logger.Sugar()) } diff --git a/plugin/action/add_file_name/add_file_name_test.go b/plugin/action/add_file_name/add_file_name_test.go index 3d0772a41..13b4b7210 100644 --- a/plugin/action/add_file_name/add_file_name_test.go +++ b/plugin/action/add_file_name/add_file_name_test.go @@ -1,6 +1,7 @@ package add_file_name import ( + "strings" "sync" "testing" @@ -15,9 +16,9 @@ func TestModify(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - outEvents := make([]*pipeline.Event, 0) + outEvents := make([]string, 0) output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEvents = append(outEvents, strings.Clone(e.Root.Dig("file").AsString())) wg.Done() }) @@ -28,6 +29,6 @@ func TestModify(t *testing.T) { p.Stop() assert.Equal(t, 2, len(outEvents), "wrong out events count") - assert.Equal(t, "my_file", outEvents[0].Root.Dig("file").AsString(), "wrong field value") - assert.Equal(t, "my_file", outEvents[1].Root.Dig("file").AsString(), "wrong field value") + assert.Equal(t, "my_file", outEvents[0], "wrong field value") + assert.Equal(t, "my_file", outEvents[1], "wrong field value") } diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go index 5e424c2c4..fe9e92691 100644 --- a/plugin/action/discard/discard_test.go +++ b/plugin/action/discard/discard_test.go @@ -121,10 +121,10 @@ func TestDiscardRegex(t *testing.T) { inEvents++ }) - outEvents := make([]*pipeline.Event, 0) + cntOutEvents := 0 output.SetOutFn(func(e *pipeline.Event) { + cntOutEvents++ wg.Done() - outEvents = append(outEvents, e) }) input.In(0, "test.log", 0, []byte(`{"field1":"0000 one 0000"}`)) @@ -139,7 +139,7 @@ func TestDiscardRegex(t *testing.T) { p.Stop() assert.Equal(t, 7, inEvents, "wrong in events count") - assert.Equal(t, 4, len(outEvents), "wrong out events count") + assert.Equal(t, 4, cntOutEvents, "wrong out events count") } func TestDiscardMatchInvert(t *testing.T) { @@ -164,8 +164,8 @@ func TestDiscardMatchInvert(t *testing.T) { outEvents := make([]*pipeline.Event, 0) output.SetOutFn(func(e *pipeline.Event) { - wg.Done() outEvents = append(outEvents, e) + wg.Done() }) input.In(0, "test", 0, []byte(`{"field1":"not_value1"}`)) diff --git a/plugin/input/file/file_test.go b/plugin/input/file/file_test.go index be2bd5104..7f2eb1845 100644 --- a/plugin/input/file/file_test.go +++ b/plugin/input/file/file_test.go @@ -85,7 +85,7 @@ func pluginConfig(opts ...string) *Config { OffsetsFile: filepath.Join(offsetsDir, offsetsFile), PersistenceMode: "async", OffsetsOp: op, - MaintenanceInterval: "100ms", + MaintenanceInterval: "5s", } _ = cfg.Parse(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)}) @@ -101,10 +101,6 @@ func renameFile(oldFile string, newFile string) { } func closeFile(f *os.File) { - if err := f.Sync(); err != nil { - panic(err) - } - if err := f.Close(); err != nil { panic(err.Error()) } @@ -477,7 +473,7 @@ func TestReadContinue(t *testing.T) { } for i := range inputEvents { - require.Equal(t, inputEvents[i], outputEvents[i], "wrong event") + require.Equalf(t, inputEvents[i], outputEvents[i], "wrong event, all events: %v", inputEvents) } assertOffsetsAreEqual(t, genOffsetsContent(file, size), getContent(getConfigByPipeline(p).OffsetsFile)) @@ -805,6 +801,7 @@ func TestReadManyPreparedFilesRace(t *testing.T) { if testing.Short() { t.Skip("skip long tests in short mode") } + lineCount := 2 blockCount := 128 * 128 fileCount := 32 diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go index 785f59768..3cfc07400 100644 --- a/plugin/input/journalctl/journalctl.go +++ b/plugin/input/journalctl/journalctl.go @@ -3,11 +3,14 @@ package journalctl import ( + "sync/atomic" + "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/offset" "github.com/ozontech/file.d/pipeline" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" ) /*{ introduction @@ -15,10 +18,12 @@ Reads `journalctl` output. }*/ type Plugin struct { - params *pipeline.InputPluginParams - config *Config - reader *journalReader - offInfo *offsetInfo + params *pipeline.InputPluginParams + config *Config + reader *journalReader + offInfo atomic.Pointer[offsetInfo] + currentOffset int64 + logger *zap.Logger // plugin metrics @@ -51,8 +56,6 @@ type Config struct { type offsetInfo struct { Offset int64 `json:"offset"` Cursor string `json:"cursor"` - - current int64 } func (o *offsetInfo) set(cursor string) { @@ -61,8 +64,8 @@ func (o *offsetInfo) set(cursor string) { } func (p *Plugin) Write(bytes []byte) (int, error) { - p.params.Controller.In(0, "journalctl", p.offInfo.current, bytes, false) - p.offInfo.current++ + p.params.Controller.In(0, "journalctl", p.currentOffset, bytes, false) + p.currentOffset++ return len(bytes), nil } @@ -80,24 +83,26 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams) { p.params = params p.config = config.(*Config) + p.logger = params.Logger.Desugar() p.registerMetrics(params.MetricCtl) - p.offInfo = &offsetInfo{} - if err := offset.LoadYAML(p.config.OffsetsFile, p.offInfo); err != nil { + offInfo := &offsetInfo{} + if err := offset.LoadYAML(p.config.OffsetsFile, offInfo); err != nil { p.offsetErrorsMetric.WithLabelValues().Inc() - p.params.Logger.Error("can't load offset file: %s", err.Error()) + p.logger.Error("can't load offset file", zap.Error(err)) } + p.offInfo.Store(offInfo) readConfig := &journalReaderConfig{ output: p, - cursor: p.offInfo.Cursor, + cursor: offInfo.Cursor, maxLines: p.config.MaxLines, - logger: p.params.Logger, + logger: p.logger, } p.reader = newJournalReader(readConfig, p.readerErrorsMetric) p.reader.args = append(p.reader.args, p.config.JournalArgs...) if err := p.reader.start(); err != nil { - p.params.Logger.Error("failure during start: %s", err.Error()) + p.logger.Fatal("failure during start", zap.Error(err)) } } @@ -111,25 +116,28 @@ func (p *Plugin) Stop() { err := p.reader.stop() if err != nil { p.journalCtlStopErrorMetric.WithLabelValues().Inc() - p.params.Logger.Error("can't stop journalctl cmd: %s", err.Error()) + p.logger.Error("can't stop journalctl cmd", zap.Error(err)) } - if err := offset.SaveYAML(p.config.OffsetsFile, p.offInfo); err != nil { + offsets := *p.offInfo.Load() + if err := offset.SaveYAML(p.config.OffsetsFile, offsets); err != nil { p.offsetErrorsMetric.WithLabelValues().Inc() - p.params.Logger.Error("can't save offset file: %s", err.Error()) + p.logger.Error("can't save offset file", zap.Error(err)) } } func (p *Plugin) Commit(event *pipeline.Event) { - p.offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString())) + offInfo := *p.offInfo.Load() + offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString())) + p.offInfo.Store(&offInfo) - if err := offset.SaveYAML(p.config.OffsetsFile, p.offInfo); err != nil { + if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil { p.offsetErrorsMetric.WithLabelValues().Inc() - p.params.Logger.Error("can't save offset file: %s", err.Error()) + p.logger.Error("can't save offset file", zap.Error(err)) } } // PassEvent decides pass or discard event. -func (p *Plugin) PassEvent(event *pipeline.Event) bool { +func (p *Plugin) PassEvent(_ *pipeline.Event) bool { return true } diff --git a/plugin/input/journalctl/journalctl_test.go b/plugin/input/journalctl/journalctl_test.go index 6e347a751..923d157af 100644 --- a/plugin/input/journalctl/journalctl_test.go +++ b/plugin/input/journalctl/journalctl_test.go @@ -4,6 +4,7 @@ package journalctl import ( "path/filepath" + "strings" "sync" "testing" @@ -56,8 +57,11 @@ func TestPipeline(t *testing.T) { wg.Add(lines) setOutput(p, func(event *pipeline.Event) { assert.Equal(t, int(event.Offset), total) - wg.Done() total++ + if total > lines { + t.Fatal("'total' more than lines") + } + wg.Done() }) p.Start() @@ -89,7 +93,7 @@ func TestOffsets(t *testing.T) { setInput(p, config) setOutput(p, func(event *pipeline.Event) { - cursors[event.Root.Dig("__CURSOR").AsString()]++ + cursors[strings.Clone(event.Root.Dig("__CURSOR").AsString())]++ wg.Done() }) diff --git a/plugin/input/journalctl/reader.go b/plugin/input/journalctl/reader.go index c28920d24..26ecfd38f 100644 --- a/plugin/input/journalctl/reader.go +++ b/plugin/input/journalctl/reader.go @@ -6,20 +6,17 @@ import ( "os/exec" "strings" - "github.com/ozontech/file.d/logger" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) -//nolint:unused type journalReaderConfig struct { output io.Writer cursor string - logger *zap.SugaredLogger + logger *zap.Logger maxLines int } -//nolint:unused type journalReader struct { config *journalReaderConfig cmd *exec.Cmd @@ -29,7 +26,6 @@ type journalReader struct { readerErrorsMetric *prometheus.CounterVec } -//nolint:unused func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { reader := bufio.NewReaderSize(rd, 1024*1024*10) // max message size totalLines := 0 @@ -39,7 +35,7 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { if config.cursor != "" { _, _, err := reader.ReadLine() if err != nil { - logger.Fatalf(err.Error()) + r.config.logger.Fatal(err.Error()) } } @@ -50,13 +46,13 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { } if err != nil { r.readerErrorsMetric.WithLabelValues().Inc() - config.logger.Error(err) + r.config.logger.Error(err.Error()) continue } _, err = config.output.Write(bytes) if err != nil { r.readerErrorsMetric.WithLabelValues().Inc() - config.logger.Error(err) + r.config.logger.Error(err.Error()) } totalLines++ @@ -66,7 +62,6 @@ func (r *journalReader) readLines(rd io.Reader, config *journalReaderConfig) { } } -//nolint:deadcode,unused func newJournalReader(config *journalReaderConfig, readerErrorsCounter *prometheus.CounterVec) *journalReader { res := &journalReader{ config: config, @@ -83,9 +78,8 @@ func newJournalReader(config *journalReaderConfig, readerErrorsCounter *promethe return res } -//nolint:unused func (r *journalReader) start() error { - r.config.logger.Infof(`running "journalctl %s"`, strings.Join(r.args, " ")) + r.config.logger.Info(`running journalctl`, zap.String("args", strings.Join(r.args, " "))) r.cmd = exec.Command("journalctl", r.args...) out, err := r.cmd.StdoutPipe() @@ -102,7 +96,6 @@ func (r *journalReader) start() error { return nil } -//nolint:unused func (r *journalReader) stop() error { return r.cmd.Process.Kill() } diff --git a/plugin/input/k8s/k8s_test.go b/plugin/input/k8s/k8s_test.go index cd378b56f..ae51a4459 100644 --- a/plugin/input/k8s/k8s_test.go +++ b/plugin/input/k8s/k8s_test.go @@ -3,6 +3,7 @@ package k8s import ( "fmt" "os" + "strings" "sync" "testing" "time" @@ -78,23 +79,33 @@ func TestEnrichment(t *testing.T) { putMeta(podInfo) selfNodeName = "node_1" - var event *pipeline.Event = nil - filename := getLogFilename("/k8s-logs", item) + var ( + k8sPod string + k8sNamespace string + k8sContainer string + k8sNode string + k8sNodeLabelZone string + ) input.SetCommitFn(func(e *pipeline.Event) { - event = e + k8sPod = strings.Clone(e.Root.Dig("k8s_pod").AsString()) + k8sNamespace = strings.Clone(e.Root.Dig("k8s_namespace").AsString()) + k8sContainer = strings.Clone(e.Root.Dig("k8s_container").AsString()) + k8sNode = strings.Clone(e.Root.Dig("k8s_node").AsString()) + k8sNodeLabelZone = strings.Clone(e.Root.Dig("k8s_node_label_zone").AsString()) wg.Done() }) + filename := getLogFilename("/k8s-logs", item) input.In(0, filename, 0, []byte(`{"time":"time","log":"log\n"}`)) wg.Wait() p.Stop() - assert.Equal(t, "advanced-logs-checker-1566485760-trtrq", event.Root.Dig("k8s_pod").AsString(), "wrong event field") - assert.Equal(t, "sre", event.Root.Dig("k8s_namespace").AsString(), "wrong event field") - assert.Equal(t, "duty-bot", event.Root.Dig("k8s_container").AsString(), "wrong event field") - assert.Equal(t, "node_1", event.Root.Dig("k8s_node").AsString(), "wrong event field") - assert.Equal(t, "z34", event.Root.Dig("k8s_node_label_zone").AsString(), "wrong event field") + assert.Equal(t, "advanced-logs-checker-1566485760-trtrq", k8sPod, "wrong event field") + assert.Equal(t, "sre", k8sNamespace, "wrong event field") + assert.Equal(t, "duty-bot", k8sContainer, "wrong event field") + assert.Equal(t, "node_1", k8sNode, "wrong event field") + assert.Equal(t, "z34", k8sNodeLabelZone, "wrong event field") } func TestAllowedLabels(t *testing.T) { @@ -153,10 +164,11 @@ func TestK8SJoin(t *testing.T) { podInfo := getPodInfo(item, true) putMeta(podInfo) - outEvents := make([]*pipeline.Event, 0) + outLogs := make([]string, 0) + outOffsets := make([]int64, 0) output.SetOutFn(func(e *pipeline.Event) { - event := *e - outEvents = append(outEvents, &event) + outLogs = append(outLogs, strings.Clone(e.Root.Dig("log").AsEscapedString())) + outOffsets = append(outOffsets, e.Offset) wg.Done() }) @@ -173,7 +185,7 @@ func TestK8SJoin(t *testing.T) { wg.Wait() p.Stop() - assert.Equal(t, 4, len(outEvents)) + assert.Equal(t, 4, len(outLogs)) logs := []string{"\"one line log 1\\n\"", "\"error joined\\n\"", "\"this is joined log 2\\n\"", "\"one line log 3\\n\""} offsets := []int64{10, 70, 60, 80} @@ -198,10 +210,10 @@ func TestK8SJoin(t *testing.T) { offsets = offsets[:len(offsets)-1] } - check(outEvents[0].Root.Dig("log").AsEscapedString(), outEvents[0].Offset) - check(outEvents[1].Root.Dig("log").AsEscapedString(), outEvents[1].Offset) - check(outEvents[2].Root.Dig("log").AsEscapedString(), outEvents[2].Offset) - check(outEvents[3].Root.Dig("log").AsEscapedString(), outEvents[3].Offset) + check(outLogs[0], outOffsets[0]) + check(outLogs[1], outOffsets[1]) + check(outLogs[2], outOffsets[2]) + check(outLogs[3], outOffsets[3]) } func TestCleanUp(t *testing.T) { diff --git a/test/test.go b/test/test.go index 382ab125d..5483f7fcf 100644 --- a/test/test.go +++ b/test/test.go @@ -74,7 +74,7 @@ func startCasePipeline(act func(pipeline *pipeline.Pipeline), out func(event *pi if x.Load() <= 0 { break } - if time.Since(t) > time.Second*20 { + if time.Since(t) > time.Minute*2 { panic("too long act") } } @@ -108,13 +108,15 @@ func NewPipeline(actions []*pipeline.ActionPluginStaticInfo, pipelineOpts ...str eventTimeout = 10 * time.Millisecond } + capacity := 256 if perf { parallel = true + capacity = 20000 } settings := &pipeline.Settings{ - Capacity: 4096, - MaintenanceInterval: time.Second * 10, + Capacity: capacity, + MaintenanceInterval: time.Second * 5, EventTimeout: eventTimeout, AntispamThreshold: 0, AvgEventSize: 2048,