Merge branch 'raul/restart-with-recovery' into haris/pipeline-recovery-tests
hariso committed Oct 3, 2024
2 parents 188ae1d + 5443f18 commit e14d9a3
Showing 4 changed files with 60 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/conduit/config.go
@@ -213,7 +213,7 @@ func (c Config) validateErrorRecovery() error {
errs = append(errs, err)
}
if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
errs = append(errs, cerrors.Errorf(`"max-retries" can't be smaller than %d (infinite retries)`, lifecycle.InfiniteRetriesErrRecovery))
errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
}
if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
errs = append(errs, err)
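For context, a minimal, self-contained sketch (not part of this commit) of the "max-retries" semantics the new error message describes, assuming InfiniteRetriesErrRecovery is -1 as in pkg/lifecycle: -1 means infinite retries, 0 means no retries, and any positive value caps the number of restarts.

package main

import "fmt"

// infiniteRetries stands in for lifecycle.InfiniteRetriesErrRecovery.
const infiniteRetries = -1

// validateMaxRetries mirrors the check in validateErrorRecovery above.
func validateMaxRetries(maxRetries int64) error {
	if maxRetries < infiniteRetries {
		return fmt.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, infiniteRetries)
	}
	return nil
}

func main() {
	fmt.Println(validateMaxRetries(-1)) // <nil>: retry forever
	fmt.Println(validateMaxRetries(0))  // <nil>: fail on the first error, no retries
	fmt.Println(validateMaxRetries(10)) // <nil>: up to 10 restarts
	fmt.Println(validateMaxRetries(-2)) // error: below the infinite-retries sentinel
}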
2 changes: 1 addition & 1 deletion pkg/conduit/config_test.go
@@ -202,7 +202,7 @@ func TestConfig_Validate(t *testing.T) {
c.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - 1
return c
},
want: cerrors.New(`invalid error recovery config: "max-retries" can't be smaller than -1 (infinite retries)`),
want: cerrors.New(`invalid error recovery config: invalid "max-retries" value. It must be -1 for infinite retries or >= 0`),
},
{
name: "error recovery: with 0 max-retries ",
100 changes: 55 additions & 45 deletions pkg/lifecycle/service.go
@@ -57,6 +57,15 @@ type ErrRecoveryCfg struct {
HealthyAfter time.Duration
}

func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff {
return &backoff.Backoff{
Min: e.MinDelay,
Max: e.MaxDelay,
Factor: float64(e.BackoffFactor),
Jitter: true,
}
}

// Service manages pipelines.
type Service struct {
logger log.CtxLogger
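The new toBackoff helper builds the per-pipeline backoff that Start and StartWithBackoff reuse across restarts. As a rough usage sketch (not part of this commit), and assuming the Min/Max/Factor/Jitter fields and Attempt() calls above come from the jpillora/backoff package, successive Duration() calls grow the delay and advance the attempt counter:

package main

import (
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

func main() {
	b := &backoff.Backoff{
		Min:    time.Second,      // e.g. ErrRecoveryCfg.MinDelay
		Max:    10 * time.Minute, // e.g. ErrRecoveryCfg.MaxDelay
		Factor: 2,                // e.g. ErrRecoveryCfg.BackoffFactor
		Jitter: false,            // the service enables jitter; disabled here for predictable output
	}

	for i := 0; i < 4; i++ {
		// Duration returns the next delay and increments the attempt counter.
		fmt.Printf("attempt %v -> wait %v\n", b.Attempt(), b.Duration())
	}

	b.Reset() // a run that stays healthy would typically reset the counter
	fmt.Println("after reset, attempt:", b.Attempt())
}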
@@ -168,10 +177,25 @@ func (s *Service) Start(
}

s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline")

s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes")

rp, err := s.buildRunnablePipeline(ctx, pl)
var backoffCfg *backoff.Backoff

// We check if the pipeline was previously running and get the backoff configuration from it.
oldRp, ok := s.runningPipelines.Get(pipelineID)
if !ok {
// default backoff configuration
backoffCfg = &backoff.Backoff{
Min: s.errRecoveryCfg.MinDelay,
Max: s.errRecoveryCfg.MaxDelay,
Factor: float64(s.errRecoveryCfg.BackoffFactor),
Jitter: true,
}
} else {
backoffCfg = &oldRp.backoffCfg
}

rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg)
if err != nil {
return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err)
}
@@ -187,32 +211,10 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline)
return nil
}

// Restart replaces the nodes of a pipeline and starts it again.
func (s *Service) Restart(ctx context.Context, rp *runnablePipeline) error {
s.logger.Debug(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting pipeline")
s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("swapping nodes")

nodes, err := s.buildNodes(ctx, rp.pipeline)
if err != nil {
return cerrors.Errorf("could not build new nodes for pipeline %s: %w", rp.pipeline.ID, err)
}

// Replaces the old nodes with the new ones.
rp.n = nodes

s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("running nodes")
if err := s.runPipeline(ctx, rp); err != nil {
return cerrors.Errorf("failed to run pipeline %s: %w", rp.pipeline.ID, err)
}
s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline restarted ")

return nil
}

// RestartWithBackoff restarts a pipeline with a backoff.
// StartWithBackoff starts a pipeline with a backoff.
// It'll check the number of times the pipeline has been restarted and the duration of the backoff.
// When the pipeline has reached the maximum number of retries, it'll return a fatal error.
func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline) error {
func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error {
s.logger.Trace(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff")

attempt := int64(rp.backoffCfg.Attempt())
@@ -244,7 +246,7 @@ func (s *Service) RestartWithBackoff(ctx context.Context, rp *runnablePipeline)
return nil
}

return s.Restart(ctx, rp)
return s.Start(ctx, rp.pipeline.ID)
}
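A hedged, self-contained sketch (not the collapsed body of StartWithBackoff) of the behavior its comment describes: once the attempt count from the backoff exceeds the configured maximum number of retries, recovery gives up with an error treated as fatal; otherwise the goroutine waits for the next backoff duration before calling Start again. The names infiniteRetries and waitBeforeRestart are illustrative stand-ins, not Conduit APIs.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

const infiniteRetries = -1 // stand-in for lifecycle.InfiniteRetriesErrRecovery

// waitBeforeRestart returns an error when the retry budget is exhausted,
// otherwise it sleeps for the next backoff duration and lets the caller restart.
func waitBeforeRestart(ctx context.Context, b *backoff.Backoff, maxRetries int64) error {
	attempt := int64(b.Attempt())
	if maxRetries != infiniteRetries && attempt >= maxRetries {
		return fmt.Errorf("pipeline still failing after %d restart attempts, giving up", attempt)
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(b.Duration()): // Duration also advances the attempt counter
		return nil // proceed with the restart
	}
}

func main() {
	ctx := context.Background()
	b := &backoff.Backoff{Min: 10 * time.Millisecond, Max: time.Second, Factor: 2}

	for {
		if err := waitBeforeRestart(ctx, b, 3); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("restarting pipeline, attempt", int64(b.Attempt()))
	}
}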

// Stop will attempt to gracefully stop a given pipeline by calling each node's
@@ -430,15 +432,8 @@ func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stre
func (s *Service) buildRunnablePipeline(
ctx context.Context,
pl *pipeline.Instance,
backoffCfg *backoff.Backoff,
) (*runnablePipeline, error) {
// Each pipeline will utilize this backoff configuration to recover from errors.
backoffCfg := &backoff.Backoff{
Min: s.errRecoveryCfg.MinDelay,
Max: s.errRecoveryCfg.MaxDelay,
Factor: float64(s.errRecoveryCfg.BackoffFactor),
Jitter: true,
}

nodes, err := s.buildNodes(ctx, pl)
if err != nil {
return nil, err
@@ -756,6 +751,10 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
}

// TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed.
// now:
// running -> (error) -> recovering (restart) -> running
// future (with the HealthyAfter mechanism):
// running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running
err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "")
if err != nil {
return err
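A hedged, self-contained sketch (not part of this commit) of the HealthyAfter mechanism the TODO above outlines: after a restart the pipeline stays in "recovering" and is only promoted back to "running" once it has run without errors for HealthyAfter. All names below are illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	healthyAfter := 200 * time.Millisecond
	errs := make(chan error, 1) // errors from the restarted pipeline would arrive here

	status := "recovering"
	fmt.Println("status:", status)

	select {
	case err := <-errs:
		// An error within the healthy window keeps the pipeline in recovery.
		fmt.Println("still recovering, got:", err)
	case <-time.After(healthyAfter):
		// No error for healthyAfter: the recovery is considered successful.
		status = "running"
	}
	fmt.Println("status:", status)
}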
@@ -774,16 +773,17 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
switch err {
case tomb.ErrStillAlive:
// not an actual error, the pipeline stopped gracefully
err = nil
var status pipeline.Status
if isGracefulShutdown.Load() {
// it was triggered by a graceful shutdown of Conduit
if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusSystemStopped, ""); err != nil {
return err
}
status = pipeline.StatusSystemStopped
} else {
// it was manually triggered by a user
if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusUserStopped, ""); err != nil {
return err
}
status = pipeline.StatusUserStopped
}
if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, status, ""); err != nil {
return err
}
default:
if cerrors.IsFatalError(err) {
@@ -793,11 +793,21 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error {
}
} else {
// try to recover the pipeline
if err := s.recoverPipeline(ctx, rp); err != nil {
if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil {
return err
if recoveryErr := s.recoverPipeline(ctx, rp); recoveryErr != nil {
s.logger.
Err(ctx, err).
Str(log.PipelineIDField, rp.pipeline.ID).
Msg("pipeline recovery failed stopped")

if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil {
return updateErr
}
}
// Recovery was triggered, so no cleanup (removing the running pipeline,
// notifying failure handlers, etc.) is needed; we return nil to skip the
// cleanup below.
return nil
}
}

@@ -825,7 +835,7 @@ func (s *Service) recoverPipeline(ctx context.Context, rp *runnablePipeline) err
}

// Exit the goroutine and attempt to restart the pipeline
return s.RestartWithBackoff(ctx, rp)
return s.StartWithBackoff(ctx, rp)
}

// notify notifies all registered FailureHandlers about an error.
3 changes: 3 additions & 0 deletions pkg/lifecycle/service_test.go
@@ -88,6 +88,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) {
got, err := ls.buildRunnablePipeline(
ctx,
pl,
ls.errRecoveryCfg.toBackoff(),
)

is.NoErr(err)
@@ -173,6 +174,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) {
got, err := ls.buildRunnablePipeline(
ctx,
pl,
ls.errRecoveryCfg.toBackoff(),
)

is.True(err != nil)
@@ -220,6 +222,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) {
got, err := ls.buildRunnablePipeline(
ctx,
pl,
ls.errRecoveryCfg.toBackoff(),
)

is.True(err != nil)
