Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: restart pipeline with recovery #1853

Merged
merged 49 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
81206c7
wip
raulb Sep 16, 2024
9021106
implement restart method
raulb Sep 17, 2024
e1fbaf6
implement restart swapping nodes
raulb Sep 18, 2024
90f0304
fix method calls
raulb Sep 18, 2024
284df88
Merge branch 'main' into raul/restart-with-recovery
raulb Sep 18, 2024
859eb2d
fix some tests
raulb Sep 18, 2024
97707f5
Merge branch 'raul/restart-with-recovery' of github.com:ConduitIO/con…
raulb Sep 18, 2024
cc460ad
fix another test
raulb Sep 18, 2024
c20d89d
fix test
raulb Sep 18, 2024
ad739b5
update comment and refactor test
raulb Sep 18, 2024
51af897
refactor tests (wip)
raulb Sep 19, 2024
1e284b9
restart test WIP
raulb Sep 19, 2024
d91797a
remove redundant code
raulb Sep 19, 2024
2864578
update test
raulb Sep 19, 2024
26e7c02
fix test
raulb Sep 19, 2024
12f4f0e
Merge branch 'main' into raul/restart-with-recovery
raulb Sep 20, 2024
84b7f75
Merge branch 'main' into raul/restart-with-recovery
raulb Sep 20, 2024
65b1070
add new tests and refactor
raulb Sep 20, 2024
156ea90
Merge branch 'raul/restart-with-recovery' of github.com:ConduitIO/con…
raulb Sep 20, 2024
eaf1409
fix logger
raulb Sep 20, 2024
ef65360
cosmetic changes
raulb Sep 20, 2024
e63b7f5
move to a separate method
raulb Sep 23, 2024
c2faba2
add tracing and fix max retries
raulb Oct 1, 2024
13439ab
Merge branch 'main' into raul/restart-with-recovery
raulb Oct 1, 2024
f06ca0b
Merge branch 'raul/restart-with-recovery' of github.com:ConduitIO/con…
raulb Oct 1, 2024
d396da9
not needed
raulb Oct 1, 2024
a17eb5e
Set -1 as infinite retries for err recovery
raulb Oct 1, 2024
aaa6f61
rename flag
raulb Oct 2, 2024
29a944d
goes to degraded once it exits
raulb Oct 2, 2024
fb92158
fix const
raulb Oct 2, 2024
c4f167d
simpler implementation
raulb Oct 2, 2024
05f38e7
add test case for 0 max-retries
raulb Oct 2, 2024
9a6c884
Merge branch 'main' into raul/restart-with-recovery
raulb Oct 2, 2024
65998c3
fix test
raulb Oct 2, 2024
2f4065b
no need to kill the tomb
raulb Oct 2, 2024
00057c6
typo and fix
raulb Oct 2, 2024
b84e0dd
fix exit-on-degraded when degraded
raulb Oct 2, 2024
b0f21d4
log before erroring
raulb Oct 3, 2024
839182d
removes restart and uses start
raulb Oct 3, 2024
494f37e
clearer validation
raulb Oct 3, 2024
bded42e
handle recovery
hariso Oct 3, 2024
5443f18
Pipeline recovery: test cases (#1873)
hariso Oct 3, 2024
9eb37de
Pipeline recovery tests: add test pipeline, add test case (#1876)
hariso Oct 3, 2024
b1a0780
need to return err
raulb Oct 3, 2024
d5dc4eb
assign error
raulb Oct 3, 2024
234c33a
updated test cases
hariso Oct 3, 2024
94723c0
Merge branch 'raul/restart-with-recovery' of github.com:ConduitIO/con…
hariso Oct 3, 2024
33851ed
reuse method
hariso Oct 3, 2024
63be97b
use log.AttemptField
hariso Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions docs/test-cases/pipeline-recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
<!-- markdownlint-disable MD013 -->
# Test case for the pipeline recovery feature

## Test Case 01: Recovery not triggered for fatal error - DLQ

**Priority** (low/medium/high):

**Description**:
Recovery is not triggered when there is an error writing to a DLQ.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: file-pipeline
status: running
name: file-pipeline
description: test pipeline
connectors:
- id: chaos-src
type: source
plugin: standalone:chaos
name: chaos-src
settings:
readMode: error
- id: log-dst
type: destination
plugin: builtin:log
log: file-dst
dead-letter-queue:
plugin: "builtin:postgres"
settings:
table: non_existing_table_so_that_dlq_fails
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
window-size: 3
window-nack-threshold: 2
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 02: Recovery not triggered for fatal error - processor

**Priority** (low/medium/high):

**Description**:
Recovery is not triggered when there is an error processing a record.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 03: Recovery not triggered - graceful shutdown

**Priority** (low/medium/high):

**Description**:
Recovery is not triggered when Conduit is shutting down gracefully (i.e. when
typing Ctrl+C in the terminal where Conduit is running, or sending a SIGINT).

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 04: Recovery not triggered - user stopped pipeline

**Priority** (low/medium/high):

**Description**:
Recovery is not triggered if a user stops a pipeline (via the HTTP API's
`/v1/pipelines/pipeline-id/stop` endpoint).

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 05: Recovery is configured by default

**Priority** (low/medium/high):

**Description**:
Pipeline recovery is configured by default. A failing pipeline will be restarted
a number of times without any additional configuration.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: chaos-to-log
status: running
name: chaos-to-log
description: chaos source, error on read
connectors:
- id: chaos-source-1
type: source
plugin: standalone:chaos
name: chaos-source-1
settings:
readMode: error
- id: destination1
type: destination
plugin: builtin:log
name: log-destination
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 06: Recovery not triggered on malformed pipeline

**Priority** (low/medium/high):

**Description**:
Recovery is not triggered for a malformed pipeline, e.g. when a connector is
missing.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: nothing-to-log
status: running
name: nothing-to-log
description: no source
connectors:
- id: destination1
type: destination
plugin: builtin:log
name: log-destination
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 07: Conduit exits with --pipelines.exit-on-degraded=true and a pipeline failing after recovery

**Priority** (low/medium/high):

**Description**: Given a Conduit instance with
`--pipelines.exit-on-degraded=true`, and a pipeline that's failing after the
maximum number of retries configured, Conduit should shut down gracefully.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 08: Conduit doesn't exit with --pipelines.exit-on-degraded=true and a pipeline that recovers after a few retries

**Priority** (low/medium/high):

**Description**:
Given a Conduit instance with `--pipelines.exit-on-degraded=true`, and a
pipeline that recovers after a few retries, Conduit should still be running.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---

## Test Case 09: Conduit exits with --pipelines.exit-on-degraded=true, --pipelines.error-recovery.max-retries=0, and a degraded pipeline

**Priority** (low/medium/high):

**Description**:
Given a Conduit instance with
`--pipelines.exit-on-degraded=true --pipelines.error-recovery.max-retries=0`,
and a pipeline that goes into a degraded state, the Conduit instance will
gracefully shut down. This is due `max-retries=0` disabling the recovery.

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

**Expected Result**:

**Additional comments**:

---
27 changes: 27 additions & 0 deletions docs/test-cases/test-case-template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<!-- markdownlint-disable MD041 -->

## Test Case 01: Test case title

**Priority** (low/medium/high):

**Description**:

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
```

**Steps**:

1. Step 1
2. Step 2

**Expected Result**:

**Additional comments**:

---
26 changes: 16 additions & 10 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/lifecycle"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/rs/zerolog"
"golang.org/x/exp/constraints"
Expand Down Expand Up @@ -84,14 +85,19 @@ type Config struct {
}

Pipelines struct {
Path string
ExitOnError bool
ErrorRecovery struct {
MinDelay time.Duration
MaxDelay time.Duration
Path string
ExitOnDegraded bool
ErrorRecovery struct {
// MinDelay is the minimum delay before restart: Default: 1 second
MinDelay time.Duration
// MaxDelay is the maximum delay before restart: Default: 10 minutes
MaxDelay time.Duration
// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
BackoffFactor int
MaxRetries int
HealthyAfter time.Duration
// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
MaxRetries int64
// HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes
HealthyAfter time.Duration
}
}

Expand Down Expand Up @@ -137,7 +143,7 @@ func DefaultConfig() Config {
cfg.Pipelines.ErrorRecovery.MinDelay = time.Second
cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute
cfg.Pipelines.ErrorRecovery.BackoffFactor = 2
cfg.Pipelines.ErrorRecovery.MaxRetries = 0
cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery
cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute

cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin
Expand Down Expand Up @@ -206,8 +212,8 @@ func (c Config) validateErrorRecovery() error {
if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil {
errs = append(errs, err)
}
if err := requireNonNegativeValue("max-retries", errRecoveryCfg.MaxRetries); err != nil {
errs = append(errs, err)
if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
}
if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
errs = append(errs, err)
Expand Down
15 changes: 12 additions & 3 deletions pkg/conduit/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/conduitio/conduit-commons/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/lifecycle"
"github.com/matryer/is"
)

Expand Down Expand Up @@ -196,12 +197,20 @@ func TestConfig_Validate(t *testing.T) {
want: cerrors.New(`invalid error recovery config: "backoff-factor" config value mustn't be negative (got: -1)`),
},
{
name: "error recovery: negative max-retries",
name: "error recovery: max-retries smaller than -1",
setupConfig: func(c Config) Config {
c.Pipelines.ErrorRecovery.MaxRetries = -1
c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1
return c
},
want: cerrors.New(`invalid error recovery config: "max-retries" config value mustn't be negative (got: -1)`),
want: cerrors.New(`invalid error recovery config: invalid "max-retries" value. It must be -1 for infinite retries or >= 0`),
},
{
name: "error recovery: with 0 max-retries ",
setupConfig: func(c Config) Config {
c.Pipelines.ErrorRecovery.MaxRetries = 0
return c
},
want: nil,
},
{
name: "error recovery: negative healthy-after",
Expand Down
Loading