diff --git a/go.mod b/go.mod
index 657849e77..880e5f2bd 100644
--- a/go.mod
+++ b/go.mod
@@ -49,6 +49,7 @@ require (
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.26.0
 	golang.org/x/crypto v0.27.0
+	golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
 	golang.org/x/net v0.29.0
 	golang.org/x/oauth2 v0.21.0
 	golang.org/x/sync v0.8.0
@@ -197,7 +198,6 @@ require (
 	github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
 	go.mongodb.org/mongo-driver v1.15.0 // indirect
 	golang.org/x/arch v0.7.0 // indirect
-	golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
 	golang.org/x/mod v0.20.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
 	golang.org/x/term v0.24.0 // indirect
diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go
index c2f908052..3ae7c49c0 100644
--- a/pkg/reconciler/pipeline/controller.go
+++ b/pkg/reconciler/pipeline/controller.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/imdario/mergo"
 	"go.uber.org/zap"
+	"golang.org/x/exp/maps"
 	appv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -181,8 +182,8 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
 	}
 
 	// check if any changes related to pause/resume lifecycle for the pipeline
-	if isLifecycleChange(pl) {
-		oldPhase := pl.Status.Phase
+	oldPhase := pl.Status.Phase
+	if isLifecycleChange(pl) && oldPhase != pl.Spec.Lifecycle.GetDesiredPhase() {
 		requeue, err := r.updateDesiredState(ctx, pl)
 		if err != nil {
 			logMsg := fmt.Sprintf("Updated desired pipeline phase failed: %v", zap.Error(err))
@@ -611,7 +612,7 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
 		copyVertexTemplate(pl, vCopy)
 		copyVertexLimits(pl, vCopy)
 		replicas := int32(1)
-		// If the desired phase is paused or we are in the middle of pausing we should not start any vertex replicas
+		// If the desired phase is paused, or we are in the middle of pausing we should not start any vertex replicas
 		if isLifecycleChange(pl) {
 			replicas = int32(0)
 		} else if v.IsReduceUDF() {
@@ -830,39 +831,48 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli
 }
 
 func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
-	// check that annotations / pause timestamp annotation exist
+	var (
+		drainCompleted = false
+		daemonClient   daemonclient.DaemonClient
+		errWhileDrain  error
+	)
+	pl.Status.MarkPhasePausing()
+
 	if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" {
+		_, err := r.scaleDownSourceVertices(ctx, pl)
+		if err != nil {
+			// If there's an error requeue the request
+			return true, err
+		}
 		patchJson := `{"metadata":{"annotations":{"` + dfv1.KeyPauseTimestamp + `":"` + time.Now().Format(time.RFC3339) + `"}}}`
-		if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
+		if err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) {
 			return true, err
 		}
+		// This is to give some time to process the new messages,
+		// otherwise check IsDrained directly may get incorrect information
+		return true, nil
 	}
 
-	pl.Status.MarkPhasePausing()
-	updated, err := r.scaleDownSourceVertices(ctx, pl)
-	if err != nil || updated {
-		// If there's an error, or scaling down happens, requeue the request
-		// This is to give some time to process the new messages, otherwise check IsDrained directly may get incorrect information
-		return updated, err
-	}
-
-	var daemonError error
-	var drainCompleted = false
-
+	// Check if all the source vertex pods have scaled down to zero
+	sourcePodsTerminated, err := r.noSourceVertexPodsRunning(ctx, pl)
+	// If the sources have scaled down successfully then check for the buffer information.
 	// Check for the daemon to obtain the buffer draining information, in case we see an error trying to
 	// retrieve this we do not exit prematurely to allow honoring the pause timeout for a consistent error
 	// - In case the timeout has not occurred we would trigger a requeue
 	// - If the timeout has occurred even after getting the drained error, we will try to pause the pipeline
-	daemonClient, daemonError := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
-	if daemonClient != nil {
-		defer func() {
-			_ = daemonClient.Close()
-		}()
-		drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
-		if err != nil {
-			daemonError = err
+	if sourcePodsTerminated {
+		daemonClient, err = daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
+		if daemonClient != nil {
+			defer func() {
+				_ = daemonClient.Close()
+			}()
+			drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
 		}
 	}
+	if err != nil {
+		errWhileDrain = err
+	}
+
 	pauseTimestamp, err := time.Parse(time.RFC3339, pl.GetAnnotations()[dfv1.KeyPauseTimestamp])
 	if err != nil {
 		return false, err
@@ -874,8 +884,8 @@ func (r *pipelineReconciler) pausePipelin
 		if err != nil {
 			return true, err
 		}
-		if daemonError != nil {
-			r.logger.Errorw("Error in fetching Drained status, Pausing due to timeout", zap.Error(daemonError))
+		if errWhileDrain != nil {
+			r.logger.Errorw("Errors encountered while pausing, moving to paused after timeout", zap.Error(errWhileDrain))
 		}
 		// if the drain completed successfully, then set the DrainedOnPause field to true
 		if drainCompleted {
@@ -884,7 +894,20 @@ func (r *pipelineReconciler) pausePipelin
 		pl.Status.MarkPhasePaused()
 		return false, nil
 	}
-	return true, daemonError
+	return true, err
+}
+
+// noSourceVertexPodsRunning checks whether any source vertex has running replicas
+func (r *pipelineReconciler) noSourceVertexPodsRunning(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
+	sources := pl.Spec.GetSourcesByName()
+	pods := corev1.PodList{}
+	label := fmt.Sprintf("%s=%s, %s in (%s)", dfv1.KeyPipelineName, pl.Name,
+		dfv1.KeyVertexName, strings.Join(maps.Keys(sources), ","))
+	selector, _ := labels.Parse(label)
+	if err := r.client.List(ctx, &pods, &client.ListOptions{Namespace: pl.Namespace, LabelSelector: selector}); err != nil {
+		return false, err
+	}
+	return len(pods.Items) == 0, nil
 }
 
 func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
@@ -965,6 +988,8 @@ func (r *pipelineReconciler) checkChildrenResourceStatus(ctx context.Context, pi
 				return
 			}
 		}
+		// if all conditions are True, clear the status message.
+		pipeline.Status.Message = ""
 	}()
 
 	// get the daemon deployment and update the status of it to the pipeline
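For reference, below is a minimal, self-contained Go sketch of the label selector that the new noSourceVertexPodsRunning helper builds before listing source-vertex pods. The literal label keys, pipeline name, and vertex names here are assumed stand-ins for dfv1.KeyPipelineName, dfv1.KeyVertexName, pl.Name, and the keys of pl.Spec.GetSourcesByName(); only the selector construction itself mirrors the patch.

package main

import (
	"fmt"
	"strings"

	"golang.org/x/exp/maps"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Stand-in for pl.Spec.GetSourcesByName(): a map keyed by source vertex name.
	sources := map[string]struct{}{"in": {}, "http-in": {}}

	// Assumed literal values for the dfv1.KeyPipelineName / dfv1.KeyVertexName constants.
	const keyPipelineName = "numaflow.numaproj.io/pipeline-name"
	const keyVertexName = "numaflow.numaproj.io/vertex-name"

	// Same construction as noSourceVertexPodsRunning: an equality requirement on the
	// pipeline name plus a set-based "in" requirement over all source vertex names,
	// so a single List call covers every source vertex at once.
	label := fmt.Sprintf("%s=%s, %s in (%s)", keyPipelineName, "simple-pipeline",
		keyVertexName, strings.Join(maps.Keys(sources), ","))

	selector, err := labels.Parse(label)
	if err != nil {
		panic(err)
	}
	// Prints a normalized selector, e.g.
	// numaflow.numaproj.io/pipeline-name=simple-pipeline,numaflow.numaproj.io/vertex-name in (http-in,in)
	fmt.Println(selector.String())
}

Listing pods with this selector and checking len(pods.Items) == 0 is what gates the IsDrained call in the patch: the daemon is only consulted once no source-vertex pods remain.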