From 81206c7e5e33da68fbc80d4e5bc45851674ef803 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Mon, 16 Sep 2024 16:53:41 +0200 Subject: [PATCH 01/40] wip --- pkg/conduit/runtime.go | 16 ++++---- pkg/lifecycle/service.go | 84 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 10 deletions(-) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index c6df8fd15..18e0c5c3f 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -64,7 +64,6 @@ import ( "github.com/conduitio/conduit/pkg/web/ui" apiv1 "github.com/conduitio/conduit/proto/api/v1" grpcruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - "github.com/jpillora/backoff" "github.com/piotrkowalczuk/promgrpc/v4" promclient "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -204,18 +203,19 @@ func createServices(r *Runtime) error { tokenService, ) - errRecovery := r.Config.Pipelines.ErrorRecovery - backoffCfg := &backoff.Backoff{ - Min: errRecovery.MinDelay, - Max: errRecovery.MaxDelay, - Factor: float64(errRecovery.BackoffFactor), - Jitter: true, + // Error recovery configuration + errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ + MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, + MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, + BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, + MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, + HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, } plService := pipeline.NewService(r.logger, r.DB) connService := connector.NewService(r.logger, r.DB, r.connectorPersister) procService := processor.NewService(r.logger, r.DB, procPluginService) - lifecycleService := lifecycle.NewService(r.logger, backoffCfg, connService, procService, connPluginService, plService) + lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService) provisionService := 
provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path) orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 2849da9fe..7d40cd1e6 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -47,11 +47,22 @@ type FailureEvent struct { type FailureHandler func(FailureEvent) +// TODO: Move to commons? +type ErrRecoveryCfg struct { + MinDelay time.Duration + MaxDelay time.Duration + BackoffFactor int + MaxRetries int + HealthyAfter time.Duration + backoffRetryCount float64 +} + // Service manages pipelines. type Service struct { logger log.CtxLogger - backoffCfg *backoff.Backoff + backoffCfg *backoff.Backoff + errRecoveryCfg *ErrRecoveryCfg pipelines PipelineService connectors ConnectorService @@ -66,14 +77,22 @@ type Service struct { // NewService initializes and returns a lifecycle.Service. func NewService( logger log.CtxLogger, - backoffCfg *backoff.Backoff, + errRecoveryCfg *ErrRecoveryCfg, connectors ConnectorService, processors ProcessorService, connectorPlugins ConnectorPluginService, pipelines PipelineService, ) *Service { + backoffCfg := &backoff.Backoff{ + Min: errRecoveryCfg.MinDelay, + Max: errRecoveryCfg.MaxDelay, + Factor: float64(errRecoveryCfg.BackoffFactor), + Jitter: true, + } + return &Service{ logger: logger.WithComponent("lifecycle.Service"), + errRecoveryCfg: errRecoveryCfg, backoffCfg: backoffCfg, connectors: connectors, processors: processors, @@ -176,6 +195,62 @@ func (s *Service) Start( return nil } +// Restart stops an existing pipeline and starts a new one with the same configuration. 
+func (s *Service) Restart(ctx context.Context, pipelineID string) error { + rp, ok := s.runningPipelines.Get(pipelineID) + if !ok { + return cerrors.Errorf("pipeline %s can't be restarted: %w", pipelineID, pipeline.ErrInstanceNotFound) + } + + switch rp.pipeline.GetStatus() { + case pipeline.StatusRecovering: // let it recover + return nil + case pipeline.StatusRunning: + if err := s.Stop(ctx, pipelineID, true); err != nil { + return cerrors.Errorf("could not stop pipeline %s: %w", pipelineID, err) + } + } + + return s.Start(ctx, pipelineID) +} + +// RestartWithBackoff restarts a pipeline with a backoff. +func (s *Service) RestartWithBackoff(ctx context.Context, pipelineID string) error { + for { + err := s.Restart(ctx, pipelineID) + attempt := s.backoffCfg.Attempt() + duration := s.backoffCfg.Duration() + + if err != nil && attempt < s.errRecoveryCfg.backoffRetryCount { + s.logger.Debug(ctx). + Str(log.PipelineIDField, pipelineID). + Float64("attempt", attempt). + Float64("backoffRetry.count", s.errRecoveryCfg.backoffRetryCount). + Int64("backoffRetry.duration", duration.Milliseconds()). + Msg("retrying pipeline recovery") + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(duration): + continue + } + } + + s.backoffCfg.Reset() + return err + + // TODO: Utilize + // MaxRetries + // HealthyAfter + + //if attempt >= s.errRecoveryCfg.MaxRetries { + // return cerrors.Errorf("failed to restart pipeline %s after %d attempts: %w", pipelineID, s.backoffCfg.Max, err) + //} + + } +} + // Stop will attempt to gracefully stop a given pipeline by calling each node's // Stop function. 
If force is set to true the pipeline won't stop gracefully, // instead the context for all nodes will be canceled which causes them to stop @@ -704,6 +779,11 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { if err != nil { return err } + + err = s.RestartWithBackoff(ctx, rp.pipeline.ID) + if err != nil { + return err + } } } From 902110627fbb0c0dab5143839c117eb912908c31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 17 Sep 2024 15:45:53 +0200 Subject: [PATCH 02/40] implement restart method --- pkg/conduit/config.go | 13 ++++++++---- pkg/lifecycle/service.go | 44 ++++++++++++++++++++++++++++------------ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 82958dc90..e83c81394 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -87,11 +87,16 @@ type Config struct { Path string ExitOnError bool ErrorRecovery struct { - MinDelay time.Duration - MaxDelay time.Duration + // MinDelay is the minimum delay before restart: Default: 1 second + MinDelay time.Duration + // MaxDelay is the maximum delay before restart: Default: 10 minutes + MaxDelay time.Duration + // BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2 BackoffFactor int - MaxRetries int - HealthyAfter time.Duration + // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: 0 (infinite) + MaxRetries int + // HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes + HealthyAfter time.Duration } } diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 7d40cd1e6..785be12c8 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -217,11 +217,32 @@ func (s *Service) Restart(ctx context.Context, pipelineID string) error { // RestartWithBackoff restarts a pipeline with a backoff. 
func (s *Service) RestartWithBackoff(ctx context.Context, pipelineID string) error { for { - err := s.Restart(ctx, pipelineID) attempt := s.backoffCfg.Attempt() duration := s.backoffCfg.Duration() - if err != nil && attempt < s.errRecoveryCfg.backoffRetryCount { + p, err := s.pipelines.Get(ctx, pipelineID) + if err != nil { + return fmt.Errorf("could not fetch pipeline %s: %w", pipelineID, err) + } + + if p.GetStatus() == pipeline.StatusRunning { + s.logger.Debug(ctx). + Str(log.PipelineIDField, pipelineID). + Float64("attempt", attempt). + Float64("backoffRetry.count", s.errRecoveryCfg.backoffRetryCount). + Int64("backoffRetry.duration", duration.Milliseconds()). + Msg("pipeline recovered") + s.backoffCfg.Reset() + return nil + } + + // HealthyAfter needs to be reset elsewhere + if duration > s.errRecoveryCfg.HealthyAfter { + s.backoffCfg.Reset() + return nil + } + + if attempt < s.errRecoveryCfg.backoffRetryCount { s.logger.Debug(ctx). Str(log.PipelineIDField, pipelineID). Float64("attempt", attempt). @@ -229,6 +250,11 @@ func (s *Service) RestartWithBackoff(ctx context.Context, pipelineID string) err Int64("backoffRetry.duration", duration.Milliseconds()). 
Msg("retrying pipeline recovery") + err := s.Restart(ctx, pipelineID) + if err == nil { + return err + } + select { case <-ctx.Done(): return ctx.Err() @@ -237,17 +263,10 @@ func (s *Service) RestartWithBackoff(ctx context.Context, pipelineID string) err } } - s.backoffCfg.Reset() + if attempt >= float64(s.errRecoveryCfg.MaxRetries) { + return cerrors.Errorf("failed to restart pipeline %s after %d attempts: %w", pipelineID, attempt, err) + } return err - - // TODO: Utilize - // MaxRetries - // HealthyAfter - - //if attempt >= s.errRecoveryCfg.MaxRetries { - // return cerrors.Errorf("failed to restart pipeline %s after %d attempts: %w", pipelineID, s.backoffCfg.Max, err) - //} - } } @@ -779,7 +798,6 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { if err != nil { return err } - err = s.RestartWithBackoff(ctx, rp.pipeline.ID) if err != nil { return err From e1fbaf683d822f14d8ec188a7b3d01724d1f677e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 09:27:40 +0200 Subject: [PATCH 03/40] implement restart swapping nodes --- pkg/conduit/runtime.go | 2 +- pkg/lifecycle/service.go | 179 ++++++++++++++++++++------------------- pkg/pipeline/errors.go | 31 +++---- 3 files changed, 108 insertions(+), 104 deletions(-) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 18e0c5c3f..43504062a 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -209,7 +209,7 @@ func createServices(r *Runtime) error { MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, - HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, + HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed } plService := pipeline.NewService(r.logger, r.DB) diff --git a/pkg/lifecycle/service.go 
b/pkg/lifecycle/service.go index 785be12c8..3b9e081e0 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -47,21 +47,18 @@ type FailureEvent struct { type FailureHandler func(FailureEvent) -// TODO: Move to commons? type ErrRecoveryCfg struct { - MinDelay time.Duration - MaxDelay time.Duration - BackoffFactor int - MaxRetries int - HealthyAfter time.Duration - backoffRetryCount float64 + MinDelay time.Duration + MaxDelay time.Duration + BackoffFactor int + MaxRetries int + HealthyAfter time.Duration } // Service manages pipelines. type Service struct { logger log.CtxLogger - backoffCfg *backoff.Backoff errRecoveryCfg *ErrRecoveryCfg pipelines PipelineService @@ -83,17 +80,9 @@ func NewService( connectorPlugins ConnectorPluginService, pipelines PipelineService, ) *Service { - backoffCfg := &backoff.Backoff{ - Min: errRecoveryCfg.MinDelay, - Max: errRecoveryCfg.MaxDelay, - Factor: float64(errRecoveryCfg.BackoffFactor), - Jitter: true, - } - return &Service{ logger: logger.WithComponent("lifecycle.Service"), errRecoveryCfg: errRecoveryCfg, - backoffCfg: backoffCfg, connectors: connectors, processors: processors, connectorPlugins: connectorPlugins, @@ -103,9 +92,10 @@ func NewService( } type runnablePipeline struct { - pipeline *pipeline.Instance - n []stream.Node - t *tomb.Tomb + pipeline *pipeline.Instance + n []stream.Node + t *tomb.Tomb + backoffCfg backoff.Backoff } // ConnectorService can fetch and create a connector instance. @@ -195,79 +185,75 @@ func (s *Service) Start( return nil } -// Restart stops an existing pipeline and starts a new one with the same configuration. -func (s *Service) Restart(ctx context.Context, pipelineID string) error { - rp, ok := s.runningPipelines.Get(pipelineID) +// Restart stops an existing pipeline and replaces their nodes. 
+func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { + rp, ok := s.runningPipelines.Get(rp.pipeline.ID) if !ok { - return cerrors.Errorf("pipeline %s can't be restarted: %w", pipelineID, pipeline.ErrInstanceNotFound) + return cerrors.Errorf("pipeline %s can't be restarted: %w", rp.pipeline.ID, pipeline.ErrInstanceNotFound) } - switch rp.pipeline.GetStatus() { - case pipeline.StatusRecovering: // let it recover - return nil - case pipeline.StatusRunning: - if err := s.Stop(ctx, pipelineID, true); err != nil { - return cerrors.Errorf("could not stop pipeline %s: %w", pipelineID, err) + // In case we want to restart a running pipeline. + if rp.pipeline.GetStatus() == pipeline.StatusRunning { + if err := s.Stop(ctx, rp.pipeline.ID, true); err != nil { + return cerrors.Errorf("could not stop pipeline %s: %w", rp.pipeline.ID, err) } } - return s.Start(ctx, pipelineID) -} + s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline") + s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes") -// RestartWithBackoff restarts a pipeline with a backoff. -func (s *Service) RestartWithBackoff(ctx context.Context, pipelineID string) error { - for { - attempt := s.backoffCfg.Attempt() - duration := s.backoffCfg.Duration() + nodes, err := s.buildNodes(ctx, rp.pipeline) + if err != nil { + return cerrors.Errorf("could not build new nodes for pipeline %s: %w", rp.pipeline.ID, err) + } - p, err := s.pipelines.Get(ctx, pipelineID) - if err != nil { - return fmt.Errorf("could not fetch pipeline %s: %w", pipelineID, err) - } + rp.n = nodes - if p.GetStatus() == pipeline.StatusRunning { - s.logger.Debug(ctx). - Str(log.PipelineIDField, pipelineID). - Float64("attempt", attempt). - Float64("backoffRetry.count", s.errRecoveryCfg.backoffRetryCount). - Int64("backoffRetry.duration", duration.Milliseconds()). 
- Msg("pipeline recovered") - s.backoffCfg.Reset() - return nil - } + s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes") + if err := s.runPipeline(ctx, rp); err != nil { + return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err) + } + s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline re-started ") - // HealthyAfter needs to be reset elsewhere - if duration > s.errRecoveryCfg.HealthyAfter { - s.backoffCfg.Reset() - return nil - } + return nil +} - if attempt < s.errRecoveryCfg.backoffRetryCount { - s.logger.Debug(ctx). - Str(log.PipelineIDField, pipelineID). - Float64("attempt", attempt). - Float64("backoffRetry.count", s.errRecoveryCfg.backoffRetryCount). - Int64("backoffRetry.duration", duration.Milliseconds()). - Msg("retrying pipeline recovery") +// RestartWithBackoff restarts a pipeline with a backoff. +func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { + // backoffCfg.Attempt() returns a float64 + attempt := int(rp.backoffCfg.Attempt()) + duration := rp.backoffCfg.Duration() - err := s.Restart(ctx, pipelineID) - if err == nil { - return err - } + if attempt > s.errRecoveryCfg.MaxRetries { + return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) + } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(duration): - continue - } - } + // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. 
+ timer := time.NewTimer(duration) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(duration): + <-timer.C + } - if attempt >= float64(s.errRecoveryCfg.MaxRetries) { - return cerrors.Errorf("failed to restart pipeline %s after %d attempts: %w", pipelineID, attempt, err) - } - return err + // Get status of pipeline, if it recovers (it's running), + pl, err := s.pipelines.Get(ctx, rp.pipeline.ID) + if err != nil { + return cerrors.FatalError(fmt.Errorf("could not fetch pipeline %s: %w", rp.pipeline.ID, err)) } + + if pl.GetStatus() == pipeline.StatusRunning { + s.logger.Debug(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Int("attempt", attempt). + Int("backoffRetry.count", s.errRecoveryCfg.BackoffFactor). + Int64("backoffRetry.duration", duration.Milliseconds()). + Msg("pipeline recovered") + return nil + } + + return s.Restart(ctx, rp) } // Stop will attempt to gracefully stop a given pipeline by calling each node's @@ -403,11 +389,8 @@ func (s *Service) WaitPipeline(id string) error { return p.t.Wait() } -// buildRunnablePipeline will build and connect all nodes configured in the pipeline. -func (s *Service) buildRunnablePipeline( - ctx context.Context, - pl *pipeline.Instance, -) (*runnablePipeline, error) { +// buildNodes will build and connect all nodes configured in the pipeline. +func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stream.Node, error) { // setup many to many channels fanIn := stream.FaninNode{Name: "fanin"} fanOut := stream.FanoutNode{Name: "fanout"} @@ -447,10 +430,31 @@ func (s *Service) buildRunnablePipeline( for _, n := range nodes { stream.SetLogger(n, nodeLogger) } + return nodes, nil +} + +// buildRunnablePipeline will build and connect all nodes configured in the pipeline. +func (s *Service) buildRunnablePipeline( + ctx context.Context, + pl *pipeline.Instance, +) (*runnablePipeline, error) { + // Each pipeline will utilize this backoff configuration to recover from errors. 
+ backoffCfg := &backoff.Backoff{ + Min: s.errRecoveryCfg.MinDelay, + Max: s.errRecoveryCfg.MaxDelay, + Factor: float64(s.errRecoveryCfg.BackoffFactor), + Jitter: true, + } + + nodes, err := s.buildNodes(ctx, pl) + if err != nil { + return nil, err + } return &runnablePipeline{ - pipeline: pl, - n: nodes, + pipeline: pl, + n: nodes, + backoffCfg: *backoffCfg, }, nil } @@ -798,10 +802,9 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { if err != nil { return err } - err = s.RestartWithBackoff(ctx, rp.pipeline.ID) - if err != nil { - return err - } + + // Exit the goroutine and attempt to restart the pipeline + return s.RestartWithBackoff(ctx, rp) } } diff --git a/pkg/pipeline/errors.go b/pkg/pipeline/errors.go index 8a44d15b3..5e88c5bfa 100644 --- a/pkg/pipeline/errors.go +++ b/pkg/pipeline/errors.go @@ -17,19 +17,20 @@ package pipeline import "github.com/conduitio/conduit/pkg/foundation/cerrors" var ( - ErrTimeout = cerrors.New("operation timed out") - ErrGracefulShutdown = cerrors.New("graceful shutdown") - ErrForceStop = cerrors.New("force stop") - ErrPipelineRunning = cerrors.New("pipeline is running") - ErrPipelineNotRunning = cerrors.New("pipeline not running") - ErrInstanceNotFound = cerrors.New("pipeline instance not found") - ErrNameMissing = cerrors.New("must provide a pipeline name") - ErrIDMissing = cerrors.New("must provide a pipeline ID") - ErrNameAlreadyExists = cerrors.New("pipeline name already exists") - ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters") - ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)") - ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)") - ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)") - ErrConnectorIDNotFound = cerrors.New("connector ID not found") - ErrProcessorIDNotFound = cerrors.New("processor ID not found") + ErrTimeout = cerrors.New("operation timed out") + 
ErrGracefulShutdown = cerrors.New("graceful shutdown") + ErrForceStop = cerrors.New("force stop") + ErrPipelineCannotRecover = cerrors.New("pipeline couldn't be recovered") + ErrPipelineRunning = cerrors.New("pipeline is running") + ErrPipelineNotRunning = cerrors.New("pipeline not running") + ErrInstanceNotFound = cerrors.New("pipeline instance not found") + ErrNameMissing = cerrors.New("must provide a pipeline name") + ErrIDMissing = cerrors.New("must provide a pipeline ID") + ErrNameAlreadyExists = cerrors.New("pipeline name already exists") + ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters") + ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)") + ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)") + ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)") + ErrConnectorIDNotFound = cerrors.New("connector ID not found") + ErrProcessorIDNotFound = cerrors.New("processor ID not found") ) From 90f03044a5205ef5a35d42e1b84e9b4d4ef7ee5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 10:32:58 +0200 Subject: [PATCH 04/40] fix method calls --- pkg/lifecycle/service_test.go | 25 ++++++++----------------- pkg/orchestrator/orchestrator_test.go | 5 +---- pkg/provisioning/service.go | 2 -- pkg/provisioning/service_test.go | 8 +------- 4 files changed, 10 insertions(+), 30 deletions(-) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index d3bb0a7f0..69b1eaa3b 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -36,7 +36,6 @@ import ( pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock" "github.com/conduitio/conduit/pkg/processor" "github.com/google/uuid" - "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -52,7 +51,6 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { logger := 
log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} source := dummySource(persister) destination := dummyDestination(persister) @@ -72,7 +70,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { ls := NewService( logger, - b, + nil, testConnectorService{ source.ID: source, destination.ID: destination, @@ -145,7 +143,6 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} destination := dummyDestination(persister) dlq := dummyDestination(persister) @@ -162,7 +159,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { } pl.SetStatus(pipeline.StatusUserStopped) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ destination.ID: destination, testDLQID: dlq, }, testProcessorService{}, @@ -191,12 +188,11 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} source := dummySource(persister) dlq := dummyDestination(persister) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, testDLQID: dlq, }, @@ -239,7 +235,6 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) defer persister.Wait() - b := &backoff.Backoff{} ps := pipeline.NewService(logger, db) @@ -260,7 +255,7 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, 
destination.ID: destination, testDLQID: dlq, @@ -301,7 +296,6 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} ps := pipeline.NewService(logger, db) @@ -323,7 +317,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -388,7 +382,6 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} ps := pipeline.NewService(logger, db) @@ -411,7 +404,7 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -500,7 +493,6 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} ps := pipeline.NewService(logger, db) @@ -523,7 +515,7 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -563,7 +555,6 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) - b := &backoff.Backoff{} ps := pipeline.NewService(logger, 
db) @@ -606,7 +597,7 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { err = ps.Init(ctx) is.NoErr(err) - ls := NewService(logger, b, testConnectorService{ + ls := NewService(logger, nil, testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index bcb125b27..b1fcce188 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -38,7 +38,6 @@ import ( proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin" "github.com/conduitio/conduit/pkg/processor" "github.com/google/go-cmp/cmp" - "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -93,13 +92,11 @@ func TestPipelineSimple(t *testing.T) { nil, ) - b := &backoff.Backoff{} - connectorService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)) processorService := processor.NewService(logger, db, procPluginService) pipelineService := pipeline.NewService(logger, db) - lifecycleService := lifecycle.NewService(logger, b, connectorService, processorService, connPluginService, pipelineService) + lifecycleService := lifecycle.NewService(logger, nil, connectorService, processorService, connPluginService, pipelineService) orc := NewOrchestrator( db, diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index 803e50fa7..70088c2a1 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -255,8 +255,6 @@ func (s *Service) provisionPipeline(ctx context.Context, cfg config.Pipeline) er // check if pipeline should be running if cfg.Status == config.StatusRunning { - // TODO set status and let the pipeline service start it - err := s.lifecycleService.Start(ctx, cfg.ID) if err != nil { return cerrors.Errorf("could not start the pipeline %q: %w", cfg.ID, err) diff --git a/pkg/provisioning/service_test.go 
b/pkg/provisioning/service_test.go index 3701d1fa7..fbcc88827 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -41,7 +41,6 @@ import ( p4 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines4-integration-test" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/jpillora/backoff" "github.com/matryer/is" "github.com/rs/zerolog" "go.uber.org/mock/gomock" @@ -515,16 +514,11 @@ func TestService_IntegrationTestServices(t *testing.T) { proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry), nil, ) - b := &backoff.Backoff{ - Factor: 2, - Min: time.Millisecond * 100, - Max: time.Second, // 8 tries - } plService := pipeline.NewService(logger, db) connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)) procService := processor.NewService(logger, db, procPluginService) - lifecycleService := lifecycle.NewService(logger, b, connService, procService, connPluginService, plService) + lifecycleService := lifecycle.NewService(logger, nil, connService, procService, connPluginService, plService) // create destination file destFile := "./test/dest-file.txt" From 859eb2da5baa826f4d16fa2f0c2bf0c435b536f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 12:09:18 +0200 Subject: [PATCH 05/40] fix some tests --- pkg/conduit/config.go | 2 +- pkg/lifecycle/service_test.go | 37 +++++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index e83c81394..b7abb1cb8 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -142,7 +142,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MinDelay = time.Second cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 - cfg.Pipelines.ErrorRecovery.MaxRetries = 0 + cfg.Pipelines.ErrorRecovery.MaxRetries = 0 // 
infinite retries cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 69b1eaa3b..c14d5bbc4 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -70,7 +70,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { ls := NewService( logger, - nil, + testErrRecoveryCfg(), testConnectorService{ source.ID: source, destination.ID: destination, @@ -159,7 +159,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { } pl.SetStatus(pipeline.StatusUserStopped) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ destination.ID: destination, testDLQID: dlq, }, testProcessorService{}, @@ -192,7 +192,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { source := dummySource(persister) dlq := dummyDestination(persister) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ source.ID: source, testDLQID: dlq, }, @@ -255,7 +255,7 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -317,7 +317,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -404,11 +404,14 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := 
NewService(logger, nil, testConnectorService{ - source.ID: source, - destination.ID: destination, - testDLQID: dlq, - }, + ls := NewService( + logger, + testErrRecoveryCfg(), + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, testProcessorService{}, testConnectorPluginService{ source.Plugin: sourceDispenser, @@ -515,7 +518,7 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -597,7 +600,7 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { err = ps.Init(ctx) is.NoErr(err) - ls := NewService(logger, nil, testConnectorService{ + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ source.ID: source, destination.ID: destination, testDLQID: dlq, @@ -753,6 +756,16 @@ func dummyDestination(persister *connector.Persister) *connector.Instance { return destination } +func testErrRecoveryCfg() *ErrRecoveryCfg { + return &ErrRecoveryCfg{ + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, // infinite retries + HealthyAfter: 5 * time.Minute, + } +} + // testConnectorService fulfills the ConnectorService interface. 
type testConnectorService map[string]*connector.Instance From cc460ad2333dcbf03de354b8c5f014119ad736f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 12:11:03 +0200 Subject: [PATCH 06/40] fix another test --- pkg/orchestrator/orchestrator_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index b1fcce188..67ae8e23d 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -96,7 +96,16 @@ func TestPipelineSimple(t *testing.T) { processorService := processor.NewService(logger, db, procPluginService) pipelineService := pipeline.NewService(logger, db) - lifecycleService := lifecycle.NewService(logger, nil, connectorService, processorService, connPluginService, pipelineService) + lifecycleService := lifecycle.NewService( + logger, + &lifecycle.ErrRecoveryCfg{ + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + HealthyAfter: 5 * time.Minute, + }, + connectorService, processorService, connPluginService, pipelineService) orc := NewOrchestrator( db, From c20d89d8d266af0ff49963ea9b9eb3d96101a2f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 12:15:13 +0200 Subject: [PATCH 07/40] fix test --- pkg/provisioning/service_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index fbcc88827..4423848c9 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -518,7 +518,15 @@ func TestService_IntegrationTestServices(t *testing.T) { plService := pipeline.NewService(logger, db) connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)) procService := processor.NewService(logger, db, procPluginService) - lifecycleService := 
lifecycle.NewService(logger, nil, connService, procService, connPluginService, plService) + + errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + HealthyAfter: 5 * time.Minute, + } + lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService) // create destination file destFile := "./test/dest-file.txt" From ad739b552fcd07211ba27f1f49adb32e3be5cde3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 18 Sep 2024 19:48:34 +0200 Subject: [PATCH 08/40] update comment and refactor test --- pkg/lifecycle/service.go | 3 ++ pkg/lifecycle/service_test.go | 52 +++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 3b9e081e0..75a2094a7 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -207,6 +207,7 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { return cerrors.Errorf("could not build new nodes for pipeline %s: %w", rp.pipeline.ID, err) } + // Replaces the old nodes with the new ones. rp.n = nodes s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes") @@ -219,6 +220,8 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { } // RestartWithBackoff restarts a pipeline with a backoff. +// It'll check the number of times the pipeline has been restarted and the duration of the backoff. +// When the pipeline has reached out the maximum number of retries, it'll return a fatal error. 
func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { // backoffCfg.Attempt() returns a float64 attempt := int(rp.backoffCfg.Attempt()) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index c14d5bbc4..807c18139 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -245,7 +245,7 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { // create mocked connectors ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, true) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) pl.DLQ.Plugin = dlq.Plugin @@ -262,7 +262,7 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { }, testProcessorService{}, testConnectorPluginService{ - source.Plugin: sourceDispenser, + source.Plugin: srcDispenser, destination.Plugin: destDispenser, dlq.Plugin: dlqDispenser, }, ps) @@ -307,7 +307,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { wantErr := cerrors.New("source connector error") ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, wantErr, false) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, wantErr, false) destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) pl.DLQ.Plugin = dlq.Plugin @@ -324,7 +324,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { }, testProcessorService{}, testConnectorPluginService{ - source.Plugin: sourceDispenser, + source.Plugin: srcDispenser, destination.Plugin: destDispenser, dlq.Plugin: dlqDispenser, }, ps) @@ -364,7 +364,7 @@ func TestServiceLifecycle_PipelineError(t 
*testing.T) { is.True(cerrors.Is(event.Error, wantErr)) } -func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { +func TestServiceLifecycle_StopAll(t *testing.T) { type testCase struct { name string stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) @@ -394,7 +394,7 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(0) - source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, tc.wantSourceStop) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, tc.wantSourceStop) destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) pl.DLQ.Plugin = dlq.Plugin @@ -414,7 +414,7 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { }, testProcessorService{}, testConnectorPluginService{ - source.Plugin: sourceDispenser, + source.Plugin: srcDispenser, destination.Plugin: destDispenser, dlq.Plugin: dlqDispenser, }, ps) @@ -429,7 +429,6 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { // wait for pipeline to finish consuming records from the source time.Sleep(100 * time.Millisecond) - pl.SetStatus(pipeline.StatusRecovering) tc.stopFn(ctx, is, ls, pl.ID) // wait for pipeline to finish @@ -508,7 +507,7 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, true) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) pl.DLQ.Plugin = dlq.Plugin @@ 
-525,7 +524,7 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { }, testProcessorService{}, testConnectorPluginService{ - source.Plugin: sourceDispenser, + source.Plugin: srcDispenser, destination.Plugin: destDispenser, dlq.Plugin: dlqDispenser, }, ps) @@ -567,16 +566,16 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { // create mocked connectors var ( - source *connector.Instance - sourceDispenser *pmock.Dispenser - destination *connector.Instance - destDispenser *pmock.Dispenser - dlq *connector.Instance - dlqDispenser *pmock.Dispenser + source *connector.Instance + srcDispenser *pmock.Dispenser + destination *connector.Instance + destDispenser *pmock.Dispenser + dlq *connector.Instance + dlqDispenser *pmock.Dispenser ) if expected == pipeline.StatusRunning { // mocked connectors that are expected to be started - source, sourceDispenser = generatorSource(ctrl, persister, nil, nil, true) + source, srcDispenser = asserterSource(ctrl, persister, nil, nil, true) destination, destDispenser = asserterDestination(ctrl, persister, nil) dlq, dlqDispenser = asserterDestination(ctrl, persister, nil) } else { @@ -607,7 +606,7 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { }, testProcessorService{}, testConnectorPluginService{ - source.Plugin: sourceDispenser, + source.Plugin: srcDispenser, destination.Plugin: destDispenser, dlq.Plugin: dlqDispenser, }, ps) @@ -646,6 +645,17 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { } } +func TestServiceLifecycle_Restart(t *testing.T) { + t.Skipf("implement me") +} + +func TestServiceLifecycle_RestartWithBackoff(t *testing.T) { + t.Skipf("implement me") + + // 1. create a runnable pipeline with existing nodes + // 2. 
stop the pipeline +} + func generateRecords(count int) []opencdc.Record { records := make([]opencdc.Record, count) for i := 0; i < count; i++ { @@ -661,10 +671,10 @@ func generateRecords(count int) []opencdc.Record { return records } -// generatorSource creates a connector source that fills up the returned slice +// asserterSource creates a connector source that fills up the returned slice // with generated records as they are produced. After producing the requested // number of records it returns wantErr. -func generatorSource( +func asserterSource( ctrl *gomock.Controller, persister *connector.Persister, records []opencdc.Record, @@ -694,7 +704,7 @@ func generatorSource( } // asserterDestination creates a connector destination that checks if the records it gets -// match the expected records. On teardown it also makes sure that it received +// match the expected records. On teardown, it also makes sure that it received // all expected records. func asserterDestination( ctrl *gomock.Controller, From 51af89714d28b834dc718e6d34076bd061d0b7b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 19 Sep 2024 10:22:19 +0200 Subject: [PATCH 09/40] refactor tests (wip) --- pkg/lifecycle/service.go | 2 +- pkg/lifecycle/service_test.go | 257 ++++++++++++++++++++++------------ 2 files changed, 165 insertions(+), 94 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 75a2094a7..a19d0aac0 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -240,7 +240,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) <-timer.C } - // Get status of pipeline, if it recovers (it's running), + // Get status of pipeline to check if it already recovered. 
pl, err := s.pipelines.Get(ctx, rp.pipeline.ID) if err != nil { return cerrors.FatalError(fmt.Errorf("could not fetch pipeline %s: %w", rp.pipeline.ID, err)) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 807c18139..464639a6f 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -366,81 +366,10 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { func TestServiceLifecycle_StopAll(t *testing.T) { type testCase struct { - name string - stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) - // whether we expect the source plugin's Stop() function to be called - // (doesn't happen when force-stopping) - wantSourceStop bool - want pipeline.Status - wantErr error - } - - runTest := func(t *testing.T, tc testCase) { - is := is.New(t) - ctx, killAll := context.WithCancel(context.Background()) - defer killAll() - logger := log.New(zerolog.Nop()) - db := &inmemory.DB{} - persister := connector.NewPersister(logger, db, time.Second, 3) - - ps := pipeline.NewService(logger, db) - - // create a host pipeline - pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) - is.NoErr(err) - - // create mocked connectors - // source will stop and return ErrGracefulShutdown which should signal to the - // service that everything went well and the pipeline was gracefully shutdown - ctrl := gomock.NewController(t) - wantRecords := generateRecords(0) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, tc.wantSourceStop) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) - pl.DLQ.Plugin = dlq.Plugin - - pl, err = ps.AddConnector(ctx, pl.ID, source.ID) - is.NoErr(err) - pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) - is.NoErr(err) - - ls := NewService( - logger, - testErrRecoveryCfg(), - testConnectorService{ - 
source.ID: source, - destination.ID: destination, - testDLQID: dlq, - }, - testProcessorService{}, - testConnectorPluginService{ - source.Plugin: srcDispenser, - destination.Plugin: destDispenser, - dlq.Plugin: dlqDispenser, - }, ps) - - // start the pipeline now that everything is set up - err = ls.Start( - ctx, - pl.ID, - ) - is.NoErr(err) - - // wait for pipeline to finish consuming records from the source - time.Sleep(100 * time.Millisecond) - - tc.stopFn(ctx, is, ls, pl.ID) - - // wait for pipeline to finish - err = ls.WaitPipeline(pl.ID) - if tc.wantErr != nil { - is.True(err != nil) - } else { - is.NoErr(err) - is.Equal("", pl.Error) - } - - is.Equal(tc.want, pl.GetStatus()) + name string + stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) + want pipeline.Status + wantErr error } testCases := []testCase{ @@ -449,41 +378,183 @@ func TestServiceLifecycle_StopAll(t *testing.T) { stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { ls.StopAll(ctx, pipeline.ErrGracefulShutdown) }, - wantSourceStop: true, - want: pipeline.StatusSystemStopped, + want: pipeline.StatusSystemStopped, }, { - name: "system stop (fatal err)", + name: "user stop (graceful)", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - ls.StopAll(ctx, cerrors.FatalError(cerrors.New("terrible err"))) + err := ls.Stop(ctx, pipelineID, false) + is.NoErr(err) }, - wantSourceStop: true, - want: pipeline.StatusDegraded, - wantErr: cerrors.New("terrible err"), + want: pipeline.StatusUserStopped, }, { - name: "connection error", + name: "system stop (fatal error)", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - ls.StopAll(ctx, cerrors.New("lost connection to database")) + ls.StopAll(ctx, cerrors.FatalError(cerrors.New("terrible err"))) }, - wantSourceStop: true, - want: pipeline.StatusRecovering, - wantErr: cerrors.New("lost connection to database"), + want: pipeline.StatusDegraded, 
+ wantErr: cerrors.New("terrible err"), }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + // source will stop and return ErrGracefulShutdown which should signal to the + // service that everything went well and the pipeline was gracefully shutdown + ctrl := gomock.NewController(t) + wantRecords := generateRecords(0) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService( + logger, + testErrRecoveryCfg(), + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: srcDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, ps) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) + + tc.stopFn(ctx, is, ls, pl.ID) + + // wait for pipeline to finish + err = ls.WaitPipeline(pl.ID) + if tc.wantErr != nil { + is.True(err != nil) + } else { + is.NoErr(err) + is.Equal("", pl.Error) + } + + is.Equal(tc.want, pl.GetStatus()) + }) + } +} + +func 
TestServiceLifecycle_StopAll_Recovering(t *testing.T) { + type testCase struct { + name string + stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) + want pipeline.Status + wantErr error + } + + testCases := []testCase{ { - name: "user stop (graceful)", + name: "connection error", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - err := ls.Stop(ctx, pipelineID, false) - is.NoErr(err) + ls.StopAll(ctx, cerrors.New("lost connection to database")) }, - wantSourceStop: true, - want: pipeline.StatusUserStopped, + want: pipeline.StatusRunning, + wantErr: cerrors.New("lost connection to database"), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - runTest(t, tc) + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + // source will stop and return ErrGracefulShutdown which should signal to the + // service that everything went well and the pipeline was gracefully shutdown + ctrl := gomock.NewController(t) + wantRecords := generateRecords(0) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService( + logger, + testErrRecoveryCfg(), + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + 
testProcessorService{}, + testConnectorPluginService{ + source.Plugin: srcDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, ps) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) + + tc.stopFn(ctx, is, ls, pl.ID) + + // wait for pipeline to finish + err = ls.WaitPipeline(pl.ID) + if tc.wantErr != nil { + is.True(err != nil) + } else { + is.NoErr(err) + is.Equal("", pl.Error) + } + + is.Equal(tc.want, pl.GetStatus()) }) } } From 1e284b99361e8c80c26d24e44827d313983ee152 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 19 Sep 2024 10:38:03 +0200 Subject: [PATCH 10/40] restart test WIP --- pkg/lifecycle/service_test.go | 59 +++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 464639a6f..079df921b 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -717,14 +717,61 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { } func TestServiceLifecycle_Restart(t *testing.T) { - t.Skipf("implement me") -} + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + ctrl := gomock.NewController(t) + wantRecords := generateRecords(10) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) + dlq, dlqDispenser := 
asserterDestination(ctrl, persister, nil) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) -func TestServiceLifecycle_RestartWithBackoff(t *testing.T) { - t.Skipf("implement me") + ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: srcDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, ps) + + // start the pipeline + err = ls.Start(ctx, pl.ID) + is.NoErr(err) + + // wait for pipeline to start + time.Sleep(100 * time.Millisecond) + + // restart the pipeline + rp, ok := ls.runningPipelines.Get(pl.ID) + is.True(ok) + + err = ls.Restart(ctx, rp) + is.NoErr(err) + + // wait for pipeline to restart + time.Sleep(100 * time.Millisecond) - // 1. create a runnable pipeline with existing nodes - // 2. stop the pipeline } func generateRecords(count int) []opencdc.Record { From d91797a1e90e6167f6fff1ce895e1486de920f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 19 Sep 2024 11:52:25 +0200 Subject: [PATCH 11/40] remove redundant code --- pkg/lifecycle/service.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index a19d0aac0..ec93f4d08 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -187,11 +187,6 @@ func (s *Service) Start( // Restart stops an existing pipeline and replaces their nodes. func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { - rp, ok := s.runningPipelines.Get(rp.pipeline.ID) - if !ok { - return cerrors.Errorf("pipeline %s can't be restarted: %w", rp.pipeline.ID, pipeline.ErrInstanceNotFound) - } - // In case we want to restart a running pipeline. 
if rp.pipeline.GetStatus() == pipeline.StatusRunning { if err := s.Stop(ctx, rp.pipeline.ID, true); err != nil { @@ -241,12 +236,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) } // Get status of pipeline to check if it already recovered. - pl, err := s.pipelines.Get(ctx, rp.pipeline.ID) - if err != nil { - return cerrors.FatalError(fmt.Errorf("could not fetch pipeline %s: %w", rp.pipeline.ID, err)) - } - - if pl.GetStatus() == pipeline.StatusRunning { + if rp.pipeline.GetStatus() == pipeline.StatusRunning { s.logger.Debug(ctx). Str(log.PipelineIDField, rp.pipeline.ID). Int("attempt", attempt). From 2864578c6b4c5830681115cb558b647f11f7d3d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 19 Sep 2024 16:55:00 +0200 Subject: [PATCH 12/40] update test --- pkg/lifecycle/service.go | 7 -- pkg/lifecycle/service_test.go | 206 +++++++++++++++++++--------------- 2 files changed, 116 insertions(+), 97 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index ec93f4d08..d1af5dc0f 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -187,13 +187,6 @@ func (s *Service) Start( // Restart stops an existing pipeline and replaces their nodes. func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { - // In case we want to restart a running pipeline. 
- if rp.pipeline.GetStatus() == pipeline.StatusRunning { - if err := s.Stop(ctx, rp.pipeline.ID, true); err != nil { - return cerrors.Errorf("could not stop pipeline %s: %w", rp.pipeline.ID, err) - } - } - s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline") s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes") diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 079df921b..1fed61e1d 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -245,9 +245,9 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) { // create mocked connectors ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin pl, err = ps.AddConnector(ctx, pl.ID, source.ID) @@ -307,9 +307,9 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { wantErr := cerrors.New("source connector error") ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, wantErr, false) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, wantErr, false, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin pl, err = 
ps.AddConnector(ctx, pl.ID, source.ID) @@ -364,7 +364,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { is.True(cerrors.Is(event.Error, wantErr)) } -func TestServiceLifecycle_StopAll(t *testing.T) { +func TestServiceLifecycle_Stop(t *testing.T) { type testCase struct { name string stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) @@ -373,13 +373,6 @@ func TestServiceLifecycle_StopAll(t *testing.T) { } testCases := []testCase{ - { - name: "system stop (graceful shutdown err)", - stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - ls.StopAll(ctx, pipeline.ErrGracefulShutdown) - }, - want: pipeline.StatusSystemStopped, - }, { name: "user stop (graceful)", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { @@ -388,6 +381,95 @@ func TestServiceLifecycle_StopAll(t *testing.T) { }, want: pipeline.StatusUserStopped, }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + // source will stop and return ErrGracefulShutdown which should signal to the + // service that everything went well and the pipeline was gracefully shutdown + ctrl := gomock.NewController(t) + wantRecords := generateRecords(0) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, 
source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService( + logger, + testErrRecoveryCfg(), + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: srcDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, ps) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) + + tc.stopFn(ctx, is, ls, pl.ID) + + // wait for pipeline to finish + err = ls.WaitPipeline(pl.ID) + if tc.wantErr != nil { + is.True(err != nil) + } else { + is.NoErr(err) + is.Equal("", pl.Error) + } + + is.Equal(tc.want, pl.GetStatus()) + }) + } +} + +func TestServiceLifecycle_StopAll(t *testing.T) { + type testCase struct { + name string + stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) + want pipeline.Status + wantErr error + } + + testCases := []testCase{ + { + name: "system stop (graceful shutdown err)", + stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { + ls.StopAll(ctx, pipeline.ErrGracefulShutdown) + }, + want: pipeline.StatusSystemStopped, + }, { name: "system stop (fatal error)", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { @@ -418,9 +500,9 @@ func TestServiceLifecycle_StopAll(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(0) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, 
true, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin pl, err = ps.AddConnector(ctx, pl.ID, source.ID) @@ -508,9 +590,9 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(0) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin pl, err = ps.AddConnector(ctx, pl.ID, source.ID) @@ -578,9 +660,9 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(10) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin pl, err = ps.AddConnector(ctx, pl.ID, source.ID) @@ -646,9 +728,9 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { ) if expected == pipeline.StatusRunning { // mocked connectors that are expected to be started - source, srcDispenser = asserterSource(ctrl, persister, nil, 
nil, true) - destination, destDispenser = asserterDestination(ctrl, persister, nil) - dlq, dlqDispenser = asserterDestination(ctrl, persister, nil) + source, srcDispenser = asserterSource(ctrl, persister, nil, nil, true, 1) + destination, destDispenser = asserterDestination(ctrl, persister, nil, 1) + dlq, dlqDispenser = asserterDestination(ctrl, persister, nil, 1) } else { // dummy connectors that are not expected to be started source = dummySource(persister) @@ -716,64 +798,6 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { } } -func TestServiceLifecycle_Restart(t *testing.T) { - is := is.New(t) - ctx, killAll := context.WithCancel(context.Background()) - defer killAll() - logger := log.New(zerolog.Nop()) - db := &inmemory.DB{} - persister := connector.NewPersister(logger, db, time.Second, 3) - - ps := pipeline.NewService(logger, db) - - // create a host pipeline - pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) - is.NoErr(err) - - // create mocked connectors - ctrl := gomock.NewController(t) - wantRecords := generateRecords(10) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil) - pl.DLQ.Plugin = dlq.Plugin - - pl, err = ps.AddConnector(ctx, pl.ID, source.ID) - is.NoErr(err) - pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) - is.NoErr(err) - - ls := NewService(logger, testErrRecoveryCfg(), testConnectorService{ - source.ID: source, - destination.ID: destination, - testDLQID: dlq, - }, - testProcessorService{}, - testConnectorPluginService{ - source.Plugin: srcDispenser, - destination.Plugin: destDispenser, - dlq.Plugin: dlqDispenser, - }, ps) - - // start the pipeline - err = ls.Start(ctx, pl.ID) - is.NoErr(err) - - // wait for pipeline to start - time.Sleep(100 * time.Millisecond) - - // restart the pipeline - rp, 
ok := ls.runningPipelines.Get(pl.ID) - is.True(ok) - - err = ls.Restart(ctx, rp) - is.NoErr(err) - - // wait for pipeline to restart - time.Sleep(100 * time.Millisecond) - -} - func generateRecords(count int) []opencdc.Record { records := make([]opencdc.Record, count) for i := 0; i < count; i++ { @@ -798,6 +822,7 @@ func asserterSource( records []opencdc.Record, wantErr error, stop bool, + times int, ) (*connector.Instance, *pmock.Dispenser) { sourcePluginOptions := []pmock.ConfigurableSourcePluginOption{ pmock.SourcePluginWithConfigure(), @@ -811,12 +836,12 @@ func asserterSource( if stop { sourcePluginOptions = append(sourcePluginOptions, pmock.SourcePluginWithStop()) } - sourcePlugin := pmock.NewConfigurableSourcePlugin(ctrl, sourcePluginOptions...) - source := dummySource(persister) dispenser := pmock.NewDispenser(ctrl) - dispenser.EXPECT().DispenseSource().Return(sourcePlugin, nil) + dispenser.EXPECT().DispenseSource().DoAndReturn(func() (connectorPlugin.SourcePlugin, error) { + return pmock.NewConfigurableSourcePlugin(ctrl, sourcePluginOptions...), nil + }).Times(times) return source, dispenser } @@ -828,6 +853,7 @@ func asserterDestination( ctrl *gomock.Controller, persister *connector.Persister, records []opencdc.Record, + times int, ) (*connector.Instance, *pmock.Dispenser) { var lastPosition opencdc.Position if len(records) > 0 { @@ -843,12 +869,12 @@ func asserterDestination( pmock.DestinationPluginWithTeardown(), } - destinationPlugin := pmock.NewConfigurableDestinationPlugin(ctrl, destinationPluginOptions...) 
- dest := dummyDestination(persister) dispenser := pmock.NewDispenser(ctrl) - dispenser.EXPECT().DispenseDestination().Return(destinationPlugin, nil) + dispenser.EXPECT().DispenseDestination().DoAndReturn(func() (connectorPlugin.DestinationPlugin, error) { + return pmock.NewConfigurableDestinationPlugin(ctrl, destinationPluginOptions...), nil + }).Times(times) return dest, dispenser } From 26e7c02e6e5481b45e269516bced84c6eb0564f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 19 Sep 2024 18:55:24 +0200 Subject: [PATCH 13/40] fix test --- pkg/lifecycle/service_test.go | 47 ++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 1fed61e1d..feec66b51 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -559,14 +559,16 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { wantErr error } + wantErr := cerrors.New("lost connection to database") + testCases := []testCase{ { name: "connection error", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - ls.StopAll(ctx, cerrors.New("lost connection to database")) + ls.StopAll(ctx, wantErr) }, want: pipeline.StatusRunning, - wantErr: cerrors.New("lost connection to database"), + wantErr: wantErr, }, } @@ -575,7 +577,8 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll() - logger := log.New(zerolog.Nop()) + logger := log.New(zerolog.New(zerolog.NewTestWriter(t))) + logger.Logger = logger.Level(zerolog.TraceLevel) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) @@ -590,9 +593,9 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(0) - source, 
srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 2) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 2) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 2) pl.DLQ.Plugin = dlq.Plugin pl, err = ps.AddConnector(ctx, pl.ID, source.ID) @@ -625,18 +628,44 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { // wait for pipeline to finish consuming records from the source time.Sleep(100 * time.Millisecond) + c := make(cchan.Chan[error]) + go func() { + c <- ls.WaitPipeline(pl.ID) + }() + tc.stopFn(ctx, is, ls, pl.ID) + err, _, err2 := c.RecvTimeout(ctx, 10000*time.Millisecond) + + is.NoErr(err2) - // wait for pipeline to finish - err = ls.WaitPipeline(pl.ID) if tc.wantErr != nil { - is.True(err != nil) + logger.Info(ctx).Msgf("%+v", err.(interface{ Unwrap() error }).Unwrap()) + logger.Info(ctx).Msgf("%+v", tc.wantErr) + is.True(cerrors.Is(err, tc.wantErr)) } else { is.NoErr(err) is.Equal("", pl.Error) } + go func() { + c <- ls.WaitPipeline(pl.ID) + }() + + _, _, err = c.RecvTimeout(ctx, 1000*time.Millisecond) + is.True(cerrors.Is(err, context.DeadlineExceeded)) + + err = ls.Stop(ctx, pl.ID, false) + is.NoErr(err) + + // Check pipeline ended in a running state is.Equal(tc.want, pl.GetStatus()) + + go func() { + c <- ls.WaitPipeline(pl.ID) + }() + err, _, _ = c.RecvTimeout(ctx, 1000*time.Millisecond) + is.NoErr(err) + is.Equal(pipeline.StatusUserStopped, pl.GetStatus()) }) } } From 65b107022b3122d0a98420414262eeb58f687539 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Fri, 20 Sep 2024 12:08:43 +0200 Subject: [PATCH 14/40] add new tests and refactor --- pkg/lifecycle/service.go | 4 +- pkg/lifecycle/service_test.go | 192 
+++++++++++++++------------------- 2 files changed, 88 insertions(+), 108 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index d1af5dc0f..3173ce4e2 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -202,7 +202,7 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { if err := s.runPipeline(ctx, rp); err != nil { return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err) } - s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline re-started ") + s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline restarted ") return nil } @@ -292,7 +292,7 @@ func (s *Service) stopForceful(ctx context.Context, rp *runnablePipeline) error Str(log.PipelineIDField, rp.pipeline.ID). Any(log.PipelineStatusField, rp.pipeline.GetStatus()). Msg("force stopping pipeline") - rp.t.Kill(pipeline.ErrForceStop) + rp.t.Kill(cerrors.FatalError(pipeline.ErrForceStop)) for _, n := range rp.n { if node, ok := n.(stream.ForceStoppableNode); ok { // stop all pub nodes diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index feec66b51..127fb97cd 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -366,21 +366,32 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { func TestServiceLifecycle_Stop(t *testing.T) { type testCase struct { - name string - stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) - want pipeline.Status - wantErr error + name string + stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) + forceStop bool + want pipeline.Status + wantErr error } testCases := []testCase{ { - name: "user stop (graceful)", + name: "user stop: graceful", stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { err := ls.Stop(ctx, pipelineID, false) is.NoErr(err) }, want: pipeline.StatusUserStopped, }, + { + name: "user stop: 
forceful", + stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { + err := ls.Stop(ctx, pipelineID, true) + is.NoErr(err) + }, + forceStop: true, + wantErr: cerrors.FatalError(pipeline.ErrForceStop), + want: pipeline.StatusDegraded, + }, } for _, tc := range testCases { @@ -403,7 +414,7 @@ func TestServiceLifecycle_Stop(t *testing.T) { // service that everything went well and the pipeline was gracefully shutdown ctrl := gomock.NewController(t) wantRecords := generateRecords(0) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 1) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, !tc.forceStop, 1) destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 1) dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 1) pl.DLQ.Plugin = dlq.Plugin @@ -552,122 +563,91 @@ func TestServiceLifecycle_StopAll(t *testing.T) { } func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { - type testCase struct { - name string - stopFn func(ctx context.Context, is *is.I, lifecycleService *Service, pipelineID string) - want pipeline.Status - wantErr error - } - wantErr := cerrors.New("lost connection to database") - testCases := []testCase{ - { - name: "connection error", - stopFn: func(ctx context.Context, is *is.I, ls *Service, pipelineID string) { - ls.StopAll(ctx, wantErr) - }, - want: pipeline.StatusRunning, - wantErr: wantErr, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - is := is.New(t) - ctx, killAll := context.WithCancel(context.Background()) - defer killAll() - logger := log.New(zerolog.New(zerolog.NewTestWriter(t))) - logger.Logger = logger.Level(zerolog.TraceLevel) - db := &inmemory.DB{} - persister := connector.NewPersister(logger, db, time.Second, 3) - - ps := pipeline.NewService(logger, db) + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + 
db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) - // create a host pipeline - pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) - is.NoErr(err) + ps := pipeline.NewService(logger, db) - // create mocked connectors - // source will stop and return ErrGracefulShutdown which should signal to the - // service that everything went well and the pipeline was gracefully shutdown - ctrl := gomock.NewController(t) - wantRecords := generateRecords(0) - source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 2) - destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 2) - dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 2) - pl.DLQ.Plugin = dlq.Plugin + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) - pl, err = ps.AddConnector(ctx, pl.ID, source.ID) - is.NoErr(err) - pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) - is.NoErr(err) + // create mocked connectors + // source will stop and return ErrGracefulShutdown which should signal to the + // service that everything went well and the pipeline was gracefully shutdown + ctrl := gomock.NewController(t) + wantRecords := generateRecords(0) + source, srcDispenser := asserterSource(ctrl, persister, wantRecords, nil, true, 2) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, 2) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, 2) + pl.DLQ.Plugin = dlq.Plugin - ls := NewService( - logger, - testErrRecoveryCfg(), - testConnectorService{ - source.ID: source, - destination.ID: destination, - testDLQID: dlq, - }, - testProcessorService{}, - testConnectorPluginService{ - source.Plugin: srcDispenser, - destination.Plugin: destDispenser, - dlq.Plugin: dlqDispenser, - }, ps) + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) 
+ is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) - // start the pipeline now that everything is set up - err = ls.Start( - ctx, - pl.ID, - ) - is.NoErr(err) + ls := NewService( + logger, + testErrRecoveryCfg(), + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: srcDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, ps) - // wait for pipeline to finish consuming records from the source - time.Sleep(100 * time.Millisecond) + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) - c := make(cchan.Chan[error]) - go func() { - c <- ls.WaitPipeline(pl.ID) - }() + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) - tc.stopFn(ctx, is, ls, pl.ID) - err, _, err2 := c.RecvTimeout(ctx, 10000*time.Millisecond) + c := make(cchan.Chan[error]) + go func() { + c <- ls.WaitPipeline(pl.ID) + }() - is.NoErr(err2) + ls.StopAll(ctx, wantErr) + err, _, err2 := c.RecvTimeout(ctx, 10000*time.Millisecond) - if tc.wantErr != nil { - logger.Info(ctx).Msgf("%+v", err.(interface{ Unwrap() error }).Unwrap()) - logger.Info(ctx).Msgf("%+v", tc.wantErr) - is.True(cerrors.Is(err, tc.wantErr)) - } else { - is.NoErr(err) - is.Equal("", pl.Error) - } + is.NoErr(err2) + is.True(cerrors.Is(err, wantErr)) - go func() { - c <- ls.WaitPipeline(pl.ID) - }() + go func() { + c <- ls.WaitPipeline(pl.ID) + }() - _, _, err = c.RecvTimeout(ctx, 1000*time.Millisecond) - is.True(cerrors.Is(err, context.DeadlineExceeded)) + _, _, err = c.RecvTimeout(ctx, 1000*time.Millisecond) + is.True(cerrors.Is(err, context.DeadlineExceeded)) - err = ls.Stop(ctx, pl.ID, false) - is.NoErr(err) + err = ls.Stop(ctx, pl.ID, false) + is.NoErr(err) - // Check pipeline ended in a running state - is.Equal(tc.want, pl.GetStatus()) + // Check pipeline 
ended in a running state + is.Equal(pipeline.StatusRunning, pl.GetStatus()) - go func() { - c <- ls.WaitPipeline(pl.ID) - }() - err, _, _ = c.RecvTimeout(ctx, 1000*time.Millisecond) - is.NoErr(err) - is.Equal(pipeline.StatusUserStopped, pl.GetStatus()) - }) - } + go func() { + c <- ls.WaitPipeline(pl.ID) + }() + err, _, _ = c.RecvTimeout(ctx, 1000*time.Millisecond) + is.NoErr(err) + is.Equal(pipeline.StatusUserStopped, pl.GetStatus()) } func TestServiceLifecycle_PipelineStop(t *testing.T) { @@ -736,7 +716,7 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { ctx, killAll := context.WithCancel(context.Background()) defer killAll() ctrl := gomock.NewController(t) - logger := log.Test(t) + logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) From eaf14095dd8ce8c60a87d12917a9abf3028e7190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Fri, 20 Sep 2024 13:47:54 +0200 Subject: [PATCH 15/40] fix logger --- pkg/lifecycle/service_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 127fb97cd..f7857e18b 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -399,7 +399,7 @@ func TestServiceLifecycle_Stop(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll() - logger := log.New(zerolog.Nop()) + logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) @@ -716,7 +716,7 @@ func TestServiceLifecycle_Run_Rerun(t *testing.T) { ctx, killAll := context.WithCancel(context.Background()) defer killAll() ctrl := gomock.NewController(t) - logger := log.New(zerolog.Nop()) + logger := log.Test(t) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) From ef653602933ec12d69f7ba8e4a4ffe9746293b85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= 
Date: Fri, 20 Sep 2024 16:21:54 +0200 Subject: [PATCH 16/40] cosmetic changes --- pkg/lifecycle/service.go | 8 +++++--- pkg/lifecycle/service_test.go | 13 +++++++++---- pkg/orchestrator/orchestrator_test.go | 19 +++++++++---------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 3173ce4e2..c68097370 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -185,7 +185,7 @@ func (s *Service) Start( return nil } -// Restart stops an existing pipeline and replaces their nodes. +// Restart replaces the nodes of a pipeline and starts it again. func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline") s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes") @@ -211,7 +211,6 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { - // backoffCfg.Attempt() returns a float64 attempt := int(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() @@ -292,6 +291,8 @@ func (s *Service) stopForceful(ctx context.Context, rp *runnablePipeline) error Str(log.PipelineIDField, rp.pipeline.ID). Any(log.PipelineStatusField, rp.pipeline.GetStatus()). Msg("force stopping pipeline") + + // Creates a FatalError to prevent the pipeline from recovering. rp.t.Kill(cerrors.FatalError(pipeline.ErrForceStop)) for _, n := range rp.n { if node, ok := n.(stream.ForceStoppableNode); ok { @@ -375,7 +376,7 @@ func (s *Service) WaitPipeline(id string) error { return p.t.Wait() } -// buildsNodes will build and connect all nodes configured in the pipeline. 
+// buildsNodes will build new nodes that will be assigned to the pipeline.Instance. func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stream.Node, error) { // setup many to many channels fanIn := stream.FaninNode{Name: "fanin"} @@ -748,6 +749,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { }) } + // TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed. err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index f7857e18b..52228a6e4 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -562,15 +562,15 @@ func TestServiceLifecycle_StopAll(t *testing.T) { } } +// Creates first a pipeline that will stop with a recoverable error, to check later that it restarted and it's running. func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { - wantErr := cerrors.New("lost connection to database") - is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll() logger := log.New(zerolog.Nop()) db := &inmemory.DB{} persister := connector.NewPersister(logger, db, time.Second, 3) + wantErr := cerrors.New("lost connection to database") ps := pipeline.NewService(logger, db) @@ -623,10 +623,12 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { c <- ls.WaitPipeline(pl.ID) }() + // force the pipeline to stop with a recoverable error ls.StopAll(ctx, wantErr) - err, _, err2 := c.RecvTimeout(ctx, 10000*time.Millisecond) + err, _, ctxErr := c.RecvTimeout(ctx, 10000*time.Millisecond) + is.NoErr(ctxErr) - is.NoErr(err2) + // check the first pipeline stopped with the error that caused the restart is.True(cerrors.Is(err, wantErr)) go func() { @@ -636,6 +638,7 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { _, _, err = c.RecvTimeout(ctx, 1000*time.Millisecond) 
is.True(cerrors.Is(err, context.DeadlineExceeded)) + // stop the running pipeline err = ls.Stop(ctx, pl.ID, false) is.NoErr(err) @@ -647,6 +650,8 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) { }() err, _, _ = c.RecvTimeout(ctx, 1000*time.Millisecond) is.NoErr(err) + + // This is to demonstrate the test indeed stopped the pipeline is.Equal(pipeline.StatusUserStopped, pl.GetStatus()) } diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index 67ae8e23d..373689232 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -96,16 +96,15 @@ func TestPipelineSimple(t *testing.T) { processorService := processor.NewService(logger, db, procPluginService) pipelineService := pipeline.NewService(logger, db) - lifecycleService := lifecycle.NewService( - logger, - &lifecycle.ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: 0, - HealthyAfter: 5 * time.Minute, - }, - connectorService, processorService, connPluginService, pipelineService) + errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + HealthyAfter: 5 * time.Minute, + } + + lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connectorService, processorService, connPluginService, pipelineService) orc := NewOrchestrator( db, From e63b7f58f1ef74f35f067be2ee88ff5a5748b84f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Mon, 23 Sep 2024 15:28:11 -0400 Subject: [PATCH 17/40] move to a separate method --- pkg/lifecycle/service.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index c68097370..5b558a921 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -198,6 +198,9 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { // 
Replaces the old nodes with the new ones. rp.n = nodes + // clears out the tomb + rp.t.Kill(cerrors.New("restarting pipeline")) + s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes") if err := s.runPipeline(ctx, rp); err != nil { return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err) @@ -786,13 +789,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return err } } else { - err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRecovering, "") - if err != nil { - return err - } - - // Exit the goroutine and attempt to restart the pipeline - return s.RestartWithBackoff(ctx, rp) + return s.recoverPipeline(ctx, rp) } } @@ -810,6 +807,17 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return nil } +// recoverPipeline attempts to recover a pipeline that has stopped running. +func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) error { + err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRecovering, "") + if err != nil { + return err + } + + // Exit the goroutine and attempt to restart the pipeline + return s.RestartWithBackoff(ctx, rp) +} + // notify notifies all registered FailureHandlers about an error. 
func (s *Service) notify(pipelineID string, err error) { if err == nil { From c2faba2ba5f9e48673e80cc9972aad6c24ea4dd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 1 Oct 2024 13:35:34 +0200 Subject: [PATCH 18/40] add tracing and fix max retries --- pkg/lifecycle/service.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 5b558a921..e9e87901c 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -214,13 +214,18 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { + s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") + attempt := int(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() - if attempt > s.errRecoveryCfg.MaxRetries { + // maxRetries 0 means infinite retries + if attempt > s.errRecoveryCfg.MaxRetries && s.errRecoveryCfg.MaxRetries != 0 { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } + s.logger.Trace(ctx).Dur(log.DurationField, duration).Int("attempt", attempt).Msg("backoff configuration") + // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. 
timer := time.NewTimer(duration) select { @@ -771,6 +776,8 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { switch err { case tomb.ErrStillAlive: // not an actual error, the pipeline stopped gracefully + err = nil + if isGracefulShutdown.Load() { // it was triggered by a graceful shutdown of Conduit err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, "") @@ -809,6 +816,8 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { // recoverPipeline attempts to recover a pipeline that has stopped running. func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) error { + s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("recovering pipeline") + err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRecovering, "") if err != nil { return err From d396da9aa6af8942870cc3ca0c11a2f530a26e4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 1 Oct 2024 14:48:29 +0200 Subject: [PATCH 19/40] not needed --- pkg/lifecycle/service.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index e9e87901c..7fbec53fa 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -776,8 +776,6 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { switch err { case tomb.ErrStillAlive: // not an actual error, the pipeline stopped gracefully - err = nil - if isGracefulShutdown.Load() { // it was triggered by a graceful shutdown of Conduit err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, "") From a17eb5e29bfd780af42de4f97ea4d98582bce972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Tue, 1 Oct 2024 22:42:14 +0200 Subject: [PATCH 20/40] Set -1 as infinite retries for err recovery --- pkg/conduit/config.go | 12 +++++++----- pkg/conduit/config_test.go | 6 +++--- pkg/conduit/entrypoint.go | 2 
+- pkg/foundation/log/fields.go | 1 + pkg/lifecycle/service.go | 13 +++++++------ 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index b7abb1cb8..8a2c7e681 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -35,6 +35,8 @@ const ( SchemaRegistryTypeConfluent = "confluent" SchemaRegistryTypeBuiltin = "builtin" + + InfiniteRetriesErrRecovery = -1 ) // Config holds all configurable values for Conduit. @@ -93,8 +95,8 @@ type Config struct { MaxDelay time.Duration // BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2 BackoffFactor int - // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: 0 (infinite) - MaxRetries int + // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) + MaxRetries int64 // HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes HealthyAfter time.Duration } @@ -142,7 +144,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MinDelay = time.Second cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 - cfg.Pipelines.ErrorRecovery.MaxRetries = 0 // infinite retries + cfg.Pipelines.ErrorRecovery.MaxRetries = InfiniteRetriesErrRecovery cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin @@ -211,8 +213,8 @@ func (c Config) validateErrorRecovery() error { if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil { errs = append(errs, err) } - if err := requireNonNegativeValue("max-retries", errRecoveryCfg.MaxRetries); err != nil { - errs = append(errs, err) + if errRecoveryCfg.MaxRetries < InfiniteRetriesErrRecovery { + errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, InfiniteRetriesErrRecovery)) } 
if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { errs = append(errs, err) diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index c252600c3..49f235959 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -196,12 +196,12 @@ func TestConfig_Validate(t *testing.T) { want: cerrors.New(`invalid error recovery config: "backoff-factor" config value mustn't be negative (got: -1)`), }, { - name: "error recovery: negative max-retries", + name: "error recovery: max-retries smaller than -1", setupConfig: func(c Config) Config { - c.Pipelines.ErrorRecovery.MaxRetries = -1 + c.Pipelines.ErrorRecovery.MaxRetries = InfiniteRetriesErrRecovery - 1 return c }, - want: cerrors.New(`invalid error recovery config: "max-retries" config value mustn't be negative (got: -1)`), + want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`), }, { name: "error recovery: negative healthy-after", diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index e74cbad61..4660ff6f7 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -129,7 +129,7 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { cfg.Pipelines.ErrorRecovery.BackoffFactor, "backoff factor applied to the last delay", ) - flags.IntVar( + flags.Int64Var( &cfg.Pipelines.ErrorRecovery.MaxRetries, "pipelines.error-recovery.max-retries", cfg.Pipelines.ErrorRecovery.MaxRetries, diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go index 7f4b883c9..3a0da1b01 100644 --- a/pkg/foundation/log/fields.go +++ b/pkg/foundation/log/fields.go @@ -17,6 +17,7 @@ package log const ( ComponentField = "component" ConnectorIDField = "connector_id" + AttemptField = "attempt" DurationField = "duration" MessageIDField = "message_id" NodeIDField = "node_id" diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 7fbec53fa..a0490c22f 100644 --- 
a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -39,6 +39,8 @@ import ( "gopkg.in/tomb.v2" ) +const InfiniteRetriesErrRecovery = -1 + type FailureEvent struct { // ID is the ID of the pipeline which failed. ID string @@ -51,7 +53,7 @@ type ErrRecoveryCfg struct { MinDelay time.Duration MaxDelay time.Duration BackoffFactor int - MaxRetries int + MaxRetries int64 HealthyAfter time.Duration } @@ -216,15 +218,14 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") - attempt := int(rp.backoffCfg.Attempt()) + attempt := int64(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() - // maxRetries 0 means infinite retries - if attempt > s.errRecoveryCfg.MaxRetries && s.errRecoveryCfg.MaxRetries != 0 { + if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt > s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } - s.logger.Trace(ctx).Dur(log.DurationField, duration).Int("attempt", attempt).Msg("backoff configuration") + s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration") // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. timer := time.NewTimer(duration) @@ -239,7 +240,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) if rp.pipeline.GetStatus() == pipeline.StatusRunning { s.logger.Debug(ctx). Str(log.PipelineIDField, rp.pipeline.ID). - Int("attempt", attempt). + Int64("attempt", attempt). Int("backoffRetry.count", s.errRecoveryCfg.BackoffFactor). Int64("backoffRetry.duration", duration.Milliseconds()). 
Msg("pipeline recovered") From aaa6f614fa8d3166a06bd082b6204262dae3e304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 14:04:51 +0200 Subject: [PATCH 21/40] rename flag --- pkg/conduit/config.go | 6 +++--- pkg/conduit/entrypoint.go | 29 +++++++++++++++++++++++++---- pkg/conduit/runtime.go | 4 ++-- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 8a2c7e681..eec24f2fb 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -86,9 +86,9 @@ type Config struct { } Pipelines struct { - Path string - ExitOnError bool - ErrorRecovery struct { + Path string + ExitOnDegraded bool + ErrorRecovery struct { // MinDelay is the minimum delay before restart: Default: 1 second MinDelay time.Duration // MaxDelay is the maximum delay before restart: Default: 10 minutes diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 4660ff6f7..6443381bd 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -71,6 +71,16 @@ func (e *Entrypoint) Serve(cfg Config) { } } +func deprecatedFlag(name string) bool { + deprecatedFlags := []string{"pipelines.exit-on-error"} + for _, flag := range deprecatedFlags { + if name == flag { + return true + } + } + return false +} + // Flags returns a flag set that, when parsed, stores the values in the provided // config struct. 
func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { @@ -105,12 +115,23 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file", ) + + // Deprecated: use `pipelines.exit-on-degraded` instead + // Note: If both `pipeline.exit-on-error` and `pipeline.exit-on-degraded` are set, `pipeline.exit-on-degraded` will take precedence flags.BoolVar( - &cfg.Pipelines.ExitOnError, + &cfg.Pipelines.ExitOnDegraded, "pipelines.exit-on-error", - cfg.Pipelines.ExitOnError, - "exit Conduit if a pipeline experiences an error while running", + cfg.Pipelines.ExitOnDegraded, + "Deprecated: use `exit-on-degraded` instead.\nexit Conduit if a pipeline experiences an error while running", ) + + flags.BoolVar( + &cfg.Pipelines.ExitOnDegraded, + "pipelines.exit-on-degraded", + cfg.Pipelines.ExitOnDegraded, + "exit Conduit if a pipeline enters a degraded state", + ) + flags.DurationVar( &cfg.Pipelines.ErrorRecovery.MinDelay, "pipelines.error-recovery.min-delay", @@ -155,7 +176,7 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { flags.Usage = func() { tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError) flags.VisitAll(func(f *flag.Flag) { - if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp { + if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || deprecatedFlag(f.Name) { return // hide flag from output } // reset value to its default, to ensure default is shown correctly diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 43504062a..596661dc2 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -769,7 +769,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error { return cerrors.Errorf("failed to init connector service: %w", err) } - if r.Config.Pipelines.ExitOnError { + if r.Config.Pipelines.ExitOnDegraded { r.lifecycleService.OnFailure(func(e 
lifecycle.FailureEvent) { r.logger.Warn(ctx). Err(e.Error). @@ -788,7 +788,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error { cerrors.ForEach(err, func(err error) { r.logger.Err(ctx, err).Msg("provisioning failed") }) - if r.Config.Pipelines.ExitOnError { + if r.Config.Pipelines.ExitOnDegraded { r.logger.Warn(ctx). Err(err). Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled") From 29a944dfd3c8f753a36ed14fd7b84093043da909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 14:39:50 +0200 Subject: [PATCH 22/40] goes to degraded once it exits --- pkg/conduit/config.go | 9 ++++----- pkg/lifecycle/service.go | 8 +++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index eec24f2fb..3e2fab78d 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -22,6 +22,7 @@ import ( sdk "github.com/conduitio/conduit-connector-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/lifecycle" "github.com/conduitio/conduit/pkg/plugin/connector/builtin" "github.com/rs/zerolog" "golang.org/x/exp/constraints" @@ -35,8 +36,6 @@ const ( SchemaRegistryTypeConfluent = "confluent" SchemaRegistryTypeBuiltin = "builtin" - - InfiniteRetriesErrRecovery = -1 ) // Config holds all configurable values for Conduit. 
@@ -144,7 +143,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MinDelay = time.Second cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 - cfg.Pipelines.ErrorRecovery.MaxRetries = InfiniteRetriesErrRecovery + cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin @@ -213,8 +212,8 @@ func (c Config) validateErrorRecovery() error { if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil { errs = append(errs, err) } - if errRecoveryCfg.MaxRetries < InfiniteRetriesErrRecovery { - errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, InfiniteRetriesErrRecovery)) + if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery { + errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, lifecycle.InfiniteRetriesErrRecovery)) } if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { errs = append(errs, err) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index a0490c22f..38d6ea719 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -795,7 +795,13 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return err } } else { - return s.recoverPipeline(ctx, rp) + err := s.recoverPipeline(ctx, rp) + if err != nil { + err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)) + if err != nil { + return err + } + } } } From fb92158679fea1a1f0f6f4587ff94e76289ae288 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 14:43:49 +0200 Subject: [PATCH 23/40] fix const --- pkg/conduit/config_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/conduit/config_test.go 
b/pkg/conduit/config_test.go index 49f235959..db54c6cd0 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -20,6 +20,7 @@ import ( "github.com/conduitio/conduit-commons/database/inmemory" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/lifecycle" "github.com/matryer/is" ) @@ -198,7 +199,7 @@ func TestConfig_Validate(t *testing.T) { { name: "error recovery: max-retries smaller than -1", setupConfig: func(c Config) Config { - c.Pipelines.ErrorRecovery.MaxRetries = InfiniteRetriesErrRecovery - 1 + c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1 return c }, want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`), From c4f167de42d3f368da43ddfc7d390f53f92c7bc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 15:35:58 +0200 Subject: [PATCH 24/40] simpler implementation --- pkg/conduit/entrypoint.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 6443381bd..1cd74aceb 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -71,16 +71,6 @@ func (e *Entrypoint) Serve(cfg Config) { } } -func deprecatedFlag(name string) bool { - deprecatedFlags := []string{"pipelines.exit-on-error"} - for _, flag := range deprecatedFlags { - if name == flag { - return true - } - } - return false -} - // Flags returns a flag set that, when parsed, stores the values in the provided // config struct. 
func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { @@ -172,11 +162,16 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file") flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file") + // Deprecated flags that are hidden from help output + deprecatedFlags := map[string]bool{ + "pipelines.exit-on-error": true, + } + // show user or dev flags flags.Usage = func() { tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError) flags.VisitAll(func(f *flag.Flag) { - if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || deprecatedFlag(f.Name) { + if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || deprecatedFlags[f.Name] { return // hide flag from output } // reset value to its default, to ensure default is shown correctly From 05f38e78668b366ae8468c31b8e107b4ebc9fe10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 15:37:35 +0200 Subject: [PATCH 25/40] add test case for 0 max-retries --- pkg/conduit/config_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index db54c6cd0..da836b475 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -204,6 +204,14 @@ func TestConfig_Validate(t *testing.T) { }, want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`), }, + { + name: "error recovery: with 0 max-retries ", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.MaxRetries = 0 + return c + }, + want: nil, + }, { name: "error recovery: negative healthy-after", setupConfig: func(c Config) Config { From 65998c34be5c46af1dccd1c6b7a2fe84a4f03058 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 15:52:18 +0200 Subject: [PATCH 26/40] fix test --- pkg/lifecycle/service.go | 2 ++ 
pkg/lifecycle/service_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 38d6ea719..c6c4af6b1 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -795,6 +795,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return err } } else { + // try to recover the pipeline err := s.recoverPipeline(ctx, rp) if err != nil { err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)) @@ -802,6 +803,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return err } } + return nil } } diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 52228a6e4..a9953bc82 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -929,7 +929,7 @@ func testErrRecoveryCfg() *ErrRecoveryCfg { MinDelay: time.Second, MaxDelay: 10 * time.Minute, BackoffFactor: 2, - MaxRetries: 0, // infinite retries + MaxRetries: InfiniteRetriesErrRecovery, HealthyAfter: 5 * time.Minute, } } From 2f4065b50f5bbe46ef8a4692bfac80939f0c86f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 18:37:58 +0200 Subject: [PATCH 27/40] no need to kill the tomb --- pkg/lifecycle/service.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index c6c4af6b1..4d19bb947 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -200,9 +200,6 @@ func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { // Replaces the old nodes with the new ones. 
rp.n = nodes - // clears out the tomb - rp.t.Kill(cerrors.New("restarting pipeline")) - s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes") if err := s.runPipeline(ctx, rp); err != nil { return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err) From 00057c657153f82998e98d9fd58d1ec3bb837ec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Wed, 2 Oct 2024 20:11:29 +0200 Subject: [PATCH 28/40] typo and fix --- pkg/conduit/runtime.go | 4 ++-- pkg/lifecycle/service.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 596661dc2..0de2276c9 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -774,8 +774,8 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error { r.logger.Warn(ctx). Err(e.Error). Str(log.PipelineIDField, e.ID). - Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled") - t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error)) + Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled") + t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error)) }) } err = r.pipelineService.Init(ctx) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 4d19bb947..c96b4c5b5 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -218,7 +218,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) attempt := int64(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() - if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt > s.errRecoveryCfg.MaxRetries { + if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } 
From b84e0ddb3de82e6a488a57b327bd672731791d7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 00:40:26 +0200 Subject: [PATCH 29/40] fix exit-on-degraded when degraded --- pkg/lifecycle/service.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index c96b4c5b5..80d42549e 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -776,31 +776,28 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { // not an actual error, the pipeline stopped gracefully if isGracefulShutdown.Load() { // it was triggered by a graceful shutdown of Conduit - err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, "") + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, ""); err != nil { + return err + } } else { // it was manually triggered by a user - err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, "") - } - if err != nil { - return err + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, ""); err != nil { + return err + } } default: if cerrors.IsFatalError(err) { // we use %+v to get the stack trace too - err = s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)) - if err != nil { + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil { return err } } else { // try to recover the pipeline - err := s.recoverPipeline(ctx, rp) - if err != nil { - err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)) - if err != nil { + if err := s.recoverPipeline(ctx, rp); err != nil { + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil { return err } } - return nil } } From 
b0f21d44b985141951d9ac21bfbcb8b6c9451a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 12:03:27 +0200 Subject: [PATCH 30/40] log before erroring --- pkg/lifecycle/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 80d42549e..9dcc34a38 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -218,12 +218,12 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) attempt := int64(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() + s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration") + if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } - s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration") - // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. 
timer := time.NewTimer(duration) select { From 839182d9726fd45db80352b25d469fb1a218d18d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 13:12:40 +0200 Subject: [PATCH 31/40] removes restart and uses start --- pkg/lifecycle/service.go | 71 +++++++++++++++++------------------ pkg/lifecycle/service_test.go | 3 ++ 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 9dcc34a38..d042ba9e8 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -57,6 +57,15 @@ type ErrRecoveryCfg struct { HealthyAfter time.Duration } +func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff { + return &backoff.Backoff{ + Min: e.MinDelay, + Max: e.MaxDelay, + Factor: float64(e.BackoffFactor), + Jitter: true, + } +} + // Service manages pipelines. type Service struct { logger log.CtxLogger @@ -168,10 +177,25 @@ func (s *Service) Start( } s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline") - s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes") - rp, err := s.buildRunnablePipeline(ctx, pl) + var backoffCfg *backoff.Backoff + + // We check if the pipeline was previously running and get the backoff configuration from it. + oldRp, ok := s.runningPipelines.Get(pipelineID) + if !ok { + // default backoff configuration + backoffCfg = &backoff.Backoff{ + Min: s.errRecoveryCfg.MinDelay, + Max: s.errRecoveryCfg.MaxDelay, + Factor: float64(s.errRecoveryCfg.BackoffFactor), + Jitter: true, + } + } else { + backoffCfg = &oldRp.backoffCfg + } + + rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg) if err != nil { return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err) } @@ -187,32 +211,10 @@ func (s *Service) Start( return nil } -// Restart replaces the nodes of a pipeline and starts it again. 
-func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error { - s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline") - s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes") - - nodes, err := s.buildNodes(ctx, rp.pipeline) - if err != nil { - return cerrors.Errorf("could not build new nodes for pipeline %s: %w", rp.pipeline.ID, err) - } - - // Replaces the old nodes with the new ones. - rp.n = nodes - - s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes") - if err := s.runPipeline(ctx, rp); err != nil { - return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err) - } - s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline restarted ") - - return nil -} - -// RestartWithBackoff restarts a pipeline with a backoff. +// StartWithBackoff starts a pipeline with a backoff. // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. 
-func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error { +func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error { s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") attempt := int64(rp.backoffCfg.Attempt()) @@ -244,7 +246,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) return nil } - return s.Restart(ctx, rp) + return s.Start(ctx, rp.pipeline.ID) } // Stop will attempt to gracefully stop a given pipeline by calling each node's @@ -430,15 +432,8 @@ func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stre func (s *Service) buildRunnablePipeline( ctx context.Context, pl *pipeline.Instance, + backoffCfg *backoff.Backoff, ) (*runnablePipeline, error) { - // Each pipeline will utilize this backoff configuration to recover from errors. - backoffCfg := &backoff.Backoff{ - Min: s.errRecoveryCfg.MinDelay, - Max: s.errRecoveryCfg.MaxDelay, - Factor: float64(s.errRecoveryCfg.BackoffFactor), - Jitter: true, - } - nodes, err := s.buildNodes(ctx, pl) if err != nil { return nil, err @@ -756,6 +751,10 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { } // TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed. + // now: + // running -> (error) -> recovering (restart) -> running + // future (with the HealthyAfter mechanism): + // running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err @@ -825,7 +824,7 @@ func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) err } // Exit the goroutine and attempt to restart the pipeline - return s.RestartWithBackoff(ctx, rp) + return s.StartWithBackoff(ctx, rp) } // notify notifies all registered FailureHandlers about an error. 
diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index a9953bc82..9e31f699a 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -88,6 +88,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { got, err := ls.buildRunnablePipeline( ctx, pl, + ls.errRecoveryCfg.toBackoff(), ) is.NoErr(err) @@ -173,6 +174,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { got, err := ls.buildRunnablePipeline( ctx, pl, + ls.errRecoveryCfg.toBackoff(), ) is.True(err != nil) @@ -220,6 +222,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { got, err := ls.buildRunnablePipeline( ctx, pl, + ls.errRecoveryCfg.toBackoff(), ) is.True(err != nil) From 494f37e2bf6d5197dff3ee868dc703b7ff669eb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 13:23:16 +0200 Subject: [PATCH 32/40] clearer validation --- pkg/conduit/config.go | 2 +- pkg/conduit/config_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 3e2fab78d..e96b0738f 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -213,7 +213,7 @@ func (c Config) validateErrorRecovery() error { errs = append(errs, err) } if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery { - errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, lifecycle.InfiniteRetriesErrRecovery)) + errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. 
It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery)) } if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { errs = append(errs, err) diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index da836b475..046551e46 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -202,7 +202,7 @@ func TestConfig_Validate(t *testing.T) { c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1 return c }, - want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`), + want: cerrors.New(`invalid error recovery config: invalid "max-retries" value. It must be -1 for infinite retries or >= 0`), }, { name: "error recovery: with 0 max-retries ", From bded42eecd1b99de022a6b77e922cf95ef883f25 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 3 Oct 2024 14:32:53 +0200 Subject: [PATCH 33/40] handle recovery --- pkg/lifecycle/service.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index d042ba9e8..d01bddd91 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -773,16 +773,17 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { switch err { case tomb.ErrStillAlive: // not an actual error, the pipeline stopped gracefully + err = nil + var status pipeline.Status if isGracefulShutdown.Load() { // it was triggered by a graceful shutdown of Conduit - if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, ""); err != nil { - return err - } + status = pipeline.StatusSystemStopped } else { // it was manually triggered by a user - if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, ""); err != nil { - return err - } + status = pipeline.StatusUserStopped + } + if err := s.pipelines.UpdateStatus(ctx, 
rp.pipeline.ID, status, ""); err != nil { + return err } default: if cerrors.IsFatalError(err) { @@ -792,11 +793,21 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { } } else { // try to recover the pipeline - if err := s.recoverPipeline(ctx, rp); err != nil { - if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil { - return err + if recoveryErr := s.recoverPipeline(ctx, rp); recoveryErr != nil { + s.logger. + Err(ctx, err). + Str(log.PipelineIDField, rp.pipeline.ID). + Msg("pipeline recovery failed stopped") + + if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil { + return updateErr } } + // recovery was triggered, so no cleanup + // (remove running pipeline, notify failure handlers, etc.) + // is needed + // this is why we return nil to skip the cleanup below. + return nil } } From 5443f18034a204400378f5b6c60b4f1a7c1fecf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Thu, 3 Oct 2024 14:57:54 +0200 Subject: [PATCH 34/40] Pipeline recovery: test cases (#1873) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: Raúl Barroso --- docs/test-cases/pipeline-recovery.md | 227 ++++++++++++++++++++++++++ docs/test-cases/test-case-template.md | 27 +++ 2 files changed, 254 insertions(+) create mode 100644 docs/test-cases/pipeline-recovery.md create mode 100644 docs/test-cases/test-case-template.md diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md new file mode 100644 index 000000000..1e9582ead --- /dev/null +++ b/docs/test-cases/pipeline-recovery.md @@ -0,0 +1,227 @@ +# Test case for the pipeline recovery feature + +## Test Case 01: Recovery not triggered for fatal error - DLQ + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered when 
there is an error writing to a DLQ. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 02: Recovery not triggered for fatal error - processor + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered when there is an error processing a record. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 03: Recovery not triggered - graceful shutdown + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered when Conduit is shutting down gracefully (i.e. when +typing Ctrl+C in the terminal where Conduit is running, or sending a SIGINT). + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 04: Recovery not triggered - user stopped pipeline + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered if a user stops a pipeline (via the HTTP API's +`/v1/pipelines/pipeline-id/stop` endpoint). + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 05: Recovery is configured by default + +**Priority** (low/medium/high): + +**Description**: +Pipeline recovery is configured by default. A failing pipeline will be restarted +a number of times without any additional configuration. 
+ +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: chaos-to-log + status: running + name: chaos-to-log + description: chaos source, error on read + connectors: + - id: chaos-source-1 + type: source + plugin: standalone:chaos + name: chaos-source-1 + settings: + readMode: error + - id: destination1 + type: destination + plugin: builtin:log + name: log-destination +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 06: Recovery not triggered on malformed pipeline + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered for a malformed pipeline, e.g. when a connector is +missing. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: nothing-to-log + status: running + name: nothing-to-log + description: no source + connectors: + - id: destination1 + type: destination + plugin: builtin:log + name: log-destination +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 07: Conduit exits with --pipelines.exit-on-degraded=true and a pipeline failing after recovery + +**Priority** (low/medium/high): + +**Description**: Given a Conduit instance with +`--pipelines.exit-on-degraded=true`, and a pipeline that's failing after the +maximum number of retries configured, Conduit should shut down gracefully. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- + +## Test Case 08: Conduit doesn't exit with --pipelines.exit-on-degraded=true and a pipeline that recovers after a few retries + +**Priority** (low/medium/high): + +**Description**: +Given a Conduit instance with `--pipelines.exit-on-degraded=true`, and a +pipeline that recovers after a few retries, Conduit should still be running. 
+ +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- diff --git a/docs/test-cases/test-case-template.md b/docs/test-cases/test-case-template.md new file mode 100644 index 000000000..75c6a086d --- /dev/null +++ b/docs/test-cases/test-case-template.md @@ -0,0 +1,27 @@ + + +## Test Case 01: Test case title + +**Priority** (low/medium/high): + +**Description**: + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +1. Step 1 +2. Step 2 + +**Expected Result**: + +**Additional comments**: + +--- From 9eb37de683c6cbc034c93e0098dc859498c0c783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Thu, 3 Oct 2024 15:03:25 +0200 Subject: [PATCH 35/40] Pipeline recovery tests: add test pipeline, add test case (#1876) --- docs/test-cases/pipeline-recovery.md | 52 ++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 1e9582ead..432cbbb2d 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -1,3 +1,4 @@ + # Test case for the pipeline recovery feature ## Test Case 01: Recovery not triggered for fatal error - DLQ @@ -14,6 +15,30 @@ Recovery is not triggered when there is an error writing to a DLQ. 
**Pipeline configuration file**: ```yaml +version: "2.2" +pipelines: + - id: file-pipeline + status: running + name: file-pipeline + description: test pipline + connectors: + - id: chaos-src + type: source + plugin: standalone:chaos + name: chaos-src + settings: + readMode: error + - id: log-dst + type: destination + plugin: builtin:log + log: file-dst + dead-letter-queue: + plugin: "builtin:postgres" + settings: + table: non_existing_table_so_that_dlq_fails + url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable + window-size: 3 + window-nack-threshold: 2 ``` **Steps**: @@ -225,3 +250,30 @@ pipeline that recovers after a few retries, Conduit should still be running. **Additional comments**: --- + +## Test Case 09: Conduit exits with --pipelines.exit-on-degraded=true, --pipelines.error-recovery.max-retries=0, and a degraded pipeline + +**Priority** (low/medium/high): + +**Description**: +Given a Conduit instance with +`--pipelines.exit-on-degraded=true --pipelines.error-recovery.max-retries=0`, +and a pipeline that goes into a degraded state, the Conduit instance will +gracefully shut down. This is due to `max-retries=0` disabling the recovery. 
+ +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- From b1a07801502f61b6a7063d27f5c7fae2ab72183c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 15:29:54 +0200 Subject: [PATCH 36/40] need to return err --- docs/test-cases/pipeline-recovery.md | 2 +- pkg/lifecycle/service.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 432cbbb2d..cab8f48cb 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -20,7 +20,7 @@ pipelines: - id: file-pipeline status: running name: file-pipeline - description: test pipline + description: test pipeline connectors: - id: chaos-src type: source diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index d01bddd91..067bd285f 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -802,6 +802,8 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil { return updateErr } + + return recoveryErr } // recovery was triggered, so no cleanup // (remove running pipeline, notify failure handlers, etc.) 
From d5dc4ebbcc7b2f636ee199bd424bb8c5f67a23a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 3 Oct 2024 15:36:19 +0200 Subject: [PATCH 37/40] assign error --- pkg/lifecycle/service.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 067bd285f..afe7cac44 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -803,13 +803,13 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { return updateErr } - return recoveryErr + // we assign it to err so it's returned and notified by the cleanup function + err = recoveryErr + } else { + // recovery was triggered and didn't error, so no cleanup is needed + // this is why we return nil to skip the cleanup below. + return nil } - // recovery was triggered, so no cleanup - // (remove running pipeline, notify failure handlers, etc.) - // is needed - // this is why we return nil to skip the cleanup below. - return nil } } From 234c33a443a7462967f44e375ade838295dcaa0a Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 3 Oct 2024 18:22:30 +0200 Subject: [PATCH 38/40] updated test cases --- docs/test-cases/pipeline-recovery.md | 57 ++++++++++++++++++++++++++-- pkg/lifecycle/service.go | 2 +- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 432cbbb2d..6b4dee553 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -1,12 +1,14 @@ # Test case for the pipeline recovery feature -## Test Case 01: Recovery not triggered for fatal error - DLQ +## Test Case 01: Recovery triggered on a DLQ write error **Priority** (low/medium/high): **Description**: -Recovery is not triggered when there is an error writing to a DLQ. +Recovery is triggered when there is an error writing to a DLQ.
As for a normal +destination, a DLQ write error can be a temporary error that can be solved after +a retry. **Automated** (yes/no) @@ -20,7 +22,7 @@ pipelines: - id: file-pipeline status: running name: file-pipeline - description: test pipline + description: dlq write error connectors: - id: chaos-src type: source @@ -277,3 +279,52 @@ gracefully shut down. This is due `max-retries=0` disabling the recovery. **Additional comments**: --- + +## Test Case 10: Recovery not triggered for fatal error - DLQ threshold exceeded + +**Priority** (low/medium/high): + +**Description**: +Recovery is not triggered when the DLQ threshold is exceeded. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: pipeline1 + status: running + name: pipeline1 + description: chaos destination with write errors, DLQ threshold specified + connectors: + - id: generator-src + type: source + plugin: builtin:generator + name: generator-src + settings: + format.type: structured + format.options.id: int + format.options.name: string + rate: "1" + - id: chaos-destination-1 + type: destination + plugin: standalone:chaos + name: chaos-destination-1 + settings: + writeMode: error + dead-letter-queue: + window-size: 2 + window-nack-threshold: 1 +``` + +**Steps**: + +**Expected Result**: + +**Additional comments**: + +--- diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index d01bddd91..708ddb652 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -215,7 +215,7 @@ func (s *Service) Start( // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. 
func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error { - s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") + s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") attempt := int64(rp.backoffCfg.Attempt()) duration := rp.backoffCfg.Duration() From 33851ed9b69c9b0ff03aee0835528f8857fd85fb Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 3 Oct 2024 18:50:46 +0200 Subject: [PATCH 39/40] reuse method --- pkg/lifecycle/service.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index c26a3a88a..dcc7ba4e8 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -185,12 +185,7 @@ func (s *Service) Start( oldRp, ok := s.runningPipelines.Get(pipelineID) if !ok { // default backoff configuration - backoffCfg = &backoff.Backoff{ - Min: s.errRecoveryCfg.MinDelay, - Max: s.errRecoveryCfg.MaxDelay, - Factor: float64(s.errRecoveryCfg.BackoffFactor), - Jitter: true, - } + backoffCfg = s.errRecoveryCfg.toBackoff() } else { backoffCfg = &oldRp.backoffCfg } From 63be97b008415b74ec3075881feb8a627c4d5337 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 4 Oct 2024 09:11:56 +0200 Subject: [PATCH 40/40] use log.AttemptField --- pkg/lifecycle/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index dcc7ba4e8..9e0ef8948 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -234,7 +234,7 @@ func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) er if rp.pipeline.GetStatus() == pipeline.StatusRunning { s.logger.Debug(ctx). Str(log.PipelineIDField, rp.pipeline.ID). - Int64("attempt", attempt). + Int64(log.AttemptField, attempt). Int("backoffRetry.count", s.errRecoveryCfg.BackoffFactor). Int64("backoffRetry.duration", duration.Milliseconds()). 
Msg("pipeline recovered")