From 55ec72d5dfcc6f534133ccec937f685284f838d6 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 1 Feb 2024 15:40:37 +0700 Subject: [PATCH] Output retry exponentially (#526) --- go.mod | 1 + go.sum | 2 + pipeline/backoff.go | 76 +++++++++++++ pipeline/backoff_test.go | 62 ++++++++++ plugin/README.md | 14 +-- plugin/output/README.md | 14 +-- plugin/output/clickhouse/README.md | 15 ++- plugin/output/clickhouse/clickhouse.go | 73 ++++++++---- plugin/output/elasticsearch/README.md | 26 +++++ plugin/output/elasticsearch/elasticsearch.go | 75 +++++++++--- plugin/output/gelf/README.md | 26 +++++ plugin/output/gelf/gelf.go | 95 ++++++++++++---- plugin/output/kafka/README.md | 26 +++++ plugin/output/kafka/kafka.go | 73 ++++++++++-- plugin/output/postgres/README.idoc.md | 3 + plugin/output/postgres/README.md | 48 +++++++- plugin/output/postgres/postgres.go | 113 +++++++++++++++---- plugin/output/postgres/postgres_test.go | 7 +- plugin/output/s3/README.md | 40 +++++-- plugin/output/s3/TESTME.md | 8 +- plugin/output/s3/s3.go | 83 ++++++++++---- plugin/output/splunk/README.md | 26 +++++ plugin/output/splunk/splunk.go | 78 ++++++++++--- 23 files changed, 822 insertions(+), 162 deletions(-) create mode 100644 pipeline/backoff.go create mode 100644 pipeline/backoff_test.go diff --git a/go.mod b/go.mod index 87ec647af..590f36e84 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 github.com/alicebob/miniredis/v2 v2.30.5 github.com/bitly/go-simplejson v0.5.1 + github.com/cenkalti/backoff/v4 v4.2.1 github.com/cespare/xxhash/v2 v2.2.0 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index aeba12cfb..5312332e4 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/pipeline/backoff.go b/pipeline/backoff.go new file mode 100644 index 000000000..af3401366 --- /dev/null +++ b/pipeline/backoff.go @@ -0,0 +1,76 @@ +package pipeline + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" +) + +type RetriableBatcher struct { + outFn RetriableBatcherOutFn + backoff backoff.BackOff + batcher *Batcher + onRetryError func(err error) +} + +type RetriableBatcherOutFn func(*WorkerData, *Batch) error + +type BackoffOpts struct { + MinRetention time.Duration + Multiplier float64 + AttemptNum uint64 +} + +func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher { + boff := GetBackoff( + opts.MinRetention, + opts.Multiplier, + opts.AttemptNum, + ) + + batcherBackoff := &RetriableBatcher{ + outFn: batcherOutFn, + backoff: boff, + onRetryError: onError, + 
} + batcherBackoff.setBatcher(batcherOpts) + return batcherBackoff +} + +func (b *RetriableBatcher) setBatcher(batcherOpts *BatcherOptions) { + batcherOpts.OutFn = b.Out + b.batcher = NewBatcher(*batcherOpts) +} + +func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { + b.backoff.Reset() + + err := backoff.Retry(func() error { + return b.outFn(data, batch) + }, b.backoff) + + if err != nil { + b.onRetryError(err) + } +} + +func (b *RetriableBatcher) Start(ctx context.Context) { + b.batcher.Start(ctx) +} + +func (b *RetriableBatcher) Stop() { + b.batcher.Stop() +} + +func (b *RetriableBatcher) Add(event *Event) { + b.batcher.Add(event) +} + +func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff { + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = minRetention + expBackoff.Multiplier = multiplier + expBackoff.RandomizationFactor = 0.5 + return backoff.WithMaxRetries(expBackoff, attemptNum) +} diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go new file mode 100644 index 000000000..1d73a2bf1 --- /dev/null +++ b/pipeline/backoff_test.go @@ -0,0 +1,62 @@ +package pipeline + +import ( + "errors" + "testing" + + "github.com/ozontech/file.d/metric" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +func TestBackoff(t *testing.T) { + errorCount := &atomic.Int32{} + errorCountBefore := errorCount.Load() + + eventCount := &atomic.Int32{} + eventCountBefore := eventCount.Load() + + errorFn := func(err error) { + errorCount.Inc() + } + + batcherBackoff := NewRetriableBatcher( + &BatcherOptions{ + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), + }, + func(workerData *WorkerData, batch *Batch) error { + eventCount.Inc() + return nil + }, + BackoffOpts{AttemptNum: 3}, + errorFn, + ) + + batcherBackoff.Out(nil, nil) + + assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count") + assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count") +} + +func TestBackoffWithError(t *testing.T) { + errorCount := &atomic.Int32{} + prevValue := errorCount.Load() + errorFn := func(err error) { + errorCount.Inc() + } + + batcherBackoff := NewRetriableBatcher( + &BatcherOptions{ + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), + }, + func(workerData *WorkerData, batch *Batch) error { + return errors.New("some error") + }, + BackoffOpts{AttemptNum: 3}, + errorFn, + ) + + batcherBackoff.Out(nil, nil) + assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count") +} diff --git a/plugin/README.md b/plugin/README.md index 3f8e0e06d..822431a6f 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -733,9 +733,9 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 file_config: @@ -759,12 +759,12 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 - file_plugin: + file_config: retention_interval: 10s # endpoint, access_key, secret_key, bucket are required. 
endpoint: "s3.fake_host.org:80" diff --git a/plugin/output/README.md b/plugin/output/README.md index daff2a16b..9b05cc4ef 100755 --- a/plugin/output/README.md +++ b/plugin/output/README.md @@ -72,9 +72,9 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 file_config: @@ -98,12 +98,12 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 - file_plugin: + file_config: retention_interval: 10s # endpoint, access_key, secret_key, bucket are required. endpoint: "s3.fake_host.org:80" diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index cc07b5180..a0a7ce227 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -111,7 +111,14 @@ If the strict mode is enabled file.d fails (exit with code 1) in above examples. **`retry`** *`int`* *`default=10`* Retries of insertion. If File.d cannot insert for this number of attempts, -File.d will fall with non-zero exit code. +File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + +
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
@@ -128,6 +135,12 @@ Retention milliseconds for retry to DB.
+**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
+ **`insert_timeout`** *`cfg.Duration`* *`default=10s`* Timeout for each insert request. diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 3e82a10b5..25ef5e8ac 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -43,7 +44,7 @@ type Plugin struct { logger *zap.Logger config *Config - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher ctx context.Context cancelFunc context.CancelFunc @@ -227,9 +228,15 @@ type Config struct { // > @3@4@5@6 // > // > Retries of insertion. If File.d cannot insert for this number of attempts, - // > File.d will fall with non-zero exit code. + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). Retry int `json:"retry" default:"10"` // * + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + // > @3@4@5@6 // > // > Additional settings to the Clickhouse. @@ -242,6 +249,11 @@ type Config struct { Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // * Retention_ time.Duration + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * + // > @3@4@5@6 // > // > Timeout for each insert request. @@ -328,9 +340,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.registerMetrics(params.MetricCtl) p.ctx, p.cancelFunc = context.WithCancel(context.Background()) - if p.config.Retry < 1 { - p.logger.Fatal("'retry' can't be <1") - } if p.config.Retention_ < 1 { p.logger.Fatal("'retention' can't be <1") } @@ -405,17 +414,42 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } } - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, Controller: params.Controller, Workers: p.config.WorkersCount_, BatchSizeCount: p.config.BatchSize_, BatchSizeBytes: p.config.BatchSizeBytes_, FlushTimeout: p.config.BatchFlushTimeout_, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Log(level, "can't insert to the table", zap.Error(err), + zap.Int("retries", p.config.Retry), + zap.String("table", p.config.Table)) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) p.batcher.Start(p.ctx) } @@ -443,7 +477,7 @@ func (d data) reset() { } } -func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { if *workerData == nil { // we don't check the error, schema already validated in the Start columns, _ := inferInsaneColInputs(p.config.Columns) @@ -484,22 +518,23 @@ func (p *Plugin) out(workerData 
*pipeline.WorkerData, batch *pipeline.Batch) { }) var err error - for try := 0; try < p.config.Retry; try++ { + for i := range p.instances { requestID := p.requestID.Inc() - clickhouse := p.getInstance(requestID, try) - err = p.do(clickhouse, data.input) + clickhouse := p.getInstance(requestID, i) + err := p.do(clickhouse, data.input) if err == nil { - break + return nil } - p.insertErrorsMetric.Inc() - time.Sleep(p.config.Retention_) - p.logger.Error("an attempt to insert a batch failed", zap.Error(err)) } if err != nil { - p.logger.Fatal("can't insert to the table", zap.Error(err), - zap.Int("retries", p.config.Retry), - zap.String("table", p.config.Table)) + p.insertErrorsMetric.Inc() + p.logger.Error( + "an attempt to insert a batch failed", + zap.Error(err), + ) } + + return err } func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error { diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md index a7908aa47..b8587e752 100755 --- a/plugin/output/elasticsearch/README.md +++ b/plugin/output/elasticsearch/README.md @@ -94,5 +94,31 @@ Operation type to be used in batch requests. It can be `index` or `create`. Defa
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the message (see `fatal_on_failed_insert`).
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Delay before retrying an insert to the DB.
+
+
+ +**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
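As an aside, a minimal sketch of how these three settings (`retry`, `retention`, `retention_exponentially_multiplier`) translate into wait times, using the `pipeline.GetBackoff` helper added in this patch. It assumes the file.d module is importable; the printed values are approximate because of the 0.5 randomization factor the helper sets:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/ozontech/file.d/pipeline"
)

func main() {
	// retention=1s, retention_exponentially_multiplier=2, retry=10 -- the defaults documented above.
	b := pipeline.GetBackoff(time.Second, 2, 10)
	b.Reset()
	for {
		delay := b.NextBackOff()
		if delay == backoff.Stop { // returned once the 10 allowed retries are used up
			break
		}
		// Roughly 1s, 2s, 4s, ... with +-50% jitter, capped by the library's default MaxInterval.
		fmt.Println(delay)
	}
}
```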
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index 3dd9e9cc3..4553cf59b 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -19,6 +19,7 @@ import ( "github.com/valyala/fasthttp" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -29,7 +30,6 @@ If a network error occurs, the batch will infinitely try to be delivered to the const ( outPluginType = "elasticsearch" NDJSONContentType = "application/x-ndjson" - retryDelay = time.Second ) var ( @@ -46,7 +46,7 @@ type Plugin struct { avgEventSize int time string headerPrefix string - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher controller pipeline.OutputPluginController mu *sync.Mutex @@ -139,6 +139,29 @@ type Config struct { // > Operation type to be used in batch requests. It can be `index` or `create`. Default is `index`. // > > Check out [_bulk API doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) for details. BatchOpType string `json:"batch_op_type" default:"index" options:"index|create"` // * + + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to DB. 
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * } type data struct { @@ -203,10 +226,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.maintenance(nil) p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_)) - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, MaintenanceFn: p.maintenance, Controller: p.controller, Workers: p.config.WorkersCount_, @@ -215,7 +238,33 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP FlushTimeout: p.config.BatchFlushTimeout_, MaintenanceInterval: time.Minute, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Log(level, "can't send to the elastic", zap.Error(err), + zap.Int("retries", p.config.Retry), + ) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel @@ -237,7 +286,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors") } -func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { if *workerData == nil { *workerData = &data{ outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize), @@ -255,14 +304,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { data.outBuf = p.appendEvent(data.outBuf, event) }) - for { - if err := p.send(data.outBuf); err != nil { - p.sendErrorMetric.Inc() - p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err)) - } else { - break - } + err := p.send(data.outBuf) + if err != nil { + p.sendErrorMetric.Inc() + p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err)) } + return err } func (p *Plugin) send(body []byte) error { @@ -279,14 +326,12 @@ func (p *Plugin) send(body []byte) error { p.setAuthHeader(req) if err := p.client.DoTimeout(req, resp, p.config.ConnectionTimeout_); err != nil { - time.Sleep(retryDelay) return fmt.Errorf("can't send batch to %s: %s", endpoint.String(), err.Error()) } respContent := resp.Body() if statusCode := resp.Header.StatusCode(); statusCode < http.StatusOK || statusCode > http.StatusAccepted { - time.Sleep(retryDelay) return fmt.Errorf("response status from %s isn't OK: status=%d, body=%s", endpoint.String(), statusCode, string(respContent)) } diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md index e3073a300..41ff37bc6 100755 --- a/plugin/output/gelf/README.md +++ b/plugin/output/gelf/README.md @@ -118,5 +118,31 @@ After this timeout the batch will be sent even if batch isn't completed.
+**`retry`** *`int`* *`default=0`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the message (see `fatal_on_failed_insert`).
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Delay before retrying a failed send.
+
+
+ +**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 1abf1fdbd..95757151a 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -41,7 +42,7 @@ type Plugin struct { config *Config logger *zap.SugaredLogger avgEventSize int - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher controller pipeline.OutputPluginController // plugin metrics @@ -144,6 +145,29 @@ type Config struct { BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"0"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to DB. + Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * + // fields converted to extra fields GELF format hostField string shortMessageField string @@ -190,10 +214,9 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.config.timestampFieldFormat = format p.config.levelField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.LevelField)) - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, MaintenanceFn: p.maintenance, Controller: p.controller, Workers: p.config.WorkersCount_, @@ -202,7 +225,33 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP FlushTimeout: p.config.BatchFlushTimeout_, MaintenanceInterval: p.config.ReconnectInterval_, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Desugar().Log(level, "can't send to gelf", zap.Error(err), + zap.Int("retries", p.config.Retry), + ) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) p.batcher.Start(context.TODO()) } @@ -219,7 +268,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounter("output_gelf_send_error", "Total GELF send errors") } -func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { if *workerData == nil { *workerData = &data{ outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize), @@ 
-245,32 +294,30 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { data.outBuf = outBuf data.encodeBuf = encodeBuf - for { - if data.gelf == nil { - p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint) - - gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil) - if err != nil { - p.sendErrorMetric.Inc() - p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error()) - time.Sleep(time.Second) - continue - } - data.gelf = gelf - } + if data.gelf == nil { + p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint) - _, err := data.gelf.send(outBuf) + gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil) if err != nil { p.sendErrorMetric.Inc() - p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error()) - _ = data.gelf.close() - data.gelf = nil + p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error()) time.Sleep(time.Second) - continue + return err } + data.gelf = gelf + } - break + _, err := data.gelf.send(outBuf) + if err != nil { + p.sendErrorMetric.Inc() + p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error()) + _ = data.gelf.close() + data.gelf = nil + time.Sleep(time.Second) + return err } + + return nil } func (p *Plugin) maintenance(workerData *pipeline.WorkerData) { diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md index 279784df5..0dc0e3e05 100755 --- a/plugin/output/kafka/README.md +++ b/plugin/output/kafka/README.md @@ -57,6 +57,32 @@ After this timeout the batch will be sent even if batch isn't full.
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the message (see `fatal_on_failed_insert`).
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
+
+
+**`retention`** *`cfg.Duration`* *`default=50ms`*
+
+Delay before retrying a failed send.
+
+
+ +**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
+ **`is_sasl_enabled`** *`bool`* *`default=false`* If set, the plugin will use SASL authentications mechanism. diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index a1c2c0c7a..ef880beb8 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -14,6 +14,7 @@ import ( "github.com/ozontech/file.d/xtls" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -36,7 +37,7 @@ type Plugin struct { controller pipeline.OutputPluginController producer sarama.SyncProducer - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher // plugin metrics sendErrorMetric prometheus.Counter @@ -95,6 +96,29 @@ type Config struct { BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry. + Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * + // > @3@4@5@6 // > // > If set, the plugin will use SASL authentications mechanism. @@ -149,20 +173,50 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.controller = params.Controller p.registerMetrics(params.MetricCtl) + if p.config.Retention_ < 1 { + p.logger.Fatal("'retention' can't be <1") + } + p.logger.Infof("workers count=%d, batch size=%d", p.config.WorkersCount_, p.config.BatchSize_) p.producer = NewProducer(p.config, p.logger) - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, Controller: p.controller, Workers: p.config.WorkersCount_, BatchSizeCount: p.config.BatchSize_, BatchSizeBytes: p.config.BatchSizeBytes_, FlushTimeout: p.config.BatchFlushTimeout_, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Desugar().Log(level, "can't write batch", + zap.Int("retries", p.config.Retry), + ) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) p.batcher.Start(context.TODO()) } @@ -175,7 +229,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounter("output_kafka_send_errors", "Total Kafka send errors") } -func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { if *workerData == nil { *workerData = &data{ messages: 
make([]*sarama.ProducerMessage, p.config.BatchSize_), @@ -211,8 +265,6 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { i++ }) - data.outBuf = outBuf - err := p.producer.SendMessages(data.messages[:i]) if err != nil { errs := err.(sarama.ProducerErrors) @@ -220,8 +272,13 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { p.logger.Errorf("can't write batch: %s", e.Err.Error()) } p.sendErrorMetric.Add(float64(len(errs))) - p.controller.Error("some events from batch were not written") + p.logger.Error( + "an attempt to insert a batch failed", + zap.Error(err), + ) } + + return err } func (p *Plugin) Stop() { diff --git a/plugin/output/postgres/README.idoc.md b/plugin/output/postgres/README.idoc.md index 2037fba64..657419cef 100644 --- a/plugin/output/postgres/README.idoc.md +++ b/plugin/output/postgres/README.idoc.md @@ -3,3 +3,6 @@ ### Config params @config-params|description + +### Example +@example diff --git a/plugin/output/postgres/README.md b/plugin/output/postgres/README.md index bb14ce874..9bfd08881 100755 --- a/plugin/output/postgres/README.md +++ b/plugin/output/postgres/README.md @@ -38,9 +38,17 @@ and nullable options.
-**`retry`** *`int`* *`default=3`*
+**`retry`** *`int`* *`default=10`*

-Retries of insertion.
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the message (see `fatal_on_failed_insert`).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
@@ -52,6 +60,9 @@ Retention milliseconds for retry to DB.

+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
 **`db_request_timeout`** *`cfg.Duration`* *`default=3000ms`*

 Timeout for DB requests in milliseconds.

@@ -91,4 +102,37 @@ After this timeout batch will be sent even if batch isn't completed.
+### Example
+Postgres output example:
+```yaml
+pipelines:
+  example_pipeline:
+    input:
+      type: file
+      persistence_mode: async
+      watching_dir: ./
+      filename_pattern: input_example.json
+      offsets_file: ./offsets.yaml
+      offsets_op: reset
+    output:
+      type: postgres
+      conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10"
+      table: events
+      columns:
+        - name: id
+          type: int
+        - name: name
+          type: string
+      retry: 10
+      retention: 1s
+      retention_exponentially_multiplier: 2
+```
+
+input_example.json:
+```json
+{"id":1,"name":"name1"}
+{"id":2,"name":"name2"}
+```
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index a793e577e..531396b75 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -17,12 +17,47 @@ import ( "github.com/prometheus/client_golang/prometheus" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction It sends the event batches to postgres db using pgx. }*/ +/*{ example +**Example** +Postgres output example: +```yaml +pipelines: + example_pipeline: + input: + type: file + persistence_mode: async + watching_dir: ./ + filename_pattern: input_example.json + offsets_file: ./offsets.yaml + offsets_op: reset + output: + type: postgres + conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10" + table: events + columns: + - name: id + type: int + - name: name + type: string + retry: 10 + retention: 1s + retention_exponentially_multiplier: 1.5 +``` + +input_example.json +```json +{"id":1,"name":"name1"} +{"id":2,"name":"name2"} +``` +}*/ + var ( ErrEventDoesntHaveField = errors.New("event doesn't have field") ErrEventFieldHasWrongType = errors.New("event field has wrong type") @@ -63,7 +98,7 @@ type Plugin struct { controller pipeline.OutputPluginController logger *zap.SugaredLogger config *Config - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher ctx context.Context cancelFunc context.CancelFunc @@ -71,9 +106,11 @@ type Plugin struct { pool PgxIface // plugin metrics + discardedEventMetric prometheus.Counter duplicatedEventMetric prometheus.Counter writtenEventMetric prometheus.Counter + insertErrorsMetric prometheus.Counter } type ConfigColumn struct { @@ -119,8 +156,15 @@ type Config struct { // > @3@4@5@6 // > - // > Retries of insertion. - Retry int `json:"retry" default:"3"` // * + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 // > @@ -128,6 +172,11 @@ type Config struct { Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // * Retention_ time.Duration + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` + // > @3@4@5@6 // > // > Timeout for DB requests in milliseconds. 
@@ -182,6 +231,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.discardedEventMetric = ctl.RegisterCounter("output_postgres_event_discarded", "Total pgsql discarded messages") p.duplicatedEventMetric = ctl.RegisterCounter("output_postgres_event_duplicated", "Total pgsql duplicated messages") p.writtenEventMetric = ctl.RegisterCounter("output_postgres_event_written", "Total events written to pgsql") + p.insertErrorsMetric = ctl.RegisterCounter("output_postgres_insert_errors", "Total pgsql insert errors") } func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { @@ -192,7 +242,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.registerMetrics(params.MetricCtl) if len(p.config.Columns) == 0 { - p.logger.Fatal("can't start plugin, no fields in config") + p.logger.Fatal("can't start plugin, no columns in config") } if p.config.Retry < 1 { @@ -225,17 +275,42 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } p.pool = pool - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, Controller: p.controller, Workers: p.config.WorkersCount_, BatchSizeCount: p.config.BatchSize_, BatchSizeBytes: p.config.BatchSizeBytes_, FlushTimeout: p.config.BatchFlushTimeout_, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Desugar().Log(level, "can't insert to the table", zap.Error(err), + zap.Int("retries", p.config.Retry), + zap.String("table", p.config.Table)) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) ctx, cancel := context.WithCancel(context.Background()) p.ctx = ctx @@ -254,7 +329,7 @@ func (p *Plugin) Out(event *pipeline.Event) { p.batcher.Add(event) } -func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) error { // _ *pipeline.WorkerData - doesn't required in this plugin, we can't parse // events for uniques through bytes. builder := p.queryBuilder.GetInsertBuilder() @@ -299,7 +374,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { // no valid events passed. if !anyValidValue { - return + return nil } query, args, err := builder.ToSql() @@ -314,22 +389,14 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { argsSliceInterface[i] = args[i-1] } - // Insert into pg with retry. - for i := p.config.Retry; i > 0; i-- { - err = p.try(query, argsSliceInterface) - if err != nil { - p.logger.Errorf("can't exec query: %s", err.Error()) - time.Sleep(p.config.Retention_) - continue - } - p.writtenEventMetric.Add(float64(len(uniqueEventsMap))) - break - } - + err = p.try(query, argsSliceInterface) if err != nil { - p.pool.Close() - p.logger.Fatalf("failed insert into %s. 
query: %s, args: %v, err: %v", p.config.Table, query, args, err) + p.insertErrorsMetric.Inc() + p.logger.Errorf("can't exec query: %s", err.Error()) + return err } + p.writtenEventMetric.Add(float64(len(uniqueEventsMap))) + return nil } func (p *Plugin) try(query string, argsSliceInterface []any) error { diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 522c874be..53d905ddd 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -147,12 +147,7 @@ func TestPrivateOutWithRetry(t *testing.T) { gomock.AssignableToTypeOf(ctxMock), "INSERT INTO table1 (str_uni_1,int_1,timestamp_1) VALUES ($1,$2,$3) ON CONFLICT(str_uni_1) DO UPDATE SET int_1=EXCLUDED.int_1,timestamp_1=EXCLUDED.timestamp_1", []any{preferSimpleProtocol, strUniValue, intValue, time.Unix(int64(timestampValue), 0).Format(time.RFC3339)}, - ).Return(&rowsForTest{}, errors.New("someError")).Times(2) - mockpool.EXPECT().Query( - gomock.AssignableToTypeOf(ctxMock), - "INSERT INTO table1 (str_uni_1,int_1,timestamp_1) VALUES ($1,$2,$3) ON CONFLICT(str_uni_1) DO UPDATE SET int_1=EXCLUDED.int_1,timestamp_1=EXCLUDED.timestamp_1", - []any{preferSimpleProtocol, strUniValue, intValue, time.Unix(int64(timestampValue), 0).Format(time.RFC3339)}, - ).Return(&rowsForTest{}, nil).Times(1) + ).Return(&rowsForTest{}, errors.New("someError")).Times(1) builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) diff --git a/plugin/output/s3/README.md b/plugin/output/s3/README.md index 53bef4194..5b914e248 100755 --- a/plugin/output/s3/README.md +++ b/plugin/output/s3/README.md @@ -22,9 +22,9 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 file_config: @@ -48,12 +48,12 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 - file_plugin: + file_config: retention_interval: 10s # endpoint, access_key, secret_key, bucket are required. endpoint: "s3.fake_host.org:80" @@ -144,4 +144,30 @@ Sets upload timeout.
+**`retry`** *`int`* *`default=10`*
+
+Retries of upload. If File.d cannot upload for this number of attempts,
+File.d will fail with a non-zero exit code or skip the upload (see `fatal_on_failed_insert`).
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an upload error.
+**Experimental feature**
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Delay before retrying a failed upload.
+
+
+ +**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
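For illustration, a hedged sketch of the retry loop each S3 upload worker now runs: `backoff.Retry` with a per-worker `BackOff` built by `pipeline.GetBackoff`, mirroring `uploadWork` in `s3.go` below. The failing operation and the small delay values here are stand-ins so the example finishes quickly:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/ozontech/file.d/pipeline"
)

func main() {
	// One backoff per worker, Reset() before each file, as uploadWork does.
	workerBackoff := pipeline.GetBackoff(50*time.Millisecond, 2, 3) // retention, multiplier, retry
	attempts := 0

	workerBackoff.Reset()
	err := backoff.Retry(func() error {
		attempts++
		return errors.New("upload failed") // stand-in for uploadToS3
	}, workerBackoff)

	// One initial try plus 3 retries; err is the last upload error, which the plugin
	// then logs at Error or Fatal level depending on fatal_on_failed_insert.
	fmt.Println(attempts, err)
}
```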
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/s3/TESTME.md b/plugin/output/s3/TESTME.md index c57992e3b..92389406c 100644 --- a/plugin/output/s3/TESTME.md +++ b/plugin/output/s3/TESTME.md @@ -52,12 +52,12 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 - file_plugin: + file_config: retention_interval: 10s endpoint: "0.0.0.0:19001" access_key: "minio_access_key" diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index b8d0050b5..84148db68 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/minio/minio-go" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" @@ -21,6 +22,7 @@ import ( "github.com/ozontech/file.d/plugin/output/file" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -47,9 +49,9 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 file_config: @@ -73,12 +75,12 @@ pipelines: type: http emulate_mode: "no" address: ":9200" - actions: - - type: json_decode - field: message + actions: + - type: json_decode + field: message output: type: s3 - file_plugin: + file_config: retention_interval: 10s # endpoint, access_key, secret_key, bucket are required. endpoint: "s3.fake_host.org:80" @@ -102,16 +104,14 @@ pipelines: }*/ const ( - fileNameSeparator = "_" - attemptIntervalMin = 1 * time.Second - dirSep = "/" - StaticBucketDir = "static_buckets" - DynamicBucketDir = "dynamic_buckets" + fileNameSeparator = "_" + dirSep = "/" + StaticBucketDir = "static_buckets" + DynamicBucketDir = "dynamic_buckets" ) var ( - attemptInterval = attemptIntervalMin - compressors = map[string]func(*zap.SugaredLogger) compressor{ + compressors = map[string]func(*zap.SugaredLogger) compressor{ zipName: newZipCompressor, } ) @@ -234,6 +234,29 @@ type Config struct { // > Sets upload timeout. UploadTimeout cfg.Duration `json:"upload_timeout" default:"1m" parse:"duration"` // * UploadTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Retries of upload. If File.d cannot upload for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to upload. 
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * } func (c *Config) IsMultiBucketExists(bucketName string) bool { @@ -313,7 +336,11 @@ func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.Outp p.compressCh = make(chan fileDTO, p.config.FileConfig.WorkersCount_) for i := 0; i < p.config.FileConfig.WorkersCount_; i++ { - go p.uploadWork() + go p.uploadWork(pipeline.GetBackoff( + p.config.Retention_, + float64(p.config.RetentionExponentMultiplier), + uint64(p.config.Retry), + )) go p.compressWork() } err = p.startPlugins(params, outPlugCount, targetDirs, fileNames) @@ -505,10 +532,10 @@ func (p *Plugin) addFileJobWithBucket(bucketName string) func(filename string) { } } -func (p *Plugin) uploadWork() { +func (p *Plugin) uploadWork(workerBackoff backoff.BackOff) { for compressed := range p.uploadCh { - sleepTime := attemptInterval - for { + workerBackoff.Reset() + err := backoff.Retry(func() error { p.logger.Infof("starting upload s3 object. fileName=%s, bucketName=%s", compressed.fileName, compressed.bucketName) err := p.uploadToS3(compressed) if err == nil { @@ -519,11 +546,23 @@ func (p *Plugin) uploadWork() { if err != nil && !os.IsNotExist(err) { p.logger.Panicf("could not delete file: %s, err: %s", compressed, err.Error()) } - break + return nil } - sleepTime += sleepTime - p.logger.Errorf("could not upload object: %s, next attempt in %s, error: %s", compressed, sleepTime.String(), err.Error()) - time.Sleep(sleepTime) + p.logger.Errorf("could not upload object: %s, error: %s", compressed, err.Error()) + return err + }, workerBackoff) + + if err != nil { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Desugar().Log(level, "could not upload s3 object", zap.Error(err), + zap.Int("retries", p.config.Retry), + ) } } } diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md index d5583c5f7..5b456845a 100755 --- a/plugin/output/splunk/README.md +++ b/plugin/output/splunk/README.md @@ -45,5 +45,31 @@ After this timeout the batch will be sent even if batch isn't completed.
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the message (see `fatal_on_failed_insert`).
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Delay before retrying a failed send.
+
+
+ +**`retention_exponentially_multiplier`** *`int`* *`default=2`* + +Multiplier for exponential increase of retention between retries + +
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 269d13425..99cc07e8c 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction @@ -32,7 +33,7 @@ type Plugin struct { client http.Client logger *zap.SugaredLogger avgEventSize int - batcher *pipeline.Batcher + batcher *pipeline.RetriableBatcher controller pipeline.OutputPluginController // plugin metrics @@ -82,6 +83,29 @@ type Config struct { // > After this timeout the batch will be sent even if batch isn't completed. BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * BatchFlushTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to DB. + Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * } type data struct { @@ -107,10 +131,9 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.registerMetrics(params.MetricCtl) p.client = p.newClient(p.config.RequestTimeout_) - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ + batcherOpts := pipeline.BatcherOptions{ PipelineName: params.PipelineName, OutputType: outPluginType, - OutFn: p.out, MaintenanceFn: p.maintenance, Controller: p.controller, Workers: p.config.WorkersCount_, @@ -118,7 +141,32 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP BatchSizeBytes: p.config.BatchSizeBytes_, FlushTimeout: p.config.BatchFlushTimeout_, MetricCtl: params.MetricCtl, - }) + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: uint64(p.config.Retry), + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Desugar().Log(level, "can't send data to splunk", zap.Error(err), + zap.Int("retries", p.config.Retry)) + } + + p.batcher = pipeline.NewRetriableBatcher( + &batcherOpts, + p.out, + backoffOpts, + onError, + ) p.batcher.Start(context.TODO()) } @@ -135,7 +183,7 @@ func (p *Plugin) Out(event *pipeline.Event) { p.batcher.Add(event) } -func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { if *workerData == nil { *workerData = &data{ outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize), @@ -161,19 +209,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { 
p.logger.Debugf("trying to send: %s", outBuf) - for { - err := p.send(outBuf) - if err != nil { - p.sendErrorMetric.Inc() - p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) - time.Sleep(time.Second) - - continue - } - - break + err := p.send(outBuf) + if err != nil { + p.sendErrorMetric.Inc() + p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) + } else { + p.logger.Debugf("successfully sent: %s", outBuf) } - p.logger.Debugf("successfully sent: %s", outBuf) + + return err } func (p *Plugin) maintenance(_ *pipeline.WorkerData) {}
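
For reference, a minimal sketch of the shared pattern behind all of the outputs above: a `RetriableBatcher` retries the plugin's `out` function with the configured backoff and hands the final error to an `onError` callback once attempts are exhausted. It is modeled on `pipeline/backoff_test.go` from this patch; the out function, messages, and Prometheus registry are stand-ins:

```go
package main

import (
	"fmt"

	"github.com/ozontech/file.d/metric"
	"github.com/ozontech/file.d/pipeline"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Every flush fails, so after AttemptNum retries onError receives the last error.
	batcher := pipeline.NewRetriableBatcher(
		&pipeline.BatcherOptions{
			MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
		},
		func(_ *pipeline.WorkerData, _ *pipeline.Batch) error {
			return fmt.Errorf("insert failed") // stand-in for a plugin's out function
		},
		pipeline.BackoffOpts{AttemptNum: 3},
		func(err error) { fmt.Println("delivery failed:", err) },
	)

	batcher.Out(nil, nil)
}
```

In the plugins themselves, the `onError` callback is where `fatal_on_failed_insert` is honored: it logs the error at Fatal level (stopping File.d) when the option is set, and at Error level otherwise.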