Output retry exponentially (#526)
DmitryRomanov authored Feb 1, 2024
1 parent 1be2b75 commit 55ec72d
Showing 23 changed files with 822 additions and 162 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
 	github.com/alicebob/miniredis/v2 v2.30.5
 	github.com/bitly/go-simplejson v0.5.1
+	github.com/cenkalti/backoff/v4 v4.2.1
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/euank/go-kmsg-parser v2.0.0+incompatible
 	github.com/go-redis/redis v6.15.9+incompatible
2 changes: 2 additions & 0 deletions go.sum
@@ -33,6 +33,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
 github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
 github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
 github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
+github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
+github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
76 changes: 76 additions & 0 deletions pipeline/backoff.go
@@ -0,0 +1,76 @@
package pipeline

import (
	"context"
	"time"

	"github.com/cenkalti/backoff/v4"
)

type RetriableBatcher struct {
	outFn        RetriableBatcherOutFn
	backoff      backoff.BackOff
	batcher      *Batcher
	onRetryError func(err error)
}

type RetriableBatcherOutFn func(*WorkerData, *Batch) error

type BackoffOpts struct {
	MinRetention time.Duration
	Multiplier   float64
	AttemptNum   uint64
}

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
	boff := GetBackoff(
		opts.MinRetention,
		opts.Multiplier,
		opts.AttemptNum,
	)

	batcherBackoff := &RetriableBatcher{
		outFn:        batcherOutFn,
		backoff:      boff,
		onRetryError: onError,
	}
	batcherBackoff.setBatcher(batcherOpts)
	return batcherBackoff
}

func (b *RetriableBatcher) setBatcher(batcherOpts *BatcherOptions) {
	batcherOpts.OutFn = b.Out
	b.batcher = NewBatcher(*batcherOpts)
}

func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
	b.backoff.Reset()

	err := backoff.Retry(func() error {
		return b.outFn(data, batch)
	}, b.backoff)

	if err != nil {
		b.onRetryError(err)
	}
}

func (b *RetriableBatcher) Start(ctx context.Context) {
	b.batcher.Start(ctx)
}

func (b *RetriableBatcher) Stop() {
	b.batcher.Stop()
}

func (b *RetriableBatcher) Add(event *Event) {
	b.batcher.Add(event)
}

func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = minRetention
	expBackoff.Multiplier = multiplier
	expBackoff.RandomizationFactor = 0.5
	return backoff.WithMaxRetries(expBackoff, attemptNum)
}
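The scheduling above comes entirely from cenkalti/backoff/v4: GetBackoff builds an ExponentialBackOff capped by WithMaxRetries, and Out funnels every batch through backoff.Retry. Here is a minimal, self-contained sketch of the same configuration (not part of this commit; the 50ms initial interval and multiplier 2 mirror the ClickHouse plugin defaults documented further down) showing how attempts are spaced and when Retry gives up:

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Same shape GetBackoff builds: exponential intervals starting at 50ms,
	// doubling after each retry, with ±50% jitter, capped at 3 retries.
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 50 * time.Millisecond
	expBackoff.Multiplier = 2
	expBackoff.RandomizationFactor = 0.5
	b := backoff.WithMaxRetries(expBackoff, 3)

	attempt := 0
	err := backoff.Retry(func() error {
		attempt++
		fmt.Println("attempt", attempt)
		return fmt.Errorf("insert failed") // always fail to expose the schedule
	}, b)

	fmt.Println("gave up:", err)
}

WithMaxRetries(b, 3) allows the initial call plus three retries, so the operation runs four times before Retry returns the last error, which is exactly what RetriableBatcher.Out hands to onRetryError.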
62 changes: 62 additions & 0 deletions pipeline/backoff_test.go
@@ -0,0 +1,62 @@
package pipeline

import (
	"errors"
	"testing"

	"github.com/ozontech/file.d/metric"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
	"go.uber.org/atomic"
)

func TestBackoff(t *testing.T) {
	errorCount := &atomic.Int32{}
	errorCountBefore := errorCount.Load()

	eventCount := &atomic.Int32{}
	eventCountBefore := eventCount.Load()

	errorFn := func(err error) {
		errorCount.Inc()
	}

	batcherBackoff := NewRetriableBatcher(
		&BatcherOptions{
			MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
		},
		func(workerData *WorkerData, batch *Batch) error {
			eventCount.Inc()
			return nil
		},
		BackoffOpts{AttemptNum: 3},
		errorFn,
	)

	batcherBackoff.Out(nil, nil)

	assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count")
	assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count")
}

func TestBackoffWithError(t *testing.T) {
	errorCount := &atomic.Int32{}
	prevValue := errorCount.Load()
	errorFn := func(err error) {
		errorCount.Inc()
	}

	batcherBackoff := NewRetriableBatcher(
		&BatcherOptions{
			MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
		},
		func(workerData *WorkerData, batch *Batch) error {
			return errors.New("some error")
		},
		BackoffOpts{AttemptNum: 3},
		errorFn,
	)

	batcherBackoff.Out(nil, nil)
	assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
}
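Neither test pins down how many attempts actually run before onRetryError fires. A hypothetical extra test (not in the commit; it would sit in this same file and reuse its imports) could assert that AttemptNum=3 yields one initial call plus three retries:

func TestBackoffAttemptsExhausted(t *testing.T) {
	calls := &atomic.Int32{}
	failures := &atomic.Int32{}

	batcherBackoff := NewRetriableBatcher(
		&BatcherOptions{
			MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
		},
		func(workerData *WorkerData, batch *Batch) error {
			calls.Inc()
			return errors.New("some error")
		},
		BackoffOpts{AttemptNum: 3},
		func(err error) { failures.Inc() },
	)

	batcherBackoff.Out(nil, nil)

	// backoff.WithMaxRetries(b, 3) permits the initial attempt plus 3 retries.
	assert.Equal(t, int32(4), calls.Load(), "wrong attempt count")
	assert.Equal(t, int32(1), failures.Load(), "onRetryError must fire once, after retries are exhausted")
}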
14 changes: 7 additions & 7 deletions plugin/README.md
@@ -733,9 +733,9 @@ pipelines:
       type: http
       emulate_mode: "no"
       address: ":9200"
-      actions:
-      - type: json_decode
-        field: message
+    actions:
+    - type: json_decode
+      field: message
     output:
       type: s3
       file_config:
@@ -759,12 +759,12 @@ pipelines:
       type: http
       emulate_mode: "no"
       address: ":9200"
-      actions:
-      - type: json_decode
-        field: message
+    actions:
+    - type: json_decode
+      field: message
     output:
       type: s3
-      file_plugin:
+      file_config:
         retention_interval: 10s
         # endpoint, access_key, secret_key, bucket are required.
         endpoint: "s3.fake_host.org:80"
14 changes: 7 additions & 7 deletions plugin/output/README.md
@@ -72,9 +72,9 @@ pipelines:
       type: http
       emulate_mode: "no"
       address: ":9200"
-      actions:
-      - type: json_decode
-        field: message
+    actions:
+    - type: json_decode
+      field: message
     output:
       type: s3
       file_config:
@@ -98,12 +98,12 @@ pipelines:
       type: http
       emulate_mode: "no"
       address: ":9200"
-      actions:
-      - type: json_decode
-        field: message
+    actions:
+    - type: json_decode
+      field: message
     output:
       type: s3
-      file_plugin:
+      file_config:
         retention_interval: 10s
         # endpoint, access_key, secret_key, bucket are required.
         endpoint: "s3.fake_host.org:80"
15 changes: 14 additions & 1 deletion plugin/output/clickhouse/README.md
@@ -111,7 +111,14 @@ If the strict mode is enabled file.d fails (exit with code 1) in above examples.
 **`retry`** *`int`* *`default=10`*
 
 Retries of insertion. If File.d cannot insert for this number of attempts,
-File.d will fall with non-zero exit code.
+File.d will fail with a non-zero exit code or skip the message (see fatal_on_failed_insert).
+
+<br>
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+Whether to fail with a non-zero exit code after an insert error.
+**Experimental feature**
 
 <br>

Expand All @@ -128,6 +135,12 @@ Retention milliseconds for retry to DB.

<br>

**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponential increase of retention between retries

<br>

**`insert_timeout`** *`cfg.Duration`* *`default=10s`*

Timeout for each insert request.
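Taken together, retention, retention_exponentially_multiplier, and retry define the retry schedule for an insert. A rough, hypothetical sketch of the nominal waits under the defaults above (not part of the commit; it ignores the ±50% jitter that file.d's GetBackoff applies on top):

package main

import (
	"fmt"
	"time"
)

func main() {
	retention := 50 * time.Millisecond // `retention` default
	multiplier := 2.0                  // `retention_exponentially_multiplier` default
	retries := 10                      // `retry` default

	wait := retention
	total := time.Duration(0)
	for i := 1; i <= retries; i++ {
		fmt.Printf("retry %2d after ~%v\n", i, wait)
		total += wait
		wait = time.Duration(float64(wait) * multiplier)
	}
	// 50ms * (2^10 - 1) = 51.15s of nominal waiting before the batch is
	// declared failed and fatal_on_failed_insert decides what happens next.
	fmt.Println("worst-case total wait:", total)
}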
73 changes: 54 additions & 19 deletions plugin/output/clickhouse/clickhouse.go
@@ -20,6 +20,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 )
 
 /*{ introduction
@@ -43,7 +44,7 @@ type Plugin struct {
 	logger *zap.Logger
 
 	config     *Config
-	batcher    *pipeline.Batcher
+	batcher    *pipeline.RetriableBatcher
 	ctx        context.Context
 	cancelFunc context.CancelFunc
@@ -227,9 +228,15 @@ type Config struct {
 	// > @3@4@5@6
 	// >
 	// > Retries of insertion. If File.d cannot insert for this number of attempts,
-	// > File.d will fall with non-zero exit code.
+	// > File.d will fail with a non-zero exit code or skip the message (see fatal_on_failed_insert).
 	Retry int `json:"retry" default:"10"` // *
 
+	// > @3@4@5@6
+	// >
+	// > Whether to fail with a non-zero exit code after an insert error.
+	// > **Experimental feature**
+	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
 	// > @3@4@5@6
 	// >
 	// > Additional settings to the Clickhouse.
@@ -242,6 +249,11 @@ type Config struct {
 	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
 	Retention_ time.Duration
 
+	// > @3@4@5@6
+	// >
+	// > Multiplier for the exponential growth of retention between retries.
+	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
 	// > @3@4@5@6
 	// >
 	// > Timeout for each insert request.
@@ -328,9 +340,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
 	p.registerMetrics(params.MetricCtl)
 	p.ctx, p.cancelFunc = context.WithCancel(context.Background())
 
-	if p.config.Retry < 1 {
-		p.logger.Fatal("'retry' can't be <1")
-	}
 	if p.config.Retention_ < 1 {
 		p.logger.Fatal("'retention' can't be <1")
 	}
@@ -405,17 +414,42 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
 		}
 	}
 
-	p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+	batcherOpts := pipeline.BatcherOptions{
 		PipelineName:   params.PipelineName,
 		OutputType:     outPluginType,
-		OutFn:          p.out,
 		Controller:     params.Controller,
 		Workers:        p.config.WorkersCount_,
 		BatchSizeCount: p.config.BatchSize_,
 		BatchSizeBytes: p.config.BatchSizeBytes_,
 		FlushTimeout:   p.config.BatchFlushTimeout_,
 		MetricCtl:      params.MetricCtl,
-	})
+	}
+
+	backoffOpts := pipeline.BackoffOpts{
+		MinRetention: p.config.Retention_,
+		Multiplier:   float64(p.config.RetentionExponentMultiplier),
+		AttemptNum:   uint64(p.config.Retry),
+	}
+
+	onError := func(err error) {
+		var level zapcore.Level
+		if p.config.FatalOnFailedInsert {
+			level = zapcore.FatalLevel
+		} else {
+			level = zapcore.ErrorLevel
+		}
+
+		p.logger.Log(level, "can't insert to the table", zap.Error(err),
+			zap.Int("retries", p.config.Retry),
+			zap.String("table", p.config.Table))
+	}
+
+	p.batcher = pipeline.NewRetriableBatcher(
+		&batcherOpts,
+		p.out,
+		backoffOpts,
+		onError,
+	)
 
 	p.batcher.Start(p.ctx)
 }
@@ -443,7 +477,7 @@ func (d data) reset() {
 	}
 }
 
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
 	if *workerData == nil {
 		// we don't check the error, schema already validated in the Start
 		columns, _ := inferInsaneColInputs(p.config.Columns)
@@ -484,22 +518,23 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
 	})
 
 	var err error
-	for try := 0; try < p.config.Retry; try++ {
+	for i := range p.instances {
 		requestID := p.requestID.Inc()
-		clickhouse := p.getInstance(requestID, try)
-		err = p.do(clickhouse, data.input)
+		clickhouse := p.getInstance(requestID, i)
+		err = p.do(clickhouse, data.input) // assignment, not :=, so the post-loop check and return see the last failure
 		if err == nil {
-			break
+			return nil
 		}
-		p.insertErrorsMetric.Inc()
-		time.Sleep(p.config.Retention_)
-		p.logger.Error("an attempt to insert a batch failed", zap.Error(err))
 	}
 	if err != nil {
-		p.logger.Fatal("can't insert to the table", zap.Error(err),
-			zap.Int("retries", p.config.Retry),
-			zap.String("table", p.config.Table))
+		p.insertErrorsMetric.Inc()
+		p.logger.Error(
+			"an attempt to insert a batch failed",
+			zap.Error(err),
+		)
 	}
+
+	return err
 }
 
 func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error {