diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md
new file mode 100644
index 000000000..6b4dee553
--- /dev/null
+++ b/docs/test-cases/pipeline-recovery.md
@@ -0,0 +1,330 @@
+
+# Test case for the pipeline recovery feature
+
+## Test Case 01: Recovery triggered on a DLQ write error
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is triggered when there is an error writing to a DLQ. As with a normal
+destination, a DLQ write error can be a temporary error that may be resolved
+after a retry.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: file-pipeline
+    status: running
+    name: file-pipeline
+    description: dlq write error
+    connectors:
+      - id: chaos-src
+        type: source
+        plugin: standalone:chaos
+        name: chaos-src
+        settings:
+          readMode: error
+      - id: log-dst
+        type: destination
+        plugin: builtin:log
+        name: log-dst
+    dead-letter-queue:
+      plugin: "builtin:postgres"
+      settings:
+        table: non_existing_table_so_that_dlq_fails
+        url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
+      window-size: 3
+      window-nack-threshold: 2
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 02: Recovery not triggered for fatal error - processor
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when there is an error processing a record.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 03: Recovery not triggered - graceful shutdown
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when Conduit is shutting down gracefully (i.e. when
+typing Ctrl+C in the terminal where Conduit is running, or sending a SIGINT).
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 04: Recovery not triggered - user stopped pipeline
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered if a user stops a pipeline (via the HTTP API's
+`/v1/pipelines/pipeline-id/stop` endpoint).
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 05: Recovery is configured by default
+
+**Priority** (low/medium/high):
+
+**Description**:
+Pipeline recovery is configured by default. A failing pipeline will be restarted
+a number of times without any additional configuration.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: chaos-to-log
+    status: running
+    name: chaos-to-log
+    description: chaos source, error on read
+    connectors:
+      - id: chaos-source-1
+        type: source
+        plugin: standalone:chaos
+        name: chaos-source-1
+        settings:
+          readMode: error
+      - id: destination1
+        type: destination
+        plugin: builtin:log
+        name: log-destination
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
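Review note for Test Case 05: the restart cadence behind "restarted a number of times without any additional configuration" comes from the defaults set in `DefaultConfig()` later in this patch (min delay 1s, max delay 10m, backoff factor 2, infinite retries). A small, self-contained sketch of that delay progression; the real implementation uses `github.com/jpillora/backoff` with jitter enabled, so actual delays vary:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative only: reproduces the default restart-delay progression
// (min 1s, factor 2, capped at 10m) without jitter.
func main() {
	minDelay, maxDelay, factor := time.Second, 10*time.Minute, 2.0
	delay := minDelay
	for attempt := 1; attempt <= 12; attempt++ {
		fmt.Printf("restart %2d after %v\n", attempt, delay)
		next := time.Duration(float64(delay) * factor)
		if next > maxDelay {
			next = maxDelay // delays are capped at the configured maximum
		}
		delay = next
	}
}
```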
+## Test Case 06: Recovery not triggered on malformed pipeline
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered for a malformed pipeline, e.g. when a connector is
+missing.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: nothing-to-log
+    status: running
+    name: nothing-to-log
+    description: no source
+    connectors:
+      - id: destination1
+        type: destination
+        plugin: builtin:log
+        name: log-destination
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 07: Conduit exits with --pipelines.exit-on-degraded=true and a pipeline failing after recovery
+
+**Priority** (low/medium/high):
+
+**Description**: Given a Conduit instance with
+`--pipelines.exit-on-degraded=true`, and a pipeline that keeps failing after
+the configured maximum number of retries, Conduit should shut down gracefully.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 08: Conduit doesn't exit with --pipelines.exit-on-degraded=true and a pipeline that recovers after a few retries
+
+**Priority** (low/medium/high):
+
+**Description**:
+Given a Conduit instance with `--pipelines.exit-on-degraded=true`, and a
+pipeline that recovers after a few retries, Conduit should still be running.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 09: Conduit exits with --pipelines.exit-on-degraded=true, --pipelines.error-recovery.max-retries=0, and a degraded pipeline
+
+**Priority** (low/medium/high):
+
+**Description**:
+Given a Conduit instance with
+`--pipelines.exit-on-degraded=true --pipelines.error-recovery.max-retries=0`,
+and a pipeline that goes into a degraded state, the Conduit instance will
+gracefully shut down. This is because `max-retries=0` disables recovery.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
+
+## Test Case 10: Recovery not triggered for fatal error - DLQ threshold exceeded
+
+**Priority** (low/medium/high):
+
+**Description**:
+Recovery is not triggered when the DLQ threshold is exceeded.
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+version: "2.2"
+pipelines:
+  - id: pipeline1
+    status: running
+    name: pipeline1
+    description: chaos destination with write errors, DLQ threshold specified
+    connectors:
+      - id: generator-src
+        type: source
+        plugin: builtin:generator
+        name: generator-src
+        settings:
+          format.type: structured
+          format.options.id: int
+          format.options.name: string
+          rate: "1"
+      - id: chaos-destination-1
+        type: destination
+        plugin: standalone:chaos
+        name: chaos-destination-1
+        settings:
+          writeMode: error
+    dead-letter-queue:
+      window-size: 2
+      window-nack-threshold: 1
+```
+
+**Steps**:
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
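Review note for Test Case 10: the way a `window-size`/`window-nack-threshold` pair turns into a fatal error can be pictured as a sliding window over the most recent acknowledgments. The sketch below is an assumption-level illustration of that idea, not Conduit's actual DLQ code:

```go
package main

import "fmt"

// Illustrative sketch (not Conduit's implementation): a sliding window over
// the last N record acknowledgments. Once the number of nacks in the window
// exceeds the threshold, the DLQ gives up and the error becomes fatal, which
// is what Test Case 10 exercises with window-size: 2, window-nack-threshold: 1.
type dlqWindow struct {
	size      int
	threshold int
	results   []bool // true = nack
}

// nack records a failed delivery and reports whether the threshold is exceeded.
func (w *dlqWindow) nack() bool {
	w.results = append(w.results, true)
	if len(w.results) > w.size {
		w.results = w.results[1:] // slide the window forward
	}
	nacks := 0
	for _, isNack := range w.results {
		if isNack {
			nacks++
		}
	}
	return nacks > w.threshold
}

func main() {
	w := &dlqWindow{size: 2, threshold: 1}
	fmt.Println(w.nack()) // false: 1 nack in window, threshold not exceeded
	fmt.Println(w.nack()) // true: 2 nacks in window, pipeline becomes degraded
}
```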
diff --git a/docs/test-cases/test-case-template.md b/docs/test-cases/test-case-template.md
new file mode 100644
index 000000000..75c6a086d
--- /dev/null
+++ b/docs/test-cases/test-case-template.md
@@ -0,0 +1,27 @@
+
+
+## Test Case 01: Test case title
+
+**Priority** (low/medium/high):
+
+**Description**:
+
+**Automated** (yes/no)
+
+**Setup**:
+
+**Pipeline configuration file**:
+
+```yaml
+```
+
+**Steps**:
+
+1. Step 1
+2. Step 2
+
+**Expected Result**:
+
+**Additional comments**:
+
+---
diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go
index 82958dc90..e96b0738f 100644
--- a/pkg/conduit/config.go
+++ b/pkg/conduit/config.go
@@ -22,6 +22,7 @@ import (
 	sdk "github.com/conduitio/conduit-connector-sdk"
 	"github.com/conduitio/conduit/pkg/foundation/cerrors"
 	"github.com/conduitio/conduit/pkg/foundation/log"
+	"github.com/conduitio/conduit/pkg/lifecycle"
 	"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
 	"github.com/rs/zerolog"
 	"golang.org/x/exp/constraints"
@@ -84,14 +85,19 @@ type Config struct {
 	}
 
 	Pipelines struct {
-		Path          string
-		ExitOnError   bool
-		ErrorRecovery struct {
-			MinDelay      time.Duration
-			MaxDelay      time.Duration
+		Path           string
+		ExitOnDegraded bool
+		ErrorRecovery  struct {
+			// MinDelay is the minimum delay before restart. Default: 1 second
+			MinDelay time.Duration
+			// MaxDelay is the maximum delay before restart. Default: 10 minutes
+			MaxDelay time.Duration
+			// BackoffFactor is the factor by which the delay is multiplied after each restart. Default: 2
 			BackoffFactor int
-			MaxRetries    int
-			HealthyAfter  time.Duration
+			// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy. Default: -1 (infinite)
+			MaxRetries int64
+			// HealthyAfter is the time after which the pipeline is considered healthy. Default: 5 minutes
+			HealthyAfter time.Duration
 		}
 	}
 
@@ -137,7 +143,7 @@ func DefaultConfig() Config {
 	cfg.Pipelines.ErrorRecovery.MinDelay = time.Second
 	cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute
 	cfg.Pipelines.ErrorRecovery.BackoffFactor = 2
-	cfg.Pipelines.ErrorRecovery.MaxRetries = 0
+	cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery
 	cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute
 
 	cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin
@@ -206,8 +212,8 @@ func (c Config) validateErrorRecovery() error {
 	if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil {
 		errs = append(errs, err)
 	}
-	if err := requireNonNegativeValue("max-retries", errRecoveryCfg.MaxRetries); err != nil {
-		errs = append(errs, err)
+	if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
+		errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
 	}
 	if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
 		errs = append(errs, err)
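Review note on `validateErrorRecovery`: the accepted `max-retries` domain is now {-1} ∪ [0, ∞). A runnable summary of the semantics; the local constant mirrors `lifecycle.InfiniteRetriesErrRecovery` introduced above:

```go
package main

import "fmt"

const infiniteRetries = -1 // mirrors lifecycle.InfiniteRetriesErrRecovery

// describeMaxRetries spells out the value ranges accepted by the validation
// in the hunk above: -1 retries forever, 0 disables recovery, positive values
// bound the number of restarts, and anything below -1 is rejected.
func describeMaxRetries(n int64) string {
	switch {
	case n < infiniteRetries:
		return "invalid"
	case n == infiniteRetries:
		return "infinite retries"
	case n == 0:
		return "recovery disabled"
	default:
		return fmt.Sprintf("up to %d restarts", n)
	}
}

func main() {
	for _, n := range []int64{-2, -1, 0, 3} {
		fmt.Printf("max-retries=%d: %s\n", n, describeMaxRetries(n))
	}
}
```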
diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go
index c252600c3..046551e46 100644
--- a/pkg/conduit/config_test.go
+++ b/pkg/conduit/config_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/conduitio/conduit-commons/database/inmemory"
 	"github.com/conduitio/conduit/pkg/foundation/cerrors"
+	"github.com/conduitio/conduit/pkg/lifecycle"
 	"github.com/matryer/is"
 )
@@ -196,12 +197,20 @@ func TestConfig_Validate(t *testing.T) {
 			want: cerrors.New(`invalid error recovery config: "backoff-factor" config value mustn't be negative (got: -1)`),
 		},
 		{
-			name: "error recovery: negative max-retries",
+			name: "error recovery: max-retries smaller than -1",
 			setupConfig: func(c Config) Config {
-				c.Pipelines.ErrorRecovery.MaxRetries = -1
+				c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1
 				return c
 			},
-			want: cerrors.New(`invalid error recovery config: "max-retries" config value mustn't be negative (got: -1)`),
+			want: cerrors.New(`invalid error recovery config: invalid "max-retries" value. It must be -1 for infinite retries or >= 0`),
+		},
+		{
+			name: "error recovery: with 0 max-retries",
+			setupConfig: func(c Config) Config {
+				c.Pipelines.ErrorRecovery.MaxRetries = 0
+				return c
+			},
+			want: nil,
 		},
 		{
 			name: "error recovery: negative healthy-after",
diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go
index e74cbad61..1cd74aceb 100644
--- a/pkg/conduit/entrypoint.go
+++ b/pkg/conduit/entrypoint.go
@@ -105,12 +105,23 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
 		cfg.Pipelines.Path,
 		"path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file",
 	)
+
+	// Deprecated: use `pipelines.exit-on-degraded` instead.
+	// Note: If both `pipelines.exit-on-error` and `pipelines.exit-on-degraded` are set, `pipelines.exit-on-degraded` will take precedence.
 	flags.BoolVar(
-		&cfg.Pipelines.ExitOnError,
+		&cfg.Pipelines.ExitOnDegraded,
 		"pipelines.exit-on-error",
-		cfg.Pipelines.ExitOnError,
-		"exit Conduit if a pipeline experiences an error while running",
+		cfg.Pipelines.ExitOnDegraded,
+		"Deprecated: use `pipelines.exit-on-degraded` instead.\nexit Conduit if a pipeline experiences an error while running",
+	)
+
+	flags.BoolVar(
+		&cfg.Pipelines.ExitOnDegraded,
+		"pipelines.exit-on-degraded",
+		cfg.Pipelines.ExitOnDegraded,
+		"exit Conduit if a pipeline enters a degraded state",
 	)
+
 	flags.DurationVar(
 		&cfg.Pipelines.ErrorRecovery.MinDelay,
 		"pipelines.error-recovery.min-delay",
@@ -129,7 +140,7 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
 		cfg.Pipelines.ErrorRecovery.BackoffFactor,
 		"backoff factor applied to the last delay",
 	)
-	flags.IntVar(
+	flags.Int64Var(
 		&cfg.Pipelines.ErrorRecovery.MaxRetries,
 		"pipelines.error-recovery.max-retries",
 		cfg.Pipelines.ErrorRecovery.MaxRetries,
@@ -151,11 +162,16 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
 	flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file")
 	flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file")
 
+	// Deprecated flags that are hidden from help output
+	deprecatedFlags := map[string]bool{
+		"pipelines.exit-on-error": true,
+	}
+
 	// show user or dev flags
 	flags.Usage = func() {
 		tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError)
 		flags.VisitAll(func(f *flag.Flag) {
-			if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp {
+			if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || deprecatedFlags[f.Name] {
 				return // hide flag from output
 			}
 			// reset value to its default, to ensure default is shown correctly
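Review note on the flag deprecation: both `BoolVar` calls bind to the same `cfg.Pipelines.ExitOnDegraded` field, which is what keeps the deprecated spelling working. A minimal stand-alone demonstration of that aliasing with the standard `flag` package (flag names reused from the patch; the rest is illustrative):

```go
package main

import (
	"flag"
	"fmt"
)

// Both flags write to the same variable, so either spelling sets the value;
// values are applied in the order they appear on the command line.
func main() {
	var exitOnDegraded bool
	fs := flag.NewFlagSet("conduit", flag.ExitOnError)
	fs.BoolVar(&exitOnDegraded, "pipelines.exit-on-error", false, "deprecated alias")
	fs.BoolVar(&exitOnDegraded, "pipelines.exit-on-degraded", false, "new flag")

	_ = fs.Parse([]string{"-pipelines.exit-on-error=true"})
	fmt.Println(exitOnDegraded) // true: the deprecated alias still works
}
```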
diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go
index c6df8fd15..0de2276c9 100644
--- a/pkg/conduit/runtime.go
+++ b/pkg/conduit/runtime.go
@@ -64,7 +64,6 @@ import (
 	"github.com/conduitio/conduit/pkg/web/ui"
 	apiv1 "github.com/conduitio/conduit/proto/api/v1"
 	grpcruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
-	"github.com/jpillora/backoff"
 	"github.com/piotrkowalczuk/promgrpc/v4"
 	promclient "github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -204,18 +203,19 @@ func createServices(r *Runtime) error {
 		tokenService,
 	)
 
-	errRecovery := r.Config.Pipelines.ErrorRecovery
-	backoffCfg := &backoff.Backoff{
-		Min:    errRecovery.MinDelay,
-		Max:    errRecovery.MaxDelay,
-		Factor: float64(errRecovery.BackoffFactor),
-		Jitter: true,
+	// Error recovery configuration
+	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
+		MinDelay:      r.Config.Pipelines.ErrorRecovery.MinDelay,
+		MaxDelay:      r.Config.Pipelines.ErrorRecovery.MaxDelay,
+		BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
+		MaxRetries:    r.Config.Pipelines.ErrorRecovery.MaxRetries,
+		HealthyAfter:  r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed
 	}
 
 	plService := pipeline.NewService(r.logger, r.DB)
 	connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
 	procService := processor.NewService(r.logger, r.DB, procPluginService)
-	lifecycleService := lifecycle.NewService(r.logger, backoffCfg, connService, procService, connPluginService, plService)
+	lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService)
 	provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)
 
 	orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)
@@ -769,13 +769,13 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
 		return cerrors.Errorf("failed to init connector service: %w", err)
 	}
 
-	if r.Config.Pipelines.ExitOnError {
+	if r.Config.Pipelines.ExitOnDegraded {
 		r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
 			r.logger.Warn(ctx).
 				Err(e.Error).
 				Str(log.PipelineIDField, e.ID).
-				Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled")
-			t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error))
+				Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled")
+			t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' enabled: %w", e.Error))
 		})
 	}
 	err = r.pipelineService.Init(ctx)
@@ -788,7 +788,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
 		cerrors.ForEach(err, func(err error) {
 			r.logger.Err(ctx, err).Msg("provisioning failed")
 		})
-		if r.Config.Pipelines.ExitOnError {
+		if r.Config.Pipelines.ExitOnDegraded {
 			r.logger.Warn(ctx).
 				Err(err).
-				Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled")
+				Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit-on-degraded' enabled")
diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go
index 7f4b883c9..3a0da1b01 100644
--- a/pkg/foundation/log/fields.go
+++ b/pkg/foundation/log/fields.go
@@ -17,6 +17,7 @@ package log
 const (
 	ComponentField   = "component"
 	ConnectorIDField = "connector_id"
+	AttemptField     = "attempt"
 	DurationField    = "duration"
 	MessageIDField   = "message_id"
 	NodeIDField      = "node_id"
diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go
index 2849da9fe..9e0ef8948 100644
--- a/pkg/lifecycle/service.go
+++ b/pkg/lifecycle/service.go
@@ -39,6 +39,8 @@ import (
 	"gopkg.in/tomb.v2"
 )
 
+const InfiniteRetriesErrRecovery = -1
+
 type FailureEvent struct {
 	// ID is the ID of the pipeline which failed.
 	ID string
@@ -47,11 +49,28 @@ type FailureEvent struct {
 
 type FailureHandler func(FailureEvent)
 
+type ErrRecoveryCfg struct {
+	MinDelay      time.Duration
+	MaxDelay      time.Duration
+	BackoffFactor int
+	MaxRetries    int64
+	HealthyAfter  time.Duration
+}
+
+func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff {
+	return &backoff.Backoff{
+		Min:    e.MinDelay,
+		Max:    e.MaxDelay,
+		Factor: float64(e.BackoffFactor),
+		Jitter: true,
+	}
+}
+
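Review note on `toBackoff()`: for anyone unfamiliar with `github.com/jpillora/backoff`, `Duration()` returns the next delay and advances the internal attempt counter, `Attempt()` reads that counter, and `Reset()` starts over. A small usage sketch, with jitter disabled so the output is deterministic:

```go
package main

import (
	"fmt"
	"time"
)

import "github.com/jpillora/backoff"

func main() {
	b := &backoff.Backoff{
		Min:    time.Second,
		Max:    10 * time.Minute,
		Factor: 2,
		Jitter: false, // the patch enables jitter; disabled here for clarity
	}
	for i := 0; i < 3; i++ {
		// prints: attempt 0 -> 1s, attempt 1 -> 2s, attempt 2 -> 4s
		fmt.Printf("attempt %v, next delay %v\n", b.Attempt(), b.Duration())
	}
	b.Reset()
	fmt.Println(b.Attempt()) // 0 again
}
```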
 // Service manages pipelines.
 type Service struct {
 	logger     log.CtxLogger
-	backoffCfg *backoff.Backoff
+	errRecoveryCfg *ErrRecoveryCfg
 
 	pipelines  PipelineService
 	connectors ConnectorService
@@ -66,7 +85,7 @@ type Service struct {
 // NewService initializes and returns a lifecycle.Service.
 func NewService(
 	logger log.CtxLogger,
-	backoffCfg *backoff.Backoff,
+	errRecoveryCfg *ErrRecoveryCfg,
 	connectors ConnectorService,
 	processors ProcessorService,
 	connectorPlugins ConnectorPluginService,
@@ -74,7 +93,7 @@ func NewService(
 ) *Service {
 	return &Service{
 		logger:           logger.WithComponent("lifecycle.Service"),
-		backoffCfg:       backoffCfg,
+		errRecoveryCfg:   errRecoveryCfg,
 		connectors:       connectors,
 		processors:       processors,
 		connectorPlugins: connectorPlugins,
@@ -84,9 +103,10 @@ func NewService(
 }
 
 type runnablePipeline struct {
-	pipeline *pipeline.Instance
-	n        []stream.Node
-	t        *tomb.Tomb
+	pipeline   *pipeline.Instance
+	n          []stream.Node
+	t          *tomb.Tomb
+	backoffCfg backoff.Backoff
 }
 
 // ConnectorService can fetch and create a connector instance.
@@ -157,10 +177,20 @@ func (s *Service) Start(
 	}
 
 	s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline")
-	s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes")
 
-	rp, err := s.buildRunnablePipeline(ctx, pl)
+	var backoffCfg *backoff.Backoff
+
+	// We check if the pipeline was previously running and get the backoff configuration from it.
+	oldRp, ok := s.runningPipelines.Get(pipelineID)
+	if !ok {
+		// default backoff configuration
+		backoffCfg = s.errRecoveryCfg.toBackoff()
+	} else {
+		backoffCfg = &oldRp.backoffCfg
+	}
+
+	rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg)
 	if err != nil {
 		return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err)
 	}
@@ -176,6 +206,44 @@ func (s *Service) Start(
 	return nil
 }
 
+// StartWithBackoff starts a pipeline with a backoff.
+// It'll check the number of times the pipeline has been restarted and the duration of the backoff.
+// When the pipeline has reached the maximum number of retries, it'll return a fatal error.
+func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error {
+	s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff")
+
+	attempt := int64(rp.backoffCfg.Attempt())
+	duration := rp.backoffCfg.Duration()
+
+	s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration")
+
+	if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries {
+		return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover))
+	}
+
+	// This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m, ... balancing the need for recovery time and minimizing downtime.
+	timer := time.NewTimer(duration)
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+	}
+
+	// Get status of pipeline to check if it already recovered.
+	if rp.pipeline.GetStatus() == pipeline.StatusRunning {
+		s.logger.Debug(ctx).
+			Str(log.PipelineIDField, rp.pipeline.ID).
+			Int64(log.AttemptField, attempt).
+			Int("backoffRetry.factor", s.errRecoveryCfg.BackoffFactor).
+			Int64("backoffRetry.duration", duration.Milliseconds()).
+			Msg("pipeline recovered")
+		return nil
+	}
+
+	return s.Start(ctx, rp.pipeline.ID)
+}
+
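Review note on the retry guard in `StartWithBackoff`: `-1` retries forever, while `0` effectively disables recovery, since the very first attempt already fails the `attempt >= MaxRetries` check. A sketch of the guard's truth table (illustrative, with a local constant mirroring `InfiniteRetriesErrRecovery`):

```go
package main

import "fmt"

const infiniteRetries = -1

// canRetry restates the guard at the top of StartWithBackoff: a restart is
// allowed while retries are infinite or the attempt counter is below the budget.
func canRetry(maxRetries, attempt int64) bool {
	return maxRetries == infiniteRetries || attempt < maxRetries
}

func main() {
	fmt.Println(canRetry(-1, 1000)) // true: infinite retries
	fmt.Println(canRetry(0, 0))     // false: recovery disabled
	fmt.Println(canRetry(2, 1))     // true: one more restart allowed
	fmt.Println(canRetry(2, 2))     // false: budget exhausted, fatal error
}
```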
 // Stop will attempt to gracefully stop a given pipeline by calling each node's
 // Stop function. If force is set to true the pipeline won't stop gracefully,
 // instead the context for all nodes will be canceled which causes them to stop
@@ -226,7 +294,9 @@ func (s *Service) stopForceful(ctx context.Context, rp *runnablePipeline) error
 		Str(log.PipelineIDField, rp.pipeline.ID).
 		Any(log.PipelineStatusField, rp.pipeline.GetStatus()).
 		Msg("force stopping pipeline")
-	rp.t.Kill(pipeline.ErrForceStop)
+
+	// Wrap the error in a FatalError to prevent the pipeline from recovering.
+	rp.t.Kill(cerrors.FatalError(pipeline.ErrForceStop))
 	for _, n := range rp.n {
 		if node, ok := n.(stream.ForceStoppableNode); ok {
 			// stop all pub nodes
@@ -309,11 +379,8 @@ func (s *Service) WaitPipeline(id string) error {
 	return p.t.Wait()
 }
 
-// buildRunnablePipeline will build and connect all nodes configured in the pipeline.
-func (s *Service) buildRunnablePipeline(
-	ctx context.Context,
-	pl *pipeline.Instance,
-) (*runnablePipeline, error) {
+// buildNodes will build new nodes that will be assigned to the pipeline.Instance.
+func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stream.Node, error) {
 	// setup many to many channels
 	fanIn := stream.FaninNode{Name: "fanin"}
 	fanOut := stream.FanoutNode{Name: "fanout"}
@@ -353,10 +420,24 @@ func (s *Service) buildRunnablePipeline(
 	for _, n := range nodes {
 		stream.SetLogger(n, nodeLogger)
 	}
+	return nodes, nil
+}
+
+// buildRunnablePipeline will build and connect all nodes configured in the pipeline.
+func (s *Service) buildRunnablePipeline(
+	ctx context.Context,
+	pl *pipeline.Instance,
+	backoffCfg *backoff.Backoff,
+) (*runnablePipeline, error) {
+	nodes, err := s.buildNodes(ctx, pl)
+	if err != nil {
+		return nil, err
+	}
 
 	return &runnablePipeline{
-		pipeline: pl,
-		n:        nodes,
+		pipeline:   pl,
+		n:          nodes,
+		backoffCfg: *backoffCfg,
 	}, nil
 }
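Review note on `cerrors.FatalError(pipeline.ErrForceStop)`: wrapping the error in a fatal marker is what later routes it past recovery in `runPipeline`. A conceptual, dependency-free sketch of the marker-type pattern; Conduit's real implementation lives in `pkg/foundation/cerrors` and may differ in detail:

```go
package main

import (
	"errors"
	"fmt"
)

// A fatal error is an ordinary error wrapped in a marker type, detectable
// with errors.As while the original cause stays reachable via errors.Is.
type fatalError struct{ err error }

func (f fatalError) Error() string { return "fatal: " + f.err.Error() }
func (f fatalError) Unwrap() error { return f.err }

func isFatal(err error) bool {
	var f fatalError
	return errors.As(err, &f)
}

func main() {
	errForceStop := errors.New("force stop")
	err := fatalError{err: errForceStop}
	fmt.Println(isFatal(err))                     // true  -> recovery is skipped
	fmt.Println(errors.Is(err, errForceStop))     // true  -> cause preserved
	fmt.Println(isFatal(errors.New("transient"))) // false -> recovery runs
}
```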
@@ -664,6 +745,11 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 		})
 	}
 
+	// TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed.
+	// now:
+	//   running -> (error) -> recovering (restart) -> running
+	// future (with the HealthyAfter mechanism):
+	//   running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running
 	err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "")
 	if err != nil {
 		return err
@@ -682,27 +768,42 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 		switch err {
 		case tomb.ErrStillAlive:
 			// not an actual error, the pipeline stopped gracefully
+			err = nil
+			var status pipeline.Status
 			if isGracefulShutdown.Load() {
 				// it was triggered by a graceful shutdown of Conduit
-				err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, "")
+				status = pipeline.StatusSystemStopped
 			} else {
 				// it was manually triggered by a user
-				err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, "")
+				status = pipeline.StatusUserStopped
 			}
-			if err != nil {
+			if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, status, ""); err != nil {
 				return err
 			}
 		default:
 			if cerrors.IsFatalError(err) {
 				// we use %+v to get the stack trace too
-				err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err))
-				if err != nil {
+				if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil {
 					return err
 				}
 			} else {
-				err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRecovering, "")
-				if err != nil {
-					return err
+				// try to recover the pipeline
+				if recoveryErr := s.recoverPipeline(ctx, rp); recoveryErr != nil {
+					s.logger.
+						Err(ctx, err).
+						Str(log.PipelineIDField, rp.pipeline.ID).
+						Msg("pipeline recovery failed")
+
+					if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil {
+						return updateErr
+					}
+
+					// we assign it to err so it's returned and notified by the cleanup function
+					err = recoveryErr
+				} else {
+					// recovery was triggered and didn't error, so there's nothing to clean up;
+					// this is why we return nil to skip the cleanup below.
+					return nil
 				}
 			}
 		}
@@ -721,6 +822,19 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
 	return nil
 }
 
+// recoverPipeline attempts to recover a pipeline that has stopped running.
+func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) error {
+	s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("recovering pipeline")
+
+	err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRecovering, "")
+	if err != nil {
+		return err
+	}
+
+	// Exit the goroutine and attempt to restart the pipeline
+	return s.StartWithBackoff(ctx, rp)
+}
+
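Review note summarizing the `switch` above — the mapping from stop cause to final pipeline status, as a runnable table (names are descriptive only, not Conduit identifiers):

```go
package main

import "fmt"

// outcome restates the decision logic of runPipeline's error handling:
// which status each kind of stop ends in, and whether recovery is attempted.
func outcome(stop string) (status string, recovers bool) {
	switch stop {
	case "graceful shutdown": // Conduit itself is shutting down
		return "system stopped", false
	case "user stop": // /v1/pipelines/:id/stop
		return "user stopped", false
	case "fatal error": // force stop, DLQ threshold, retries exhausted
		return "degraded", false
	default: // any other error is considered recoverable
		return "recovering", true
	}
}

func main() {
	for _, s := range []string{"graceful shutdown", "user stop", "fatal error", "connector error"} {
		status, recovers := outcome(s)
		fmt.Printf("%-18s -> %s (recovery: %v)\n", s, status, recovers)
	}
}
```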
 // notify notifies all registered FailureHandlers about an error.
 func (s *Service) notify(pipelineID string, err error) {
 	if err == nil {
diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go
index d3bb0a7f0..9e31f699a 100644
--- a/pkg/lifecycle/service_test.go
+++ b/pkg/lifecycle/service_test.go
@@ -36,7 +36,6 @@ import (
 	pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
 	"github.com/conduitio/conduit/pkg/processor"
 	"github.com/google/uuid"
-	"github.com/jpillora/backoff"
 	"github.com/matryer/is"
 	"github.com/rs/zerolog"
 	"go.uber.org/mock/gomock"
@@ -52,7 +51,6 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) {
 	logger := log.New(zerolog.Nop())
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	source := dummySource(persister)
 	destination := dummyDestination(persister)
@@ -72,7 +70,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) {
 
 	ls := NewService(
 		logger,
-		b,
+		testErrRecoveryCfg(),
 		testConnectorService{
 			source.ID:      source,
 			destination.ID: destination,
@@ -90,6 +88,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 
 	is.NoErr(err)
@@ -145,7 +144,6 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) {
 	logger := log.New(zerolog.Nop())
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	destination := dummyDestination(persister)
 	dlq := dummyDestination(persister)
@@ -162,7 +160,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) {
 	}
 	pl.SetStatus(pipeline.StatusUserStopped)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		destination.ID: destination,
 		testDLQID:      dlq,
 	}, testProcessorService{},
@@ -176,6 +174,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 
 	is.True(err != nil)
@@ -191,12 +190,11 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) {
 	logger := log.New(zerolog.Nop())
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	source := dummySource(persister)
 	dlq := dummyDestination(persister)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		source.ID: source,
 		testDLQID: dlq,
 	},
@@ -224,6 +222,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) {
 	got, err := ls.buildRunnablePipeline(
 		ctx,
 		pl,
+		ls.errRecoveryCfg.toBackoff(),
 	)
 
 	is.True(err != nil)
@@ -239,7 +238,6 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
 	defer persister.Wait()
-	b := &backoff.Backoff{}
 
 	ps := pipeline.NewService(logger, db)
 
@@ -250,9 +248,9 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
 	// create mocked connectors
 	ctrl := gomock.NewController(t)
 	wantRecords := generateRecords(10)
-	source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, true)
-	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords)
-	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil)
+	source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1)
+	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1)
+	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1)
 	pl.DLQ.Plugin = dlq.Plugin
 
 	pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
@@ -260,14 +258,14 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
 	pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
 	is.NoErr(err)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		source.ID:      source,
 		destination.ID: destination,
 		testDLQID:      dlq,
 	},
 		testProcessorService{},
 		testConnectorPluginService{
-			source.Plugin:      sourceDispenser,
+			source.Plugin:      srcDispenser,
 			destination.Plugin: destDispenser,
 			dlq.Plugin:         dlqDispenser,
 		}, ps)
@@ -301,7 +299,6 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
 	logger := log.Test(t)
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	ps := pipeline.NewService(logger, db)
 
@@ -313,9 +310,9 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
 	wantErr := cerrors.New("source connector error")
 	ctrl := gomock.NewController(t)
 	wantRecords := generateRecords(10)
-	source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, wantErr, false)
-	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords)
-	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil)
+	source, srcDispenser := asserterSource(ctrl, persister, wantRecords, wantErr, false, 1)
+	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1)
+	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1)
 	pl.DLQ.Plugin = dlq.Plugin
 
 	pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
@@ -323,14 +320,14 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
 	pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
 	is.NoErr(err)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		source.ID:      source,
 		destination.ID: destination,
 		testDLQID:      dlq,
 	},
 		testProcessorService{},
 		testConnectorPluginService{
-			source.Plugin:      sourceDispenser,
+			source.Plugin:      srcDispenser,
 			destination.Plugin: destDispenser,
 			dlq.Plugin:         dlqDispenser,
 		}, ps)
@@ -370,82 +367,113 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
 	is.True(cerrors.Is(event.Error, wantErr))
 }
 
-func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
+func TestServiceLifecycle_Stop(t *testing.T) {
 	type testCase struct {
-		name   string
-		stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string)
-		// whether we expect the source plugin's Stop() function to be called
-		// (doesn't happen when force-stopping)
-		wantSourceStop bool
-		want           pipeline.Status
-		wantErr        error
+		name      string
+		stopFn    func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string)
+		forceStop bool
+		want      pipeline.Status
+		wantErr   error
 	}
 
-	runTest := func(t *testing.T, tc testCase) {
-		is := is.New(t)
-		ctx, killAll := context.WithCancel(context.Background())
-		defer killAll()
-		logger := log.New(zerolog.Nop())
-		db := &inmemory.DB{}
-		persister := connector.NewPersister(logger, db, time.Second, 3)
-		b := &backoff.Backoff{}
+	testCases := []testCase{
+		{
+			name: "user stop: graceful",
+			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
+				err := ls.Stop(ctx, pipelineID, false)
+				is.NoErr(err)
+			},
+			want: pipeline.StatusUserStopped,
+		},
+		{
+			name: "user stop: forceful",
+			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
+				err := ls.Stop(ctx, pipelineID, true)
+				is.NoErr(err)
+			},
+			forceStop: true,
+			wantErr:   cerrors.FatalError(pipeline.ErrForceStop),
+			want:      pipeline.StatusDegraded,
+		},
+	}
 
-		ps := pipeline.NewService(logger, db)
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			is := is.New(t)
+			ctx, killAll := context.WithCancel(context.Background())
+			defer killAll()
+			logger := log.Test(t)
+			db := &inmemory.DB{}
+			persister := connector.NewPersister(logger, db, time.Second, 3)
 
-		// create a host pipeline
-		pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI)
-		is.NoErr(err)
+			ps := pipeline.NewService(logger, db)
 
-		// create mocked connectors
-		// source will stop and return ErrGracefulShutdown which should signal to the
-		// service that everything went well and the pipeline was gracefully shutdown
-		ctrl := gomock.NewController(t)
-		wantRecords := generateRecords(0)
-		source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, tc.wantSourceStop)
-		destination, destDispenser := asserterDestination(ctrl, persister, wantRecords)
-		dlq, dlqDispenser := asserterDestination(ctrl, persister, nil)
-		pl.DLQ.Plugin = dlq.Plugin
+			// create a host pipeline
+			pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI)
+			is.NoErr(err)
 
-		pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
-		is.NoErr(err)
-		pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
-		is.NoErr(err)
+			// create mocked connectors
+			// source will stop and return ErrGracefulShutdown which should signal to the
+			// service that everything went well and the pipeline was gracefully shutdown
+			ctrl := gomock.NewController(t)
+			wantRecords := generateRecords(0)
+			source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, !tc.forceStop, 1)
+			destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1)
+			dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1)
+			pl.DLQ.Plugin = dlq.Plugin
+
+			pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
+			is.NoErr(err)
+			pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
+			is.NoErr(err)
 
-		ls := NewService(logger, b, testConnectorService{
-			source.ID:      source,
-			destination.ID: destination,
-			testDLQID:      dlq,
-		},
-			testProcessorService{},
-			testConnectorPluginService{
-				source.Plugin:      sourceDispenser,
-				destination.Plugin: destDispenser,
-				dlq.Plugin:         dlqDispenser,
-			}, ps)
+			ls := NewService(
+				logger,
+				testErrRecoveryCfg(),
+				testConnectorService{
+					source.ID:      source,
+					destination.ID: destination,
+					testDLQID:      dlq,
+				},
+				testProcessorService{},
+				testConnectorPluginService{
+					source.Plugin:      srcDispenser,
+					destination.Plugin: destDispenser,
+					dlq.Plugin:         dlqDispenser,
+				}, ps)
+
+			// start the pipeline now that everything is set up
+			err = ls.Start(
+				ctx,
+				pl.ID,
+			)
+			is.NoErr(err)
 
-		// start the pipeline now that everything is set up
-		err = ls.Start(
-			ctx,
-			pl.ID,
-		)
-		is.NoErr(err)
+			// wait for pipeline to finish consuming records from the source
+			time.Sleep(100 * time.Millisecond)
 
-		// wait for pipeline to finish consuming records from the source
-		time.Sleep(100 * time.Millisecond)
+			tc.stopFn(ctx, is, ls, pl.ID)
 
-		pl.SetStatus(pipeline.StatusRecovering)
-		tc.stopFn(ctx, is, ls, pl.ID)
+			// wait for pipeline to finish
+			err = ls.WaitPipeline(pl.ID)
+			if tc.wantErr != nil {
+				is.True(err != nil)
+			} else {
+				is.NoErr(err)
+				is.Equal("", pl.Error)
+			}
 
-		// wait for pipeline to finish
-		err = ls.WaitPipeline(pl.ID)
-		if tc.wantErr != nil {
-			is.True(err != nil)
-		} else {
-			is.NoErr(err)
-			is.Equal("", pl.Error)
-		}
+			is.Equal(tc.want, pl.GetStatus())
+		})
+	}
+}
 
-		is.Equal(tc.want, pl.GetStatus())
+func TestServiceLifecycle_StopAll(t *testing.T) {
+	type testCase struct {
+		name    string
+		stopFn  func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string)
+		want    pipeline.Status
+		wantErr error
 	}
 
 	testCases := []testCase{
@@ -454,45 +482,182 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
 			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
 				ls.StopAll(ctx, pipeline.ErrGracefulShutdown)
 			},
-			wantSourceStop: true,
-			want:           pipeline.StatusSystemStopped,
+			want: pipeline.StatusSystemStopped,
 		},
 		{
-			name: "system stop (fatal err)",
+			name: "system stop (fatal error)",
 			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
 				ls.StopAll(ctx, cerrors.FatalError(cerrors.New("terrible err")))
 			},
-			wantSourceStop: true,
-			want:           pipeline.StatusDegraded,
-			wantErr:        cerrors.New("terrible err"),
-		},
-		{
-			name: "connection error",
-			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
-				ls.StopAll(ctx, cerrors.New("lost connection to database"))
-			},
-			wantSourceStop: true,
-			want:           pipeline.StatusRecovering,
-			wantErr:        cerrors.New("lost connection to database"),
-		},
-		{
-			name: "user stop (graceful)",
-			stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) {
-				err := ls.Stop(ctx, pipelineID, false)
-				is.NoErr(err)
-			},
-			wantSourceStop: true,
-			want:           pipeline.StatusUserStopped,
+			want:    pipeline.StatusDegraded,
+			wantErr: cerrors.New("terrible err"),
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			runTest(t, tc)
+			is := is.New(t)
+			ctx, killAll := context.WithCancel(context.Background())
+			defer killAll()
+			logger := log.New(zerolog.Nop())
+			db := &inmemory.DB{}
+			persister := connector.NewPersister(logger, db, time.Second, 3)
+
+			ps := pipeline.NewService(logger, db)
+
+			// create a host pipeline
+			pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI)
+			is.NoErr(err)
+
+			// create mocked connectors
+			// source will stop and return ErrGracefulShutdown which should signal to the
+			// service that everything went well and the pipeline was gracefully shutdown
+			ctrl := gomock.NewController(t)
+			wantRecords := generateRecords(0)
+			source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1)
+			destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1)
+			dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1)
+			pl.DLQ.Plugin = dlq.Plugin
+
+			pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
+			is.NoErr(err)
+			pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
+			is.NoErr(err)
+
+			ls := NewService(
+				logger,
+				testErrRecoveryCfg(),
+				testConnectorService{
+					source.ID:      source,
+					destination.ID: destination,
+					testDLQID:      dlq,
+				},
+				testProcessorService{},
+				testConnectorPluginService{
+					source.Plugin:      srcDispenser,
+					destination.Plugin: destDispenser,
+					dlq.Plugin:         dlqDispenser,
+				}, ps)
+
+			// start the pipeline now that everything is set up
+			err = ls.Start(
+				ctx,
+				pl.ID,
+			)
+			is.NoErr(err)
+
+			// wait for pipeline to finish consuming records from the source
+			time.Sleep(100 * time.Millisecond)
+
+			tc.stopFn(ctx, is, ls, pl.ID)
+
+			// wait for pipeline to finish
+			err = ls.WaitPipeline(pl.ID)
+			if tc.wantErr != nil {
+				is.True(err != nil)
+			} else {
+				is.NoErr(err)
+				is.Equal("", pl.Error)
+			}
+
+			is.Equal(tc.want, pl.GetStatus())
 		})
 	}
 }
 
+// First creates a pipeline that stops with a recoverable error, then checks that it was restarted and is running again.
+func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
+	is := is.New(t)
+	ctx, killAll := context.WithCancel(context.Background())
+	defer killAll()
+	logger := log.New(zerolog.Nop())
+	db := &inmemory.DB{}
+	persister := connector.NewPersister(logger, db, time.Second, 3)
+	wantErr := cerrors.New("lost connection to database")
+
+	ps := pipeline.NewService(logger, db)
+
+	// create a host pipeline
+	pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI)
+	is.NoErr(err)
+
+	// create mocked connectors
+	// the source and destination are expected to be dispensed twice:
+	// once for the initial run and once for the restart after recovery
+	ctrl := gomock.NewController(t)
+	wantRecords := generateRecords(0)
+	source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 2)
+	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 2)
+	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 2)
+	pl.DLQ.Plugin = dlq.Plugin
+
+	pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
+	is.NoErr(err)
+	pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
+	is.NoErr(err)
+
+	ls := NewService(
+		logger,
+		testErrRecoveryCfg(),
+		testConnectorService{
+			source.ID:      source,
+			destination.ID: destination,
+			testDLQID:      dlq,
+		},
+		testProcessorService{},
+		testConnectorPluginService{
+			source.Plugin:      srcDispenser,
+			destination.Plugin: destDispenser,
+			dlq.Plugin:         dlqDispenser,
+		}, ps)
+
+	// start the pipeline now that everything is set up
+	err = ls.Start(
+		ctx,
+		pl.ID,
+	)
+	is.NoErr(err)
+
+	// wait for pipeline to finish consuming records from the source
+	time.Sleep(100 * time.Millisecond)
+
+	c := make(cchan.Chan[error])
+	go func() {
+		c <- ls.WaitPipeline(pl.ID)
+	}()
+
+	// force the pipeline to stop with a recoverable error
+	ls.StopAll(ctx, wantErr)
+	err, _, ctxErr := c.RecvTimeout(ctx, 10000*time.Millisecond)
+	is.NoErr(ctxErr)
+
+	// check the first pipeline run stopped with the error that caused the restart
+	is.True(cerrors.Is(err, wantErr))
+
+	go func() {
+		c <- ls.WaitPipeline(pl.ID)
+	}()
+
+	_, _, err = c.RecvTimeout(ctx, 1000*time.Millisecond)
+	is.True(cerrors.Is(err, context.DeadlineExceeded))
+
+	// stop the running pipeline
+	err = ls.Stop(ctx, pl.ID, false)
+	is.NoErr(err)
+
+	// check the pipeline recovered and is running again
+	is.Equal(pipeline.StatusRunning, pl.GetStatus())
+
+	go func() {
+		c <- ls.WaitPipeline(pl.ID)
+	}()
+	err, _, _ = c.RecvTimeout(ctx, 1000*time.Millisecond)
+	is.NoErr(err)
+
+	// This is to demonstrate the test indeed stopped the pipeline
+	is.Equal(pipeline.StatusUserStopped, pl.GetStatus())
+}
+
 func TestServiceLifecycle_PipelineStop(t *testing.T) {
 	is := is.New(t)
 	ctx, killAll := context.WithCancel(context.Background())
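Review note on the assertions above: the test concludes the recovered pipeline is still running because a second `WaitPipeline` receive times out with `context.DeadlineExceeded`. A generic, self-contained sketch of that receive-with-deadline pattern; the real helper is `cchan.Chan` from conduit-commons, whose exact signature may differ:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// recvTimeout tries to receive from a channel until the deadline expires.
// A DeadlineExceeded result means nothing was sent, i.e. the watched
// goroutine (here: WaitPipeline) has not returned yet.
func recvTimeout[T any](ctx context.Context, c <-chan T, d time.Duration) (T, error) {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()
	var zero T
	select {
	case v := <-c:
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

func main() {
	c := make(chan error)
	_, err := recvTimeout(context.Background(), c, 50*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true: nothing sent yet
}
```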
@@ -500,7 +665,6 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) {
 	defer killAll()
 	logger := log.New(zerolog.Nop())
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	ps := pipeline.NewService(logger, db)
 
@@ -513,9 +677,9 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) {
 	// service that everything went well and the pipeline was gracefully shutdown
 	ctrl := gomock.NewController(t)
 	wantRecords := generateRecords(10)
-	source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, true)
-	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords)
-	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil)
+	source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1)
+	destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1)
+	dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1)
 	pl.DLQ.Plugin = dlq.Plugin
 
 	pl, err = ps.AddConnector(ctx, pl.ID, source.ID)
@@ -523,14 +687,14 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) {
 	pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
 	is.NoErr(err)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		source.ID:      source,
 		destination.ID: destination,
 		testDLQID:      dlq,
 	},
 		testProcessorService{},
 		testConnectorPluginService{
-			source.Plugin:      sourceDispenser,
+			source.Plugin:      srcDispenser,
 			destination.Plugin: destDispenser,
 			dlq.Plugin:         dlqDispenser,
 		}, ps)
@@ -563,7 +727,6 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) {
 	logger := log.Test(t)
 	db := &inmemory.DB{}
 	persister := connector.NewPersister(logger, db, time.Second, 3)
-	b := &backoff.Backoff{}
 
 	ps := pipeline.NewService(logger, db)
 
@@ -573,18 +736,18 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) {
 
 	// create mocked connectors
 	var (
-		source          *connector.Instance
-		sourceDispenser *pmock.Dispenser
-		destination     *connector.Instance
-		destDispenser   *pmock.Dispenser
-		dlq             *connector.Instance
-		dlqDispenser    *pmock.Dispenser
+		source        *connector.Instance
+		srcDispenser  *pmock.Dispenser
+		destination   *connector.Instance
+		destDispenser *pmock.Dispenser
+		dlq           *connector.Instance
+		dlqDispenser  *pmock.Dispenser
 	)
 	if expected == pipeline.StatusRunning {
 		// mocked connectors that are expected to be started
-		source, sourceDispenser = generatorSource(ctrl, persister, nil, nil, true)
-		destination, destDispenser = asserterDestination(ctrl, persister, nil)
-		dlq, dlqDispenser = asserterDestination(ctrl, persister, nil)
+		source, srcDispenser = asserterSource(ctrl, persister, nil, nil, true, 1)
+		destination, destDispenser = asserterDestination(ctrl, persister, nil, 1)
+		dlq, dlqDispenser = asserterDestination(ctrl, persister, nil, 1)
 	} else {
 		// dummy connectors that are not expected to be started
 		source = dummySource(persister)
@@ -606,14 +769,14 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) {
 	err = ps.Init(ctx)
 	is.NoErr(err)
 
-	ls := NewService(logger, b, testConnectorService{
+	ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{
 		source.ID:      source,
 		destination.ID: destination,
 		testDLQID:      dlq,
 	},
 		testProcessorService{},
 		testConnectorPluginService{
-			source.Plugin:      sourceDispenser,
+			source.Plugin:      srcDispenser,
 			destination.Plugin: destDispenser,
 			dlq.Plugin:         dlqDispenser,
 		}, ps)
@@ -667,15 +830,16 @@ func generateRecords(count int) []opencdc.Record {
 	return records
 }
 
-// generatorSource creates a connector source that fills up the returned slice
+// asserterSource creates a connector source that fills up the returned slice
 // with generated records as they are produced. After producing the requested
 // number of records it returns wantErr.
-func generatorSource(
+func asserterSource(
 	ctrl *gomock.Controller,
 	persister *connector.Persister,
 	records []opencdc.Record,
 	wantErr error,
 	stop bool,
+	times int,
 ) (*connector.Instance, *pmock.Dispenser) {
 	sourcePluginOptions := []pmock.ConfigurableSourcePluginOption{
 		pmock.SourcePluginWithConfigure(),
@@ -689,23 +853,24 @@ func asserterSource(
 	if stop {
 		sourcePluginOptions = append(sourcePluginOptions, pmock.SourcePluginWithStop())
 	}
-	sourcePlugin := pmock.NewConfigurableSourcePlugin(ctrl, sourcePluginOptions...)
-
 	source := dummySource(persister)
 
 	dispenser := pmock.NewDispenser(ctrl)
-	dispenser.EXPECT().DispenseSource().Return(sourcePlugin, nil)
+	dispenser.EXPECT().DispenseSource().DoAndReturn(func() (connectorPlugin.SourcePlugin, error) {
+		return pmock.NewConfigurableSourcePlugin(ctrl, sourcePluginOptions...), nil
+	}).Times(times)
 
 	return source, dispenser
 }
 
 // asserterDestination creates a connector destination that checks if the records it gets
-// match the expected records. On teardown it also makes sure that it received
+// match the expected records. On teardown, it also makes sure that it received
 // all expected records.
 func asserterDestination(
 	ctrl *gomock.Controller,
 	persister *connector.Persister,
 	records []opencdc.Record,
+	times int,
 ) (*connector.Instance, *pmock.Dispenser) {
 	var lastPosition opencdc.Position
 	if len(records) > 0 {
@@ -721,12 +886,12 @@ func asserterDestination(
 		pmock.DestinationPluginWithTeardown(),
 	}
 
-	destinationPlugin := pmock.NewConfigurableDestinationPlugin(ctrl, destinationPluginOptions...)
-
 	dest := dummyDestination(persister)
 
 	dispenser := pmock.NewDispenser(ctrl)
-	dispenser.EXPECT().DispenseDestination().Return(destinationPlugin, nil)
+	dispenser.EXPECT().DispenseDestination().DoAndReturn(func() (connectorPlugin.DestinationPlugin, error) {
+		return pmock.NewConfigurableDestinationPlugin(ctrl, destinationPluginOptions...), nil
+	}).Times(times)
 
 	return dest, dispenser
 }
@@ -762,6 +927,16 @@ func dummyDestination(persister *connector.Persister) *connector.Instance {
 	return destination
 }
 
+func testErrRecoveryCfg() *ErrRecoveryCfg {
+	return &ErrRecoveryCfg{
+		MinDelay:      time.Second,
+		MaxDelay:      10 * time.Minute,
+		BackoffFactor: 2,
+		MaxRetries:    InfiniteRetriesErrRecovery,
+		HealthyAfter:  5 * time.Minute,
+	}
+}
+
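Review note on `DoAndReturn(...).Times(times)`: a recovered pipeline dispenses its plugins again, so the mock must construct a fresh plugin per dispense instead of returning one prebuilt instance that has already been torn down. A dependency-free sketch of the idea (names are illustrative, not gomock's API):

```go
package main

import "fmt"

type plugin struct{ id int }

// newDispenser returns a factory closure, analogous to DoAndReturn's
// callback: each call hands out a distinct plugin instance.
func newDispenser() func() *plugin {
	n := 0
	return func() *plugin {
		n++
		return &plugin{id: n}
	}
}

func main() {
	dispense := newDispenser()
	first, second := dispense(), dispense() // e.g. initial run + restart
	fmt.Println(first.id, second.id)        // 1 2: two distinct instances
}
```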
 // testConnectorService fulfills the ConnectorService interface.
 type testConnectorService map[string]*connector.Instance
diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go
index bcb125b27..373689232 100644
--- a/pkg/orchestrator/orchestrator_test.go
+++ b/pkg/orchestrator/orchestrator_test.go
@@ -38,7 +38,6 @@ import (
 	proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
 	"github.com/conduitio/conduit/pkg/processor"
 	"github.com/google/go-cmp/cmp"
-	"github.com/jpillora/backoff"
 	"github.com/matryer/is"
 	"github.com/rs/zerolog"
 	"go.uber.org/mock/gomock"
@@ -93,13 +92,19 @@ func TestPipelineSimple(t *testing.T) {
 		nil,
 	)
 
-	b := &backoff.Backoff{}
-
 	connectorService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
 	processorService := processor.NewService(logger, db, procPluginService)
 	pipelineService := pipeline.NewService(logger, db)
-	lifecycleService := lifecycle.NewService(logger, b, connectorService, processorService, connPluginService, pipelineService)
+	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
+		MinDelay:      time.Second,
+		MaxDelay:      10 * time.Minute,
+		BackoffFactor: 2,
+		MaxRetries:    0,
+		HealthyAfter:  5 * time.Minute,
+	}
+
+	lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connectorService, processorService, connPluginService, pipelineService)
 
 	orc := NewOrchestrator(
 		db,
diff --git a/pkg/pipeline/errors.go b/pkg/pipeline/errors.go
index 8a44d15b3..5e88c5bfa 100644
--- a/pkg/pipeline/errors.go
+++ b/pkg/pipeline/errors.go
@@ -17,19 +17,20 @@ package pipeline
 import "github.com/conduitio/conduit/pkg/foundation/cerrors"
 
 var (
-	ErrTimeout              = cerrors.New("operation timed out")
-	ErrGracefulShutdown     = cerrors.New("graceful shutdown")
-	ErrForceStop            = cerrors.New("force stop")
-	ErrPipelineRunning      = cerrors.New("pipeline is running")
-	ErrPipelineNotRunning   = cerrors.New("pipeline not running")
-	ErrInstanceNotFound     = cerrors.New("pipeline instance not found")
-	ErrNameMissing          = cerrors.New("must provide a pipeline name")
-	ErrIDMissing            = cerrors.New("must provide a pipeline ID")
-	ErrNameAlreadyExists    = cerrors.New("pipeline name already exists")
-	ErrInvalidCharacters    = cerrors.New("pipeline ID contains invalid characters")
-	ErrNameOverLimit        = cerrors.New("pipeline name is over the character limit (64)")
-	ErrIDOverLimit          = cerrors.New("pipeline ID is over the character limit (64)")
-	ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)")
-	ErrConnectorIDNotFound  = cerrors.New("connector ID not found")
-	ErrProcessorIDNotFound  = cerrors.New("processor ID not found")
+	ErrTimeout               = cerrors.New("operation timed out")
+	ErrGracefulShutdown      = cerrors.New("graceful shutdown")
+	ErrForceStop             = cerrors.New("force stop")
+	ErrPipelineCannotRecover = cerrors.New("pipeline couldn't be recovered")
+	ErrPipelineRunning       = cerrors.New("pipeline is running")
+	ErrPipelineNotRunning    = cerrors.New("pipeline not running")
+	ErrInstanceNotFound      = cerrors.New("pipeline instance not found")
+	ErrNameMissing           = cerrors.New("must provide a pipeline name")
+	ErrIDMissing             = cerrors.New("must provide a pipeline ID")
+	ErrNameAlreadyExists     = cerrors.New("pipeline name already exists")
+	ErrInvalidCharacters     = cerrors.New("pipeline ID contains invalid characters")
+	ErrNameOverLimit         = cerrors.New("pipeline name is over the character limit (64)")
+	ErrIDOverLimit           = cerrors.New("pipeline ID is over the character limit (64)")
+	ErrDescriptionOverLimit  = cerrors.New("pipeline description is over the character limit (8192)")
+	ErrConnectorIDNotFound   = cerrors.New("connector ID not found")
+	ErrProcessorIDNotFound   = cerrors.New("processor ID not found")
 )
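Review note on `ErrPipelineCannotRecover`: `StartWithBackoff` wraps it with `%w`, so callers can detect exhausted retries with `cerrors.Is`. Standard-library equivalent for illustration, using a hypothetical local error value in place of the real sentinel:

```go
package main

import (
	"errors"
	"fmt"
)

var errPipelineCannotRecover = errors.New("pipeline couldn't be recovered")

func main() {
	// %w preserves the sentinel in the wrap chain, as in StartWithBackoff.
	err := fmt.Errorf("failed to recover pipeline p1 after 3 attempts: %w", errPipelineCannotRecover)
	fmt.Println(errors.Is(err, errPipelineCannotRecover)) // true
}
```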
diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go
index 803e50fa7..70088c2a1 100644
--- a/pkg/provisioning/service.go
+++ b/pkg/provisioning/service.go
@@ -255,8 +255,6 @@ func (s *Service) provisionPipeline(ctx context.Context, cfg config.Pipeline) error {
 
 	// check if pipeline should be running
 	if cfg.Status == config.StatusRunning {
-		// TODO set status and let the pipeline service start it
-
 		err := s.lifecycleService.Start(ctx, cfg.ID)
 		if err != nil {
 			return cerrors.Errorf("could not start the pipeline %q: %w", cfg.ID, err)
diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go
index 3701d1fa7..4423848c9 100644
--- a/pkg/provisioning/service_test.go
+++ b/pkg/provisioning/service_test.go
@@ -41,7 +41,6 @@ import (
 	p4 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines4-integration-test"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
-	"github.com/jpillora/backoff"
 	"github.com/matryer/is"
 	"github.com/rs/zerolog"
 	"go.uber.org/mock/gomock"
@@ -515,16 +514,19 @@ func TestService_IntegrationTestServices(t *testing.T) {
 		proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
 		nil,
 	)
-	b := &backoff.Backoff{
-		Factor: 2,
-		Min:    time.Millisecond * 100,
-		Max:    time.Second, // 8 tries
-	}
 
 	plService := pipeline.NewService(logger, db)
 	connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
 	procService := processor.NewService(logger, db, procPluginService)
-	lifecycleService := lifecycle.NewService(logger, b, connService, procService, connPluginService, plService)
+
+	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
+		MinDelay:      time.Second,
+		MaxDelay:      10 * time.Minute,
+		BackoffFactor: 2,
+		MaxRetries:    0,
+		HealthyAfter:  5 * time.Minute,
+	}
+	lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService)
 
 	// create destination file
 	destFile := "./test/dest-file.txt"