From 839182d9726fd45db80352b25d469fb1a218d18d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ra=C3=BAl=20Barroso?=
Date: Thu, 3 Oct 2024 13:12:40 +0200
Subject: [PATCH 1/4] removes restart and uses start

---
 pkg/lifecycle/service.go      | 71 +++++++++++++++++------------------
 pkg/lifecycle/service_test.go |  3 ++
 2 files changed, 38 insertions(+), 36 deletions(-)

diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go
index 9dcc34a38..d042ba9e8 100644
--- a/pkg/lifecycle/service.go
+++ b/pkg/lifecycle/service.go
@@ -57,6 +57,15 @@ type ErrRecoveryCfg struct {
 	HealthyAfter  time.Duration
 }
 
+func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff {
+	return &backoff.Backoff{
+		Min:    e.MinDelay,
+		Max:    e.MaxDelay,
+		Factor: float64(e.BackoffFactor),
+		Jitter: true,
+	}
+}
+
 // Service manages pipelines.
 type Service struct {
 	logger log.CtxLogger
@@ -168,10 +177,25 @@ func (s *Service) Start(
 	}
 
 	s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline")
-	s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes")
 
-	rp, err := s.buildRunnablePipeline(ctx, pl)
+	var backoffCfg *backoff.Backoff
+
+	// We check if the pipeline was previously running and get the backoff configuration from it.
+	oldRp, ok := s.runningPipelines.Get(pipelineID)
+	if !ok {
+		// default backoff configuration
+		backoffCfg = &backoff.Backoff{
+			Min:    s.errRecoveryCfg.MinDelay,
+			Max:    s.errRecoveryCfg.MaxDelay,
+			Factor: float64(s.errRecoveryCfg.BackoffFactor),
+			Jitter: true,
+		}
+	} else {
+		backoffCfg = &oldRp.backoffCfg
+	}
+
+	rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg)
 	if err != nil {
 		return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err)
 	}
@@ -187,32 +211,10 @@ func (s *Service) Start(
 	return nil
 }
 
-// Restart replaces the nodes of a pipeline and starts it again.
-func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error {
-	s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline")
-	s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes")
-
-	nodes, err := s.buildNodes(ctx, rp.pipeline)
-	if err != nil {
-		return cerrors.Errorf("could not build new nodes for pipeline %s: %w", rp.pipeline.ID, err)
-	}
-
-	// Replaces the old nodes with the new ones.
-	rp.n = nodes
-
-	s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes")
-	if err := s.runPipeline(ctx, rp); err != nil {
-		return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err)
-	}
-	s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline restarted ")
-
-	return nil
-}
-
-// RestartWithBackoff restarts a pipeline with a backoff.
+// StartWithBackoff starts a pipeline with a backoff.
 // It'll check the number of times the pipeline has been restarted and the duration of the backoff.
 // When the pipeline has reached out the maximum number of retries, it'll return a fatal error.
-func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error {
+func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error {
 	s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff")
 
 	attempt := int64(rp.backoffCfg.Attempt())
@@ -244,7 +246,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline)
 		return nil
 	}
 
-	return s.Restart(ctx, rp)
+	return s.Start(ctx, rp.pipeline.ID)
 }
 
 // Stop will attempt to gracefully stop a given pipeline by calling each node's
@@ -430,15 +432,8 @@ func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stre
 func (s *Service) buildRunnablePipeline(
 	ctx context.Context,
 	pl *pipeline.Instance,
+	backoffCfg *backoff.Backoff,
 ) (*runnablePipeline, error) {
-	// Each pipeline will utilize this backoff configuration to recover from errors.
-	backoffCfg := &backoff.Backoff{
-		Min:    s.errRecoveryCfg.MinDelay,
-		Max:    s.errRecoveryCfg.MaxDelay,
-		Factor: float64(s.errRecoveryCfg.BackoffFactor),
-		Jitter: true,
-	}
-
 	nodes, err := s.buildNodes(ctx, pl)
 	if err != nil {
 		return nil, err
 	}
@@ -756,6 +751,10 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 	}
 
 	// TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed.
+	// now:
+	// running -> (error) -> recovering (restart) -> running
+	// future (with the HealthyAfter mechanism):
+	// running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running
 	err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "")
 	if err != nil {
 		return err
 	}
@@ -825,7 +824,7 @@ func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) err
 	}
 
 	// Exit the goroutine and attempt to restart the pipeline
-	return s.RestartWithBackoff(ctx, rp)
+	return s.StartWithBackoff(ctx, rp)
 }
 
 // notify notifies all registered FailureHandlers about an error.
diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go
index a9953bc82..9e31f699a 100644
--- a/pkg/lifecycle/service_test.go
+++ b/pkg/lifecycle/service_test.go
@@ -88,6 +88,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 	is.NoErr(err)
 
@@ -173,6 +174,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 	is.True(err != nil)
 
@@ -220,6 +222,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 	is.True(err != nil)

From 494f37e2bf6d5197dff3ee868dc703b7ff669eb9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ra=C3=BAl=20Barroso?=
Date: Thu, 3 Oct 2024 13:23:16 +0200
Subject: [PATCH 2/4] clearer validation

---
 pkg/conduit/config.go      | 2 +-
 pkg/conduit/config_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go
index 3e2fab78d..e96b0738f 100644
--- a/pkg/conduit/config.go
+++ b/pkg/conduit/config.go
@@ -213,7 +213,7 @@ func (c Config) validateErrorRecovery() error {
 		errs = append(errs, err)
 	}
 	if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
-		errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, lifecycle.InfiniteRetriesErrRecovery))
+		errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
 	}
 	if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
 		errs = append(errs, err)
 	}
diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go
index da836b475..046551e46 100644
--- a/pkg/conduit/config_test.go
+++ b/pkg/conduit/config_test.go
@@ -202,7 +202,7 @@ func TestConfig_Validate(t *testing.T) {
 				c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1
 				return c
 			},
-			want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`),
+			want: cerrors.New(`invalid error recovery config: invalid "max-retries" value. It must be -1 for infinite retries or >= 0`),
 		},
 		{
 			name: "error recovery: with 0 max-retries ",

From bded42eecd1b99de022a6b77e922cf95ef883f25 Mon Sep 17 00:00:00 2001
From: Haris Osmanagic
Date: Thu, 3 Oct 2024 14:32:53 +0200
Subject: [PATCH 3/4] handle recovery

---
 pkg/lifecycle/service.go | 29 ++++++++++++++++++++---------
 1 file changed, 20 insertions(+), 9 deletions(-)

diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go
index d042ba9e8..d01bddd91 100644
--- a/pkg/lifecycle/service.go
+++ b/pkg/lifecycle/service.go
@@ -773,16 +773,17 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 		switch err {
 		case tomb.ErrStillAlive:
 			// not an actual error, the pipeline stopped gracefully
+			err = nil
+
+			var status pipeline.Status
 			if isGracefulShutdown.Load() {
 				// it was triggered by a graceful shutdown of Conduit
-				if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, ""); err != nil {
-					return err
-				}
+				status = pipeline.StatusSystemStopped
 			} else {
 				// it was manually triggered by a user
-				if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, ""); err != nil {
-					return err
-				}
+				status = pipeline.StatusUserStopped
+			}
+			if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, status, ""); err != nil {
+				return err
 			}
 		default:
 			if cerrors.IsFatalError(err) {
@@ -792,11 +793,21 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 				}
 			} else {
 				// try to recover the pipeline
-				if err := s.recoverPipeline(ctx, rp); err != nil {
-					if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil {
-						return err
+				if recoveryErr := s.recoverPipeline(ctx, rp); recoveryErr != nil {
+					s.logger.
+						Err(ctx, err).
+						Str(log.PipelineIDField, rp.pipeline.ID).
+						Msg("pipeline recovery failed")
+
+					if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil {
+						return updateErr
 					}
 				}
+				// recovery was triggered, so no cleanup
+				// (remove running pipeline, notify failure handlers, etc.)
+				// is needed;
+				// this is why we return nil to skip the cleanup below.
+				return nil
 			}
 		}

From 5443f18034a204400378f5b6c60b4f1a7c1fecf4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Haris=20Osmanagi=C4=87?=
Date: Thu, 3 Oct 2024 14:57:54 +0200
Subject: [PATCH 4/4] Pipeline recovery: test cases (#1873)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---------

Co-authored-by: Raúl Barroso
---
 docs/test-cases/pipeline-recovery.md  | 227 ++++++++++++++++++++++++++
 docs/test-cases/test-case-template.md |  27 +++
 2 files changed, 254 insertions(+)
 create mode 100644 docs/test-cases/pipeline-recovery.md
 create mode 100644 docs/test-cases/test-case-template.md

diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md
new file mode 100644
index 000000000..1e9582ead
--- /dev/null
+++ b/docs/test-cases/pipeline-recovery.md
@@ -0,0 +1,227 @@
+# Test cases for the pipeline recovery feature
+
+## Test Case 01: Recovery not triggered for fatal error - DLQ
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when there is an error writing to a DLQ.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 02: Recovery not triggered for fatal error - processor
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when there is an error processing a record.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 03: Recovery not triggered - graceful shutdown
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when Conduit is shutting down gracefully (i.e. when
+typing Ctrl+C in the terminal where Conduit is running, or sending a SIGINT).
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 04: Recovery not triggered - user stopped pipeline
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered if a user stops a pipeline (via the HTTP API's
+`/v1/pipelines/pipeline-id/stop` endpoint).
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 05: Recovery is configured by default
+
+**Priority** (low/medium/high):
+
+**Description**:
+Pipeline recovery is configured by default. A failing pipeline will be restarted
+a number of times without any additional configuration.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: chaos-to-log
+    status: running
+    name: chaos-to-log
+    description: chaos source, error on read
+    connectors:
+      - id: chaos-source-1
+        type: source
+        plugin: standalone:chaos
+        name: chaos-source-1
+        settings:
+          readMode: error
+      - id: destination1
+        type: destination
+        plugin: builtin:log
+        name: log-destination
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 06: Recovery not triggered on malformed pipeline
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered for a malformed pipeline, e.g. when a connector is
+missing.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: nothing-to-log
+    status: running
+    name: nothing-to-log
+    description: no source
+    connectors:
+      - id: destination1
+        type: destination
+        plugin: builtin:log
+        name: log-destination
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 07: Conduit exits with --pipelines.exit-on-degraded=true and a pipeline failing after recovery
+
+**Priority** (low/medium/high):
+
+**Description**: Given a Conduit instance with
+`--pipelines.exit-on-degraded=true`, and a pipeline that's failing after the
+maximum number of retries configured, Conduit should shut down gracefully.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 08: Conduit doesn't exit with --pipelines.exit-on-degraded=true and a pipeline that recovers after a few retries
+
+**Priority** (low/medium/high):
+
+**Description**:
+Given a Conduit instance with `--pipelines.exit-on-degraded=true`, and a
+pipeline that recovers after a few retries, Conduit should still be running.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
diff --git a/docs/test-cases/test-case-template.md b/docs/test-cases/test-case-template.md
new file mode 100644
index 000000000..75c6a086d
--- /dev/null
+++ b/docs/test-cases/test-case-template.md
@@ -0,0 +1,27 @@
+
+
+## Test Case 01: Test case title
+
+**Priority** (low/medium/high):
+
+**Description**:
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+1. Step 1
+2. Step 2
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
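
The recovery behaviour exercised by the test cases above is driven by the backoff configuration introduced in PATCH 1/4 (github.com/jpillora/backoff, via ErrRecoveryCfg.toBackoff and StartWithBackoff). The standalone sketch below is not part of the patches; it only illustrates how the retry delays grow for a given MinDelay/MaxDelay/BackoffFactor. The concrete values used (1s, 10m, factor 2) are assumptions for illustration, not necessarily Conduit's defaults.

```go
package main

import (
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

func main() {
	// Mirrors the shape of ErrRecoveryCfg.toBackoff(); the values below are
	// illustrative assumptions, not Conduit's actual default configuration.
	b := &backoff.Backoff{
		Min:    time.Second,
		Max:    10 * time.Minute,
		Factor: 2,
		Jitter: false, // the service enables jitter; disabled here so the output is deterministic
	}

	for attempt := 0; attempt < 6; attempt++ {
		// ForAttempt computes the delay for a given attempt without advancing
		// the internal counter (Duration() would advance it, as StartWithBackoff does).
		fmt.Printf("attempt %d -> wait %s\n", attempt, b.ForAttempt(float64(attempt)))
	}
}
```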