From fdb17d39d9cb170d562bde1bd441197d8b1b3425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 4 Oct 2024 18:28:57 +0200 Subject: [PATCH 1/6] use leaky buffer to track recovery attempts --- pkg/lifecycle/service.go | 70 +++++++++++++++++------------------ pkg/lifecycle/service_test.go | 18 ++------- 2 files changed, 36 insertions(+), 52 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 9e0ef8948..2cc47f2a1 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -103,10 +103,11 @@ func NewService( } type runnablePipeline struct { - pipeline *pipeline.Instance - n []stream.Node - t *tomb.Tomb - backoffCfg backoff.Backoff + pipeline *pipeline.Instance + n []stream.Node + t *tomb.Tomb + backoff *backoff.Backoff + recoveryAttempts *atomic.Int64 } // ConnectorService can fetch and create a connector instance. @@ -179,22 +180,17 @@ func (s *Service) Start( s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline") s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes") - var backoffCfg *backoff.Backoff - - // We check if the pipeline was previously running and get the backoff configuration from it. - oldRp, ok := s.runningPipelines.Get(pipelineID) - if !ok { - // default backoff configuration - backoffCfg = s.errRecoveryCfg.toBackoff() - } else { - backoffCfg = &oldRp.backoffCfg - } - - rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg) + rp, err := s.buildRunnablePipeline(ctx, pl) if err != nil { return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err) } + // We check if the pipeline was previously running and get the backoff configuration from it. + if oldRp, ok := s.runningPipelines.Get(pipelineID); ok { + rp.backoff = oldRp.backoff + rp.recoveryAttempts = oldRp.recoveryAttempts + } + s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("running nodes") if err := s.runPipeline(ctx, rp); err != nil { return cerrors.Errorf("failed to run pipeline %s: %w", pl.ID, err) @@ -210,34 +206,34 @@ func (s *Service) Start( // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error { - s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") - - attempt := int64(rp.backoffCfg.Attempt()) - duration := rp.backoffCfg.Duration() - - s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration") + // Increment number of recovery attempts. + attempt := rp.recoveryAttempts.Add(1) if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } + duration := rp.backoff.ForAttempt(float64(attempt)) + s.logger.Info(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Dur(log.DurationField, duration). + Int64(log.AttemptField, attempt). + Msg("restarting with backoff") + + time.AfterFunc(duration+s.errRecoveryCfg.HealthyAfter, func() { + rp.recoveryAttempts.Add(-1) // Decrement the number of attempts after delay. + }) + // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. 
- timer := time.NewTimer(duration) select { case <-ctx.Done(): return ctx.Err() case <-time.After(duration): - <-timer.C } - // Get status of pipeline to check if it already recovered. - if rp.pipeline.GetStatus() == pipeline.StatusRunning { - s.logger.Debug(ctx). - Str(log.PipelineIDField, rp.pipeline.ID). - Int64(log.AttemptField, attempt). - Int("backoffRetry.count", s.errRecoveryCfg.BackoffFactor). - Int64("backoffRetry.duration", duration.Milliseconds()). - Msg("pipeline recovered") + // The user may have stopped or restarted the pipeline while we were waiting. + actualRp, ok := s.runningPipelines.Get(rp.pipeline.ID) + if !ok || actualRp != rp { return nil } @@ -427,7 +423,6 @@ func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stre func (s *Service) buildRunnablePipeline( ctx context.Context, pl *pipeline.Instance, - backoffCfg *backoff.Backoff, ) (*runnablePipeline, error) { nodes, err := s.buildNodes(ctx, pl) if err != nil { @@ -435,9 +430,10 @@ func (s *Service) buildRunnablePipeline( } return &runnablePipeline{ - pipeline: pl, - n: nodes, - backoffCfg: *backoffCfg, + pipeline: pl, + n: nodes, + backoff: s.errRecoveryCfg.toBackoff(), + recoveryAttempts: &atomic.Int64{}, }, nil } @@ -792,7 +788,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { s.logger. Err(ctx, err). Str(log.PipelineIDField, rp.pipeline.ID). - Msg("pipeline recovery failed stopped") + Msg("pipeline recovery failed") if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil { return updateErr diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 9e31f699a..2ba2fe78a 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -85,11 +85,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { testPipelineService{}, ) - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.NoErr(err) @@ -171,11 +167,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { wantErr := "can't build pipeline without any source connectors" - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.True(err != nil) is.Equal(err.Error(), wantErr) @@ -219,11 +211,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { } pl.SetStatus(pipeline.StatusUserStopped) - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.True(err != nil) is.Equal(err.Error(), wantErr) From d13604decdd31e707ee05b1f79354422dc595690 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 8 Oct 2024 16:18:36 +0200 Subject: [PATCH 2/6] rename healthy-after --- pkg/conduit/config.go | 8 ++++---- pkg/conduit/config_test.go | 6 +++--- pkg/conduit/entrypoint.go | 6 +++--- pkg/conduit/runtime.go | 2 +- pkg/lifecycle/service.go | 6 +++--- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index e96b0738f..441e3d2c3 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -96,8 +96,8 @@ type Config struct { BackoffFactor int // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) MaxRetries int64 - // HealthyAfter is the time after which the pipeline 
is considered healthy: Default: 5 minutes - HealthyAfter time.Duration + // MaxRetriesDuration is the time window in which the max retries are counted: Default: 5 minutes + MaxRetriesDuration time.Duration } } @@ -144,7 +144,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute + cfg.Pipelines.ErrorRecovery.MaxRetriesDuration = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin @@ -215,7 +215,7 @@ func (c Config) validateErrorRecovery() error { if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery { errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery)) } - if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { + if err := requirePositiveValue("max-retries-duration", errRecoveryCfg.MaxRetriesDuration); err != nil { errs = append(errs, err) } diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index 046551e46..edcd8796d 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -213,12 +213,12 @@ func TestConfig_Validate(t *testing.T) { want: nil, }, { - name: "error recovery: negative healthy-after", + name: "error recovery: negative max-retries-duration", setupConfig: func(c Config) Config { - c.Pipelines.ErrorRecovery.HealthyAfter = -time.Second + c.Pipelines.ErrorRecovery.MaxRetriesDuration = -time.Second return c }, - want: cerrors.New(`invalid error recovery config: "healthy-after" config value must be positive (got: -1s)`), + want: cerrors.New(`invalid error recovery config: "max-retries-duration" config value must be positive (got: -1s)`), }, } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 1cd74aceb..b7120998f 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -147,9 +147,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { "maximum number of retries", ) flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.HealthyAfter, - "pipelines.error-recovery.healthy-after", - cfg.Pipelines.ErrorRecovery.HealthyAfter, + &cfg.Pipelines.ErrorRecovery.MaxRetriesDuration, + "pipelines.error-recovery.max-retries-duration", + cfg.Pipelines.ErrorRecovery.MaxRetriesDuration, "amount of time running without any errors after which a pipeline is considered healthy", ) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 0de2276c9..14dcb491d 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -209,7 +209,7 @@ func createServices(r *Runtime) error { MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, - HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed + HealthyAfter: r.Config.Pipelines.ErrorRecovery.MaxRetriesDuration, } plService := pipeline.NewService(r.logger, r.DB) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 2cc47f2a1..99bb3340b 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -741,11 +741,11 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { }) } - // TODO: When it's recovering, we should only update the 
status back to running once HealthyAfter has passed. + // TODO: When it's recovering, we should only update the status back to running once MaxRetriesDuration has passed. // now: // running -> (error) -> recovering (restart) -> running - // future (with the HealthyAfter mechanism): - // running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running + // future (with the MaxRetriesDuration mechanism): + // running -> (error) -> recovering (restart) -> recovering (wait for MaxRetriesDuration) -> running err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err From 0d20a27992de2a07e7448b81882a05dc960f68c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 8 Oct 2024 17:13:01 +0200 Subject: [PATCH 3/6] rename config to max-retries-window --- pkg/conduit/config.go | 8 ++++---- pkg/conduit/config_test.go | 6 +++--- pkg/conduit/entrypoint.go | 6 +++--- pkg/conduit/runtime.go | 10 +++++----- pkg/lifecycle/service.go | 20 ++++++++++---------- pkg/lifecycle/service_test.go | 10 +++++----- pkg/orchestrator/orchestrator_test.go | 10 +++++----- pkg/provisioning/service_test.go | 10 +++++----- 8 files changed, 40 insertions(+), 40 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 441e3d2c3..6d15cffd0 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -96,8 +96,8 @@ type Config struct { BackoffFactor int // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) MaxRetries int64 - // MaxRetriesDuration is the time window in which the max retries are counted: Default: 5 minutes - MaxRetriesDuration time.Duration + // MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes + MaxRetriesWindow time.Duration } } @@ -144,7 +144,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - cfg.Pipelines.ErrorRecovery.MaxRetriesDuration = 5 * time.Minute + cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin @@ -215,7 +215,7 @@ func (c Config) validateErrorRecovery() error { if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery { errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. 
It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery)) } - if err := requirePositiveValue("max-retries-duration", errRecoveryCfg.MaxRetriesDuration); err != nil { + if err := requirePositiveValue("max-retries-window", errRecoveryCfg.MaxRetriesWindow); err != nil { errs = append(errs, err) } diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index edcd8796d..b1026a26b 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -213,12 +213,12 @@ func TestConfig_Validate(t *testing.T) { want: nil, }, { - name: "error recovery: negative max-retries-duration", + name: "error recovery: negative max-retries-window", setupConfig: func(c Config) Config { - c.Pipelines.ErrorRecovery.MaxRetriesDuration = -time.Second + c.Pipelines.ErrorRecovery.MaxRetriesWindow = -time.Second return c }, - want: cerrors.New(`invalid error recovery config: "max-retries-duration" config value must be positive (got: -1s)`), + want: cerrors.New(`invalid error recovery config: "max-retries-window" config value must be positive (got: -1s)`), }, } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index b7120998f..e175b3374 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -147,9 +147,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { "maximum number of retries", ) flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.MaxRetriesDuration, - "pipelines.error-recovery.max-retries-duration", - cfg.Pipelines.ErrorRecovery.MaxRetriesDuration, + &cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, + "pipelines.error-recovery.max-retries-window", + cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, "amount of time running without any errors after which a pipeline is considered healthy", ) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 14dcb491d..f1fcc7de6 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -205,11 +205,11 @@ func createServices(r *Runtime) error { // Error recovery configuration errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, - MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, - BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, - MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, - HealthyAfter: r.Config.Pipelines.ErrorRecovery.MaxRetriesDuration, + MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, + MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, + BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, + MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, + MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow, } plService := pipeline.NewService(r.logger, r.DB) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 99bb3340b..ceafa1679 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -50,11 +50,11 @@ type FailureEvent struct { type FailureHandler func(FailureEvent) type ErrRecoveryCfg struct { - MinDelay time.Duration - MaxDelay time.Duration - BackoffFactor int - MaxRetries int64 - HealthyAfter time.Duration + MinDelay time.Duration + MaxDelay time.Duration + BackoffFactor int + MaxRetries int64 + MaxRetriesWindow time.Duration } func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff { @@ -209,7 +209,7 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er // Increment number of recovery attempts. 
attempt := rp.recoveryAttempts.Add(1) - if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries { + if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt > s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } @@ -220,7 +220,7 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er Int64(log.AttemptField, attempt). Msg("restarting with backoff") - time.AfterFunc(duration+s.errRecoveryCfg.HealthyAfter, func() { + time.AfterFunc(duration+s.errRecoveryCfg.MaxRetriesWindow, func() { rp.recoveryAttempts.Add(-1) // Decrement the number of attempts after delay. }) @@ -741,11 +741,11 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { }) } - // TODO: When it's recovering, we should only update the status back to running once MaxRetriesDuration has passed. + // TODO: When it's recovering, we should only update the status back to running once MaxRetriesWindow has passed. // now: // running -> (error) -> recovering (restart) -> running - // future (with the MaxRetriesDuration mechanism): - // running -> (error) -> recovering (restart) -> recovering (wait for MaxRetriesDuration) -> running + // future (with the MaxRetriesWindow mechanism): + // running -> (error) -> recovering (restart) -> recovering (wait for MaxRetriesWindow) -> running err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 2ba2fe78a..16dcb60ab 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -917,11 +917,11 @@ func dummyDestination(persister *connector.Persister) *connector.Instance { func testErrRecoveryCfg() *ErrRecoveryCfg { return &ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: InfiniteRetriesErrRecovery, - HealthyAfter: 5 * time.Minute, + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: InfiniteRetriesErrRecovery, + MaxRetriesWindow: 5 * time.Minute, } } diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index 373689232..727ab58b9 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -97,11 +97,11 @@ func TestPipelineSimple(t *testing.T) { pipelineService := pipeline.NewService(logger, db) errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: 0, - HealthyAfter: 5 * time.Minute, + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + MaxRetriesWindow: 5 * time.Minute, } lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connectorService, processorService, connPluginService, pipelineService) diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 4423848c9..b8536bd66 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -520,11 +520,11 @@ func TestService_IntegrationTestServices(t *testing.T) { procService := processor.NewService(logger, db, procPluginService) errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: 0, - HealthyAfter: 5 * time.Minute, + MinDelay: 
time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + MaxRetriesWindow: 5 * time.Minute, } lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService) From a377ea3dee2efa08c8c06433d68f24e562c6deec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 8 Oct 2024 18:34:26 +0200 Subject: [PATCH 4/6] document test cases --- docs/test-cases/pipeline-recovery.md | 149 ++++++++++++++++++++++++--- pkg/lifecycle/service.go | 10 +- 2 files changed, 140 insertions(+), 19 deletions(-) diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 6b4dee553..8f426c26c 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -19,28 +19,24 @@ a retry. ```yaml version: "2.2" pipelines: - - id: file-pipeline + - id: chaos-to-log status: running - name: file-pipeline - description: dlq write error + description: Postgres source, file destination + dead-letter-queue: + plugin: standalone:chaos + settings: + writeMode: error connectors: - - id: chaos-src + - id: chaos type: source plugin: standalone:chaos - name: chaos-src + name: source settings: readMode: error - - id: log-dst + - id: destination type: destination plugin: builtin:log - log: file-dst - dead-letter-queue: - plugin: "builtin:postgres" - settings: - table: non_existing_table_so_that_dlq_fails - url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable - window-size: 3 - window-nack-threshold: 2 + name: destination ``` **Steps**: @@ -65,6 +61,28 @@ Recovery is not triggered when there is an error processing a record. **Pipeline configuration file**: ```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: generator + type: source + plugin: builtin:generator + name: source + settings: + format.type: structured + format.options.id: int + format.options.name: string + operations: create + - id: destination + type: destination + plugin: builtin:log + name: destination + processors: + - id: error + plugin: "error" ``` **Steps**: @@ -328,3 +346,106 @@ pipelines: **Additional comments**: --- + +## Test Case 11: Recovery triggered during a specific max-retries-window, after that pipeline is degraded + +**Priority** (low/medium/high): + +**Description**: + +A pipeline will be allowed to fail during a specific time window, after that it will be degraded. +Combining `max-retries` and `max-retries-window` we can control how many times a pipeline can fail during a specific time window. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: postgres-source + type: source + plugin: builtin:postgres + name: source + settings: + cdcMode: logrepl + snapshotMode: never + table: employees + url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable + - id: destination + type: destination + plugin: builtin:log + name: destination +``` + +**Steps**: + +1. Run conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +2. Stop postgres database +3. 
Leave it stopped and notice pipeline goes to degraded on attempt 3 (after ~20 seconds) + +**Expected Result**: + +After 20 seconds the pipeline should be degraded. + +**Additional comments**: + +--- + +## Test Case 12: Recovery triggered during a specific max-retries-window, pipeline is resilient during a specific time window + +**Priority** (low/medium/high): + +**Description**: + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: postgres-source + type: source + plugin: builtin:postgres + name: source + settings: + cdcMode: logrepl + snapshotMode: never + table: employees + url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable + - id: destination + type: destination + plugin: builtin:log + name: destination +``` + +**Steps**: + +1. Run conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +2. Stop postgres database +3. Leave it stopped until backoff attempts are 2 +4. Start postgres database again +5. Leave it running for another 15 seconds +6. Notice backoff attempts are going back to 1 + (repeat if needed to see how backoff attempts are increasing and decreasing) + +**Expected Result**: + +Pipeline should be able to recover. + +**Additional comments**: + +--- diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index ceafa1679..6033195a3 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -221,6 +221,11 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er Msg("restarting with backoff") time.AfterFunc(duration+s.errRecoveryCfg.MaxRetriesWindow, func() { + s.logger.Info(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Dur(log.DurationField, duration). + Int64(log.AttemptField, attempt). + Msg("decreasing recovery attempts") rp.recoveryAttempts.Add(-1) // Decrement the number of attempts after delay. }) @@ -741,11 +746,6 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { }) } - // TODO: When it's recovering, we should only update the status back to running once MaxRetriesWindow has passed. - // now: - // running -> (error) -> recovering (restart) -> running - // future (with the MaxRetriesWindow mechanism): - // running -> (error) -> recovering (restart) -> recovering (wait for MaxRetriesWindow) -> running err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err From 3db9cc63e51f0317bdb7c8d8bb74afbbec2be3c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 8 Oct 2024 18:38:25 +0200 Subject: [PATCH 5/6] fix md linting --- docs/test-cases/pipeline-recovery.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 8f426c26c..30871d86f 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -386,9 +386,9 @@ pipelines: **Steps**: -1. Run conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +1. 
Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` 2. Stop postgres database -3. Leave it stopped and notice pipeline goes to degraded on attempt 3 (after ~20 seconds) +3. Leave it stopped and notice pipeline goes to degraded on attempt 3 (after ~20 seconds) **Expected Result**: @@ -434,7 +434,7 @@ pipelines: **Steps**: -1. Run conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` 2. Stop postgres database 3. Leave it stopped until backoff attempts are 2 4. Start postgres database again From 67a6d71c1c9a381cb599f4f2249e1c47905d9451 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 8 Oct 2024 19:47:50 +0200 Subject: [PATCH 6/6] change log level --- pkg/lifecycle/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 6033195a3..056fba7cf 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -221,7 +221,7 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er Msg("restarting with backoff") time.AfterFunc(duration+s.errRecoveryCfg.MaxRetriesWindow, func() { - s.logger.Info(ctx). + s.logger.Debug(ctx). Str(log.PipelineIDField, rp.pipeline.ID). Dur(log.DurationField, duration). Int64(log.AttemptField, attempt).
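The patches above all revolve around one mechanism: every recovery attempt bumps an atomic counter on the runnable pipeline, and that bump "leaks" back out after the backoff delay plus the max-retries window, so only failures that cluster inside a sliding window count toward `max-retries` (with -1 meaning retry forever). The standalone Go sketch below is only an illustration of that leaky-counter idea under assumed names (`recoveryTracker`, `onFailure`, `window`); it is not Conduit's implementation and computes the exponential delay inline rather than through the `backoff` package used in the patches.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
	"time"
)

// recoveryTracker is a hypothetical, simplified stand-in for the fields added
// to runnablePipeline in PATCH 1/6: an attempt counter that leaks over time.
type recoveryTracker struct {
	attempts   atomic.Int64
	minDelay   time.Duration // e.g. 1s
	maxDelay   time.Duration // e.g. 10m
	factor     float64       // e.g. 2
	window     time.Duration // max-retries-window, e.g. 5m
	maxRetries int64         // negative means retry forever
}

// onFailure records one recovery attempt and returns the backoff delay to wait
// before restarting, or an error once the attempts inside the window exceed
// maxRetries.
func (t *recoveryTracker) onFailure() (time.Duration, error) {
	attempt := t.attempts.Add(1)
	if t.maxRetries >= 0 && attempt > t.maxRetries {
		return 0, fmt.Errorf("giving up after %d attempts", attempt)
	}

	// Exponential backoff: minDelay * factor^(attempt-1), capped at maxDelay,
	// approximating the 1s, 2s, 4s, ... 10m progression described in PATCH 1/6.
	delay := time.Duration(float64(t.minDelay) * math.Pow(t.factor, float64(attempt-1)))
	if delay > t.maxDelay {
		delay = t.maxDelay
	}

	// Leak the attempt back out after the delay plus the retry window, so a
	// pipeline that stays healthy long enough regains its full retry budget.
	time.AfterFunc(delay+t.window, func() { t.attempts.Add(-1) })

	return delay, nil
}

func main() {
	t := &recoveryTracker{
		minDelay:   time.Second,
		maxDelay:   10 * time.Minute,
		factor:     2,
		window:     5 * time.Minute,
		maxRetries: 2,
	}
	for i := 0; i < 4; i++ {
		delay, err := t.onFailure()
		if err != nil {
			fmt.Println("pipeline degraded:", err)
			return
		}
		fmt.Printf("restart %d scheduled after %s\n", i+1, delay)
	}
}
```

With `maxRetries` set to 2, the loop mirrors Test Case 11: the first two failures back off for 1s and 2s, and a third failure inside the window is treated as fatal. If the decrement timers fire before the next failure, as in Test Case 12, the retry budget is restored and the pipeline keeps recovering.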