Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track recovery attempts using leaky bucket #1881

Merged
merged 7 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 135 additions & 14 deletions docs/test-cases/pipeline-recovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,24 @@ a retry.
```yaml
version: "2.2"
pipelines:
- id: file-pipeline
- id: chaos-to-log
status: running
name: file-pipeline
description: dlq write error
description: Postgres source, file destination
dead-letter-queue:
plugin: standalone:chaos
settings:
writeMode: error
connectors:
- id: chaos-src
- id: chaos
type: source
plugin: standalone:chaos
name: chaos-src
name: source
settings:
readMode: error
- id: log-dst
- id: destination
type: destination
plugin: builtin:log
log: file-dst
dead-letter-queue:
plugin: "builtin:postgres"
settings:
table: non_existing_table_so_that_dlq_fails
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
window-size: 3
window-nack-threshold: 2
name: destination
```

**Steps**:
Expand All @@ -65,6 +61,28 @@ Recovery is not triggered when there is an error processing a record.
**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: generator
type: source
plugin: builtin:generator
name: source
settings:
format.type: structured
format.options.id: int
format.options.name: string
operations: create
- id: destination
type: destination
plugin: builtin:log
name: destination
processors:
- id: error
plugin: "error"
```

**Steps**:
Expand Down Expand Up @@ -328,3 +346,106 @@ pipelines:
**Additional comments**:

---

## Test Case 11: Recovery triggered during a specific max-retries-window, after that pipeline is degraded

**Priority** (low/medium/high):

**Description**:

A pipeline will be allowed to fail during a specific time window, after that it will be degraded.
Combining `max-retries` and `max-retries-window` we can control how many times a pipeline can fail during a specific time window.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: postgres-source
type: source
plugin: builtin:postgres
name: source
settings:
cdcMode: logrepl
snapshotMode: never
table: employees
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
- id: destination
type: destination
plugin: builtin:log
name: destination
```

**Steps**:

1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s`
2. Stop postgres database
3. Leave it stopped and notice pipeline goes to degraded on attempt 3 (after ~20 seconds)

**Expected Result**:

After 20 seconds the pipeline should be degraded.

**Additional comments**:

---

## Test Case 12: Recovery triggered during a specific max-retries-window, pipeline is resilient during a specific time window

**Priority** (low/medium/high):

**Description**:

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: postgres-source
type: source
plugin: builtin:postgres
name: source
settings:
cdcMode: logrepl
snapshotMode: never
table: employees
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
- id: destination
type: destination
plugin: builtin:log
name: destination
```

**Steps**:

1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s`
2. Stop postgres database
3. Leave it stopped until backoff attempts are 2
4. Start postgres database again
5. Leave it running for another 15 seconds
6. Notice backoff attempts are going back to 1
(repeat if needed to see how backoff attempts are increasing and decreasing)

**Expected Result**:

Pipeline should be able to recover.

**Additional comments**:

---
8 changes: 4 additions & 4 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ type Config struct {
BackoffFactor int
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
MaxRetries int64
// HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes
HealthyAfter time.Duration
// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
MaxRetriesWindow time.Duration
}
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func DefaultConfig() Config {
cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute
cfg.Pipelines.ErrorRecovery.BackoffFactor = 2
cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery
cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute
cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = 5 * time.Minute

cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin

Expand Down Expand Up @@ -215,7 +215,7 @@ func (c Config) validateErrorRecovery() error {
if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
}
if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
if err := requirePositiveValue("max-retries-window", errRecoveryCfg.MaxRetriesWindow); err != nil {
errs = append(errs, err)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/conduit/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ func TestConfig_Validate(t *testing.T) {
want: nil,
},
{
name: "error recovery: negative healthy-after",
name: "error recovery: negative max-retries-window",
setupConfig: func(c Config) Config {
c.Pipelines.ErrorRecovery.HealthyAfter = -time.Second
c.Pipelines.ErrorRecovery.MaxRetriesWindow = -time.Second
return c
},
want: cerrors.New(`invalid error recovery config: "healthy-after" config value must be positive (got: -1s)`),
want: cerrors.New(`invalid error recovery config: "max-retries-window" config value must be positive (got: -1s)`),
},
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
"maximum number of retries",
)
flags.DurationVar(
&cfg.Pipelines.ErrorRecovery.HealthyAfter,
"pipelines.error-recovery.healthy-after",
cfg.Pipelines.ErrorRecovery.HealthyAfter,
&cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
"pipelines.error-recovery.max-retries-window",
cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
"amount of time running without any errors after which a pipeline is considered healthy",
)

Expand Down
10 changes: 5 additions & 5 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ func createServices(r *Runtime) error {

// Error recovery configuration
errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed
MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay,
MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay,
BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries,
MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
}

plService := pipeline.NewService(r.logger, r.DB)
Expand Down
24 changes: 12 additions & 12 deletions pkg/lifecycle/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ type FailureEvent struct {
type FailureHandler func(FailureEvent)

type ErrRecoveryCfg struct {
MinDelay time.Duration
MaxDelay time.Duration
BackoffFactor int
MaxRetries int64
HealthyAfter time.Duration
MinDelay time.Duration
MaxDelay time.Duration
BackoffFactor int
MaxRetries int64
MaxRetriesWindow time.Duration
}

func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er
// Increment number of recovery attempts.
attempt := rp.recoveryAttempts.Add(1)

if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries {
if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt > s.errRecoveryCfg.MaxRetries {
return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover))
}

Expand All @@ -220,7 +220,12 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er
Int64(log.AttemptField, attempt).
Msg("restarting with backoff")

time.AfterFunc(duration+s.errRecoveryCfg.HealthyAfter, func() {
time.AfterFunc(duration+s.errRecoveryCfg.MaxRetriesWindow, func() {
s.logger.Info(ctx).
raulb marked this conversation as resolved.
Show resolved Hide resolved
Str(log.PipelineIDField, rp.pipeline.ID).
Dur(log.DurationField, duration).
Int64(log.AttemptField, attempt).
Msg("decreasing recovery attempts")
rp.recoveryAttempts.Add(-1) // Decrement the number of attempts after delay.
})

Expand Down Expand Up @@ -741,11 +746,6 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
})
}

// TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed.
// now:
// running -> (error) -> recovering (restart) -> running
// future (with the HealthyAfter mechanism):
// running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running
err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "")
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/lifecycle/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,11 @@ func dummyDestination(persister *connector.Persister) *connector.Instance {

func testErrRecoveryCfg() *ErrRecoveryCfg {
return &ErrRecoveryCfg{
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: InfiniteRetriesErrRecovery,
HealthyAfter: 5 * time.Minute,
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: InfiniteRetriesErrRecovery,
MaxRetriesWindow: 5 * time.Minute,
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func TestPipelineSimple(t *testing.T) {
pipelineService := pipeline.NewService(logger, db)

errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: 0,
HealthyAfter: 5 * time.Minute,
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: 0,
MaxRetriesWindow: 5 * time.Minute,
}

lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connectorService, processorService, connPluginService, pipelineService)
Expand Down
10 changes: 5 additions & 5 deletions pkg/provisioning/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,11 @@ func TestService_IntegrationTestServices(t *testing.T) {
procService := processor.NewService(logger, db, procPluginService)

errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: 0,
HealthyAfter: 5 * time.Minute,
MinDelay: time.Second,
MaxDelay: 10 * time.Minute,
BackoffFactor: 2,
MaxRetries: 0,
MaxRetriesWindow: 5 * time.Minute,
}
lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService)

Expand Down