From 45e57404c6b5aa043f4a613a84f2d0fe8f212d52 Mon Sep 17 00:00:00 2001 From: Wantong Jiang Date: Tue, 17 Dec 2024 17:55:45 +0000 Subject: [PATCH 1/2] implement stagedUpdateRun execution --- pkg/controllers/updaterun/controller.go | 80 ++- .../updaterun/controller_integration_test.go | 39 +- pkg/controllers/updaterun/execution.go | 482 +++++++++++++++++ .../updaterun/execution_integration_test.go | 499 ++++++++++++++++++ pkg/controllers/updaterun/execution_test.go | 334 ++++++++++++ .../initialization_integration_test.go | 51 +- pkg/controllers/updaterun/validation.go | 29 +- .../updaterun/validation_integration_test.go | 63 ++- pkg/controllers/updaterun/validation_test.go | 8 +- 9 files changed, 1491 insertions(+), 94 deletions(-) create mode 100644 pkg/controllers/updaterun/execution.go create mode 100644 pkg/controllers/updaterun/execution_integration_test.go create mode 100644 pkg/controllers/updaterun/execution_test.go diff --git a/pkg/controllers/updaterun/controller.go b/pkg/controllers/updaterun/controller.go index 253815aaf..5f6c23d72 100644 --- a/pkg/controllers/updaterun/controller.go +++ b/pkg/controllers/updaterun/controller.go @@ -13,6 +13,7 @@ import ( "time" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -40,10 +41,6 @@ var ( // errInitializedFailed is the error when the ClusterStagedUpdateRun fails to initialize. // It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step. errInitializedFailed = fmt.Errorf("%w: failed to initialize the clusterStagedUpdateRun", errStagedUpdatedAborted) - - // stageUpdatingWaitTime is the time to wait before rechecking the stage update status. - // Put it as a variable for convenient testing. - stageUpdatingWaitTime = 60 * time.Second ) // Reconciler reconciles a ClusterStagedUpdateRun object. 
@@ -127,10 +124,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runObjRef) } - // TODO(wantjian): execute the clusterStagedUpdateRun and fix the requeue time. - klog.V(2).InfoS("Executing the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef, "updatingStageIndex", updatingStageIndex, - "toBeUpdatedBindings count", len(toBeUpdatedBindings), "toBeDeletedBindings count", len(toBeDeletedBindings)) - return runtime.Result{RequeueAfter: stageUpdatingWaitTime}, nil + // The previous run is completed but the update to the status failed. + if updatingStageIndex == -1 { + klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef) + return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun) + } + + // Execute the updateRun. + klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef) + finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings) + if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) { + // errStagedUpdatedAborted cannot be retried. + return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error()) + } + + if finished { + klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef) + return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun) + } + + // The execution is not finished yet or it encounters a retriable error. + // We need to record the status and requeue. 
+ if updateErr := r.recordUpdateRunStatus(ctx, &updateRun); updateErr != nil { + return runtime.Result{}, updateErr + } + klog.V(2).InfoS("The clusterStagedUpdateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "clusterStagedUpdateRun", runObjRef) + if execErr != nil { + return runtime.Result{}, execErr + } + return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil } // handleDelete handles the deletion of the clusterStagedUpdateRun object. @@ -162,6 +184,48 @@ func (r *Reconciler) ensureFinalizer(ctx context.Context, updateRun *placementv1 return r.Update(ctx, updateRun, client.FieldOwner(utils.UpdateRunControllerFieldManagerName)) } +// recordUpdateRunSucceeded records the succeeded condition in the ClusterStagedUpdateRun status. +func (r *Reconciler) recordUpdateRunSucceeded(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error { + meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: updateRun.Generation, + Reason: condition.UpdateRunSucceededReason, + }) + if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { + klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as succeeded", "clusterStagedUpdateRun", klog.KObj(updateRun)) + // updateErr can be retried. + return controller.NewUpdateIgnoreConflictError(updateErr) + } + return nil +} + +// recordUpdateRunFailed records the failed condition in the ClusterStagedUpdateRun status. 
+func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, message string) error { + meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: updateRun.Generation, + Reason: condition.UpdateRunFailedReason, + Message: message, + }) + if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { + klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as failed", "clusterStagedUpdateRun", klog.KObj(updateRun)) + // updateErr can be retried. + return controller.NewUpdateIgnoreConflictError(updateErr) + } + return nil +} + +// recordUpdateRunStatus records the ClusterStagedUpdateRun status. +func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error { + if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { + klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status", "clusterStagedUpdateRun", klog.KObj(updateRun)) + return controller.NewUpdateIgnoreConflictError(updateErr) + } + return nil +} + // SetupWithManager sets up the controller with the Manager. 
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error { r.recorder = mgr.GetEventRecorderFor("clusterresource-stagedupdaterun-controller") diff --git a/pkg/controllers/updaterun/controller_integration_test.go b/pkg/controllers/updaterun/controller_integration_test.go index d546a3905..1f655d539 100644 --- a/pkg/controllers/updaterun/controller_integration_test.go +++ b/pkg/controllers/updaterun/controller_integration_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" @@ -327,7 +328,7 @@ func generateTestClusterStagedUpdateStrategy() *placementv1alpha1.ClusterStagedU { Type: placementv1alpha1.AfterStageTaskTypeTimedWait, WaitTime: metav1.Duration{ - Duration: time.Minute * 10, + Duration: time.Second * 4, }, }, }, @@ -469,7 +470,7 @@ func validateApprovalRequestCount(ctx context.Context, count int) { }, timeout, interval).Should(Equal(count), "approval requests count mismatch") } -func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition { +func generateTrueCondition(obj client.Object, condType any) metav1.Condition { reason, typeStr := "", "" switch cond := condType.(type) { case placementv1alpha1.StagedUpdateRunConditionType: @@ -498,16 +499,38 @@ func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, reason = condition.ClusterUpdatingSucceededReason } typeStr = string(cond) + case placementv1alpha1.AfterStageTaskConditionType: + switch cond { + case placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed: + reason = condition.AfterStageTaskWaitTimeElapsedReason + case placementv1alpha1.AfterStageTaskConditionApprovalRequestCreated: + reason = condition.AfterStageTaskApprovalRequestCreatedReason + case 
placementv1alpha1.AfterStageTaskConditionApprovalRequestApproved: + reason = condition.AfterStageTaskApprovalRequestApprovedReason + } + typeStr = string(cond) + case placementv1alpha1.ApprovalRequestConditionType: + switch cond { + case placementv1alpha1.ApprovalRequestConditionApproved: + reason = "LGTM" + } + typeStr = string(cond) + case placementv1beta1.ResourceBindingConditionType: + switch cond { + case placementv1beta1.ResourceBindingAvailable: + reason = condition.AvailableReason + } + typeStr = string(cond) } return metav1.Condition{ Status: metav1.ConditionTrue, Type: typeStr, - ObservedGeneration: updateRun.Generation, + ObservedGeneration: obj.GetGeneration(), Reason: reason, } } -func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition { +func generateFalseCondition(obj client.Object, condType any) metav1.Condition { reason, typeStr := "", "" switch cond := condType.(type) { case placementv1alpha1.StagedUpdateRunConditionType: @@ -530,11 +553,17 @@ func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, reason = condition.ClusterUpdatingFailedReason } typeStr = string(cond) + case placementv1beta1.ResourceBindingConditionType: + switch cond { + case placementv1beta1.ResourceBindingApplied: + reason = condition.ApplyFailedReason + } + typeStr = string(cond) } return metav1.Condition{ Status: metav1.ConditionFalse, Type: typeStr, - ObservedGeneration: updateRun.Generation, + ObservedGeneration: obj.GetGeneration(), Reason: reason, } } diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go new file mode 100644 index 000000000..3c2d6693a --- /dev/null +++ b/pkg/controllers/updaterun/execution.go @@ -0,0 +1,482 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. 
+*/ + +package updaterun + +import ( + "context" + "errors" + "fmt" + "reflect" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1" + placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils/condition" + "go.goms.io/fleet/pkg/utils/controller" +) + +var ( + // clusterUpdatingWaitTime is the time to wait before rechecking the cluster update status. + // Put it as a variable for convenient testing. + clusterUpdatingWaitTime = 15 * time.Second + + // stageUpdatingWaitTime is the time to wait before rechecking the stage update status. + // Put it as a variable for convenient testing. + stageUpdatingWaitTime = 60 * time.Second +) + +// execute executes the update run by updating the clusters in the updating stage specified by updatingStageIndex. +func (r *Reconciler) execute( + ctx context.Context, + updateRun *placementv1alpha1.ClusterStagedUpdateRun, + updatingStageIndex int, + toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding, +) (bool, time.Duration, error) { + // Mark the update run as started regardless of whether it's already marked. + markUpdateRunStarted(updateRun) + + if updatingStageIndex < len(updateRun.Status.StagesStatus) { + updatingStage := &updateRun.Status.StagesStatus[updatingStageIndex] + waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings) + if errors.Is(execErr, errStagedUpdatedAborted) { + markStageUpdatingFailed(updatingStage, updateRun.Generation, execErr.Error()) + return true, waitTime, execErr + } + // The execution has not finished yet. + return false, waitTime, execErr + } + // All the stages have finished, now start the delete stage. 
+ finished, execErr := r.executeDeleteStage(ctx, updateRun, toBeDeletedBindings) + if errors.Is(execErr, errStagedUpdatedAborted) { + markStageUpdatingFailed(updateRun.Status.DeletionStageStatus, updateRun.Generation, execErr.Error()) + return true, 0, execErr + } + return finished, clusterUpdatingWaitTime, execErr +} + +// executeUpdatingStage executes a single updating stage by updating the clusterResourceBindings. +func (r *Reconciler) executeUpdatingStage( + ctx context.Context, + updateRun *placementv1alpha1.ClusterStagedUpdateRun, + updatingStageIndex int, + toBeUpdatedBindings []*placementv1beta1.ClusterResourceBinding, +) (time.Duration, error) { + updatingStageStatus := &updateRun.Status.StagesStatus[updatingStageIndex] + resourceSnapshotName := updateRun.Spec.ResourceSnapshotIndex + updateRunRef := klog.KObj(updateRun) + // Create the map of the toBeUpdatedBindings. + toBeUpdatedBindingsMap := make(map[string]*placementv1beta1.ClusterResourceBinding, len(toBeUpdatedBindings)) + for _, binding := range toBeUpdatedBindings { + toBeUpdatedBindingsMap[binding.Spec.TargetCluster] = binding + } + finishedClusterCount := 0 + + // Go through each cluster in the stage and check if it's updated. + for i := range updatingStageStatus.Clusters { + clusterStatus := &updatingStageStatus.Clusters[i] + clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1alpha1.ClusterUpdatingConditionStarted)) + clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1alpha1.ClusterUpdatingConditionSucceeded)) + if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.Generation) { + // The cluster is marked as failed to update. 
+ failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) + klog.ErrorS(failedErr, "The cluster has failed to be updated", "clusterStagedUpdateRun", updateRunRef) + return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) + } + if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.Generation) { + // The cluster has been updated successfully. + finishedClusterCount++ + continue + } + // The cluster is either updating or not started yet. + binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] + if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.Generation) { + // The cluster has not started updating yet. + if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) { + klog.V(2).InfoS("Found the first cluster that needs to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + // The binding is not up-to-date with the cluster status. 
+ binding.Spec.State = placementv1beta1.BindingStateBound + binding.Spec.ResourceSnapshotName = resourceSnapshotName + binding.Spec.ResourceOverrideSnapshots = clusterStatus.ResourceOverrideSnapshots + binding.Spec.ClusterResourceOverrideSnapshots = clusterStatus.ClusterResourceOverrideSnapshots + binding.Spec.ApplyStrategy = updateRun.Status.ApplyStrategy + if err := r.Client.Update(ctx, binding); err != nil { + klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef) + return 0, controller.NewUpdateIgnoreConflictError(err) + } + klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + } else { + klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + if binding.Spec.State != placementv1beta1.BindingStateBound { + binding.Spec.State = placementv1beta1.BindingStateBound + if err := r.Client.Update(ctx, binding); err != nil { + klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + return 0, controller.NewUpdateIgnoreConflictError(err) + } + klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + } else { + if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil { + return clusterUpdatingWaitTime, updateErr + } + } + } + markClusterUpdatingStarted(clusterStatus, 
updateRun.Generation) + if finishedClusterCount == 0 { + markStageUpdatingStarted(updatingStageStatus, updateRun.Generation) + } + // No need to continue as we only support one cluster updating at a time for now. + return clusterUpdatingWaitTime, nil + } + + // Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound. + if !isBindingSyncedWithClusterStatus(updateRun, binding, clusterStatus) || binding.Spec.State != placementv1beta1.BindingStateBound { + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the updating cluster `%s` in the stage %s does not match the cluster status: %+v, binding: %+v", clusterStatus.ClusterName, updatingStageStatus.StageName, clusterStatus, binding.Spec)) + klog.ErrorS(unexpectedErr, "The binding has been changed during updating, please check if there's concurrent clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef) + markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error()) + return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } + finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun) + if finished { + finishedClusterCount++ + continue + } + // No need to continue as we only support one cluster updating at a time for now. + return clusterUpdatingWaitTime, updateErr + } + + if finishedClusterCount == len(updatingStageStatus.Clusters) { + // All the clusters in the stage have been updated. + markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation) + klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef) + // Check if the after stage tasks are ready. 
+ approved, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun) + if err != nil { + return 0, err + } + if approved { + markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation) + // No need to wait to get to the next stage. + return 0, nil + } + return stageUpdatingWaitTime, nil + } + return clusterUpdatingWaitTime, nil +} + +// executeDeleteStage executes the delete stage by deleting the clusterResourceBindings. +func (r *Reconciler) executeDeleteStage( + ctx context.Context, + updateRun *placementv1alpha1.ClusterStagedUpdateRun, + toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding, +) (bool, error) { + updateRunRef := klog.KObj(updateRun) + existingDeleteStageStatus := updateRun.Status.DeletionStageStatus + existingDeleteStageClusterMap := make(map[string]*placementv1alpha1.ClusterUpdatingStatus, len(existingDeleteStageStatus.Clusters)) + for i := range existingDeleteStageStatus.Clusters { + existingDeleteStageClusterMap[existingDeleteStageStatus.Clusters[i].ClusterName] = &existingDeleteStageStatus.Clusters[i] + } + deletingBinding := 0 + for _, binding := range toBeDeletedBindings { + curCluster, exist := existingDeleteStageClusterMap[binding.Spec.TargetCluster] + if !exist { + // This is unexpected because we already checked in validation. + missingErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the to be deleted cluster `%s` is not in the deleting stage during execution", binding.Spec.TargetCluster)) + klog.ErrorS(missingErr, "The cluster in the deleting stage does not include all the to be deleted binding", "clusterStagedUpdateRun", updateRunRef) + return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, missingErr.Error()) + } + // In validation, we already check the binding must exist in the status. + delete(existingDeleteStageClusterMap, binding.Spec.TargetCluster) + // Make sure the cluster is not marked as deleted as the binding is still there. 
+ if condition.IsConditionStatusTrue(meta.FindStatusCondition(curCluster.Conditions, string(placementv1alpha1.ClusterUpdatingConditionSucceeded)), updateRun.Generation) { + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the deleted cluster `%s` in the deleting stage still has a clusterResourceBinding", binding.Spec.TargetCluster)) + klog.ErrorS(unexpectedErr, "The cluster in the deleting stage is not removed yet but marked as deleted", "cluster", curCluster.ClusterName, "clusterStagedUpdateRun", updateRunRef) + return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } + if condition.IsConditionStatusTrue(meta.FindStatusCondition(curCluster.Conditions, string(placementv1alpha1.ClusterUpdatingConditionStarted)), updateRun.Generation) { + // The cluster status is marked as being deleted. + if binding.DeletionTimestamp.IsZero() { + // The cluster is marked as deleting but the binding is not deleting. + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` in the deleting stage is marked as deleting but its corresponding binding is not deleting", curCluster.ClusterName)) + klog.ErrorS(unexpectedErr, "The binding should be deleting before we mark a cluster deleting", "clusterStatus", curCluster, "clusterStagedUpdateRun", updateRunRef) + return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } + deletingBinding++ + continue + } + // The cluster status is not deleting yet + if err := r.Client.Delete(ctx, binding); err != nil { + klog.ErrorS(err, "Failed to delete a binding in the update run", "binding", klog.KObj(binding), "cluster", curCluster.ClusterName, "clusterStagedUpdateRun", updateRunRef) + return false, controller.NewAPIServerError(false, err) + } + klog.V(2).InfoS("Deleted a binding pointing to a to be deleted cluster", "binding", klog.KObj(binding), "cluster", curCluster.ClusterName, "clusterStagedUpdateRun", updateRunRef) + 
markClusterUpdatingStarted(curCluster, updateRun.Generation) + if deletingBinding == 0 { + markStageUpdatingStarted(updateRun.Status.DeletionStageStatus, updateRun.Generation) + } + deletingBinding++ + } + // The rest of the clusters in the stage are not in the toBeDeletedBindings so it should be marked as delete succeeded. + for _, clusterStatus := range existingDeleteStageClusterMap { + // Make sure the cluster is marked as deleted. + if !condition.IsConditionStatusTrue(meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1alpha1.ClusterUpdatingConditionStarted)), updateRun.Generation) { + markClusterUpdatingStarted(clusterStatus, updateRun.Generation) + } + markClusterUpdatingSucceeded(clusterStatus, updateRun.Generation) + } + klog.InfoS("The delete stage is progressing", "numberOfDeletingClusters", len(toBeDeletedBindings), "clusterStagedUpdateRun", updateRunRef) + if len(toBeDeletedBindings) == 0 { + markStageUpdatingSucceeded(updateRun.Status.DeletionStageStatus, updateRun.Generation) + } + return len(toBeDeletedBindings) == 0, nil +} + +// checkAfterStageTasksStatus checks if the after stage tasks have finished. +// It returns whether the after stage tasks have finished, or an error if the after stage tasks failed. 
+func (r *Reconciler) checkAfterStageTasksStatus(ctx context.Context, updatingStageIndex int, updateRun *placementv1alpha1.ClusterStagedUpdateRun) (bool, error) { + updateRunRef := klog.KObj(updateRun) + updatingStageStatus := &updateRun.Status.StagesStatus[updatingStageIndex] + updatingStage := &updateRun.Status.StagedUpdateStrategySnapshot.Stages[updatingStageIndex] + if updatingStage.AfterStageTasks == nil { + klog.V(2).InfoS("There is no after stage task for this stage", "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return true, nil + } + for i, task := range updatingStage.AfterStageTasks { + switch task.Type { + case placementv1alpha1.AfterStageTaskTypeTimedWait: + waitStartTime := meta.FindStatusCondition(updatingStageStatus.Conditions, string(placementv1alpha1.StageUpdatingConditionProgressing)).LastTransitionTime.Time + // Check if the wait time has passed. + if waitStartTime.Add(task.WaitTime.Duration).After(time.Now()) { + klog.V(2).InfoS("The after stage task still need to wait", "waitStartTime", waitStartTime, "waitTime", task.WaitTime, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return false, nil + } + markAfterStageWaitTimeElapsed(&updatingStageStatus.AfterStageTaskStatus[i], updateRun.Generation) + klog.V(2).InfoS("The after stage wait task has completed", "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + case placementv1alpha1.AfterStageTaskTypeApproval: + // Check if the approval request has been created. 
+ approvalRequest := placementv1alpha1.ClusterApprovalRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: updatingStageStatus.AfterStageTaskStatus[i].ApprovalRequestName, + Labels: map[string]string{ + placementv1alpha1.TargetUpdatingStageNameLabel: updatingStage.Name, + placementv1alpha1.TargetUpdateRunLabel: updateRun.Name, + placementv1alpha1.IsLatestUpdateRunApprovalLabel: "true", + }, + }, + Spec: placementv1alpha1.ApprovalRequestSpec{ + TargetUpdateRun: updateRun.Name, + TargetStage: updatingStage.Name, + }, + } + requestRef := klog.KObj(&approvalRequest) + if err := r.Client.Create(ctx, &approvalRequest); err != nil { + if apierrors.IsAlreadyExists(err) { + // The approval task already exists. + markAfterStageRequestCreated(&updatingStageStatus.AfterStageTaskStatus[i], updateRun.Generation) + if err = r.Client.Get(ctx, client.ObjectKeyFromObject(&approvalRequest), &approvalRequest); err != nil { + klog.ErrorS(err, "Failed to get the already existing approval request", "approvalRequest", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return false, controller.NewAPIServerError(true, err) + } + if approvalRequest.Spec.TargetStage != updatingStage.Name || approvalRequest.Spec.TargetUpdateRun != updateRun.Name { + unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the approval request task `%s` is targeting update run `%s` and stage `%s` ", approvalRequest.Name, approvalRequest.Spec.TargetStage, approvalRequest.Spec.TargetUpdateRun)) + klog.ErrorS(unexpectedErr, "Found an approval request targeting wrong stage", "approvalRequestTask", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) + } + if !condition.IsConditionStatusTrue(meta.FindStatusCondition(approvalRequest.Status.Conditions, string(placementv1alpha1.ApprovalRequestConditionApproved)), approvalRequest.Generation) { + klog.V(2).InfoS("The 
approval request has not been approved yet", "approvalRequestTask", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return false, nil + } + klog.V(2).InfoS("The approval request has been approved", "approvalRequestTask", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + markAfterStageRequestApproved(&updatingStageStatus.AfterStageTaskStatus[i], updateRun.Generation) + } else { + // retriable error + klog.ErrorS(err, "Failed to create the approval request", "approvalRequest", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + return false, controller.NewAPIServerError(false, err) + } + } else { + // The approval request has been created for the first time. + klog.V(2).InfoS("The approval request has been created", "approvalRequestTask", requestRef, "stage", updatingStage.Name, "clusterStagedUpdateRun", updateRunRef) + markAfterStageRequestCreated(&updatingStageStatus.AfterStageTaskStatus[i], updateRun.Generation) + return false, nil + } + } + } + // All the after stage tasks have been finished or the for loop will return before this line. + return true, nil +} + +// isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status. 
+func isBindingSyncedWithClusterStatus(updateRun *placementv1alpha1.ClusterStagedUpdateRun, binding *placementv1beta1.ClusterResourceBinding, cluster *placementv1alpha1.ClusterUpdatingStatus) bool { + if binding.Spec.ResourceSnapshotName != updateRun.Spec.ResourceSnapshotIndex { + klog.ErrorS(fmt.Errorf("binding has different resourceSnapshotName, want: %s, got: %s", updateRun.Spec.ResourceSnapshotIndex, binding.Spec.ResourceSnapshotName), "ClusterResourceBinding is not up-to-date", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", klog.KObj(updateRun)) + return false + } + if !reflect.DeepEqual(cluster.ResourceOverrideSnapshots, binding.Spec.ResourceOverrideSnapshots) { + klog.ErrorS(fmt.Errorf("binding has different resourceOverrideSnapshots, want: %v, got: %v", cluster.ResourceOverrideSnapshots, binding.Spec.ResourceOverrideSnapshots), "ClusterResourceBinding is not up-to-date", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", klog.KObj(updateRun)) + return false + } + if !reflect.DeepEqual(cluster.ClusterResourceOverrideSnapshots, binding.Spec.ClusterResourceOverrideSnapshots) { + klog.ErrorS(fmt.Errorf("binding has different clusterResourceOverrideSnapshots, want: %v, got: %v", cluster.ClusterResourceOverrideSnapshots, binding.Spec.ClusterResourceOverrideSnapshots), "ClusterResourceBinding is not up-to-date", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", klog.KObj(updateRun)) + return false + } + if !reflect.DeepEqual(binding.Spec.ApplyStrategy, updateRun.Status.ApplyStrategy) { + klog.ErrorS(fmt.Errorf("binding has different applyStrategy, want: %v, got: %v", updateRun.Status.ApplyStrategy, binding.Spec.ApplyStrategy), "ClusterResourceBinding is not up-to-date", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", klog.KObj(updateRun)) + return false + } + return true +} + +// checkClusterUpdateResult checks if the cluster has been updated successfully. 
+// It returns if the cluster has been updated successfully and the error if the cluster update failed. +func checkClusterUpdateResult( + binding *placementv1beta1.ClusterResourceBinding, + clusterStatus *placementv1alpha1.ClusterUpdatingStatus, + updatingStage *placementv1alpha1.StageUpdatingStatus, + updateRun *placementv1alpha1.ClusterStagedUpdateRun, +) (bool, error) { + availCond := binding.GetCondition(string(placementv1beta1.ResourceBindingAvailable)) + if condition.IsConditionStatusTrue(availCond, binding.Generation) { + // The resource updated on the cluster is available. + klog.InfoS("The cluster has been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStage.StageName, "clusterStagedUpdateRun", klog.KObj(updateRun)) + markClusterUpdatingSucceeded(clusterStatus, updateRun.Generation) + return true, nil + } + for i := condition.OverriddenCondition; i <= condition.AppliedCondition; i++ { + bindingCond := binding.GetCondition(string(i.ResourceBindingConditionType())) + if condition.IsConditionStatusFalse(bindingCond, binding.Generation) { + // We have no way to know if the failed condition is recoverable or not so we just let it run + klog.InfoS("The cluster updating encountered an error", "failedCondition", bindingCond, "cluster", clusterStatus.ClusterName, "stage", updatingStage.StageName, "clusterStagedUpdateRun", klog.KObj(updateRun)) + // TODO(wantjian): identify some non-recoverable error and mark the cluster updating as failed + return false, fmt.Errorf("the cluster updating encountered an error at stage `%s`, err := `%s`", string(i.ResourceBindingConditionType()), bindingCond.Message) + } + } + return false, nil +} + +// markUpdateRunStarted marks the update run as started in memory. 
+func markUpdateRunStarted(updateRun *placementv1alpha1.ClusterStagedUpdateRun) { + meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StagedUpdateRunConditionProgressing), + Status: metav1.ConditionTrue, + ObservedGeneration: updateRun.Generation, + Reason: condition.UpdateRunStartedReason, + }) +} + +// markStageUpdatingStarted marks the stage updating status as started in memory. +func markStageUpdatingStarted(stageUpdatingStatus *placementv1alpha1.StageUpdatingStatus, generation int64) { + if stageUpdatingStatus.StartTime == nil { + stageUpdatingStatus.StartTime = &metav1.Time{Time: time.Now()} + } + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StageUpdatingConditionProgressing), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.StageUpdatingStartedReason, + }) +} + +// markStageUpdatingWaiting marks the stage updating status as waiting in memory. +func markStageUpdatingWaiting(stageUpdatingStatus *placementv1alpha1.StageUpdatingStatus, generation int64) { + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StageUpdatingConditionProgressing), + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: condition.StageUpdatingWaitingReason, + }) +} + +// markStageUpdatingSucceeded marks the stage updating status as succeeded in memory. 
+func markStageUpdatingSucceeded(stageUpdatingStatus *placementv1alpha1.StageUpdatingStatus, generation int64) { + if stageUpdatingStatus.EndTime == nil { + stageUpdatingStatus.EndTime = &metav1.Time{Time: time.Now()} + } + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StageUpdatingConditionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.StageUpdatingSucceededReason, + }) +} + +// markStageUpdatingFailed marks the stage updating status as failed in memory. +func markStageUpdatingFailed(stageUpdatingStatus *placementv1alpha1.StageUpdatingStatus, generation int64, message string) { + if stageUpdatingStatus.EndTime == nil { + stageUpdatingStatus.EndTime = &metav1.Time{Time: time.Now()} + } + meta.SetStatusCondition(&stageUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.StageUpdatingConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: condition.StageUpdatingFailedReason, + Message: message, + }) +} + +// markClusterUpdatingStarted marks the cluster updating status as started in memory. +func markClusterUpdatingStarted(clusterUpdatingStatus *placementv1alpha1.ClusterUpdatingStatus, generation int64) { + meta.SetStatusCondition(&clusterUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.ClusterUpdatingConditionStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.ClusterUpdatingStartedReason, + }) +} + +// markClusterUpdatingSucceeded marks the cluster updating status as succeeded in memory. 
+func markClusterUpdatingSucceeded(clusterUpdatingStatus *placementv1alpha1.ClusterUpdatingStatus, generation int64) { + meta.SetStatusCondition(&clusterUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.ClusterUpdatingConditionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.ClusterUpdatingSucceededReason, + }) +} + +// markClusterUpdatingFailed marks the cluster updating status as failed in memory. +func markClusterUpdatingFailed(clusterUpdatingStatus *placementv1alpha1.ClusterUpdatingStatus, generation int64, message string) { + meta.SetStatusCondition(&clusterUpdatingStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.ClusterUpdatingConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: generation, + Reason: condition.ClusterUpdatingFailedReason, + Message: message, + }) +} + +// markAfterStageRequestCreated marks the Approval after stage task as ApprovalRequestCreated in memory. +func markAfterStageRequestCreated(afterStageTaskStatus *placementv1alpha1.AfterStageTaskStatus, generation int64) { + meta.SetStatusCondition(&afterStageTaskStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.AfterStageTaskConditionApprovalRequestCreated), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.AfterStageTaskApprovalRequestCreatedReason, + }) +} + +// markAfterStageRequestApproved marks the Approval after stage task as Approved in memory. 
+func markAfterStageRequestApproved(afterStageTaskStatus *placementv1alpha1.AfterStageTaskStatus, generation int64) { + meta.SetStatusCondition(&afterStageTaskStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.AfterStageTaskConditionApprovalRequestApproved), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.AfterStageTaskApprovalRequestApprovedReason, + }) +} + +// markAfterStageWaitTimeElapsed marks the TimeWait after stage task as TimeElapsed in memory. +func markAfterStageWaitTimeElapsed(afterStageTaskStatus *placementv1alpha1.AfterStageTaskStatus, generation int64) { + meta.SetStatusCondition(&afterStageTaskStatus.Conditions, metav1.Condition{ + Type: string(placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed), + Status: metav1.ConditionTrue, + ObservedGeneration: generation, + Reason: condition.AfterStageTaskWaitTimeElapsedReason, + }) +} diff --git a/pkg/controllers/updaterun/execution_integration_test.go b/pkg/controllers/updaterun/execution_integration_test.go new file mode 100644 index 000000000..62cedadb5 --- /dev/null +++ b/pkg/controllers/updaterun/execution_integration_test.go @@ -0,0 +1,499 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package updaterun + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/google/go-cmp/cmp" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1" + placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils" + "go.goms.io/fleet/pkg/utils/condition" +) + +var _ = Describe("UpdateRun execution tests", func() { + var updateRun *placementv1alpha1.ClusterStagedUpdateRun + var crp *placementv1beta1.ClusterResourcePlacement + var policySnapshot *placementv1beta1.ClusterSchedulingPolicySnapshot + var updateStrategy *placementv1alpha1.ClusterStagedUpdateStrategy + var resourceBindings []*placementv1beta1.ClusterResourceBinding + var targetClusters []*clusterv1beta1.MemberCluster + var unscheduledCluster []*clusterv1beta1.MemberCluster + var resourceSnapshot *placementv1beta1.ClusterResourceSnapshot + var clusterResourceOverride *placementv1alpha1.ClusterResourceOverrideSnapshot + var wantStatus *placementv1alpha1.StagedUpdateRunStatus + + BeforeEach(OncePerOrdered, func() { + testUpdateRunName = "updaterun-" + utils.RandStr() + testCRPName = "crp-" + utils.RandStr() + testResourceSnapshotName = "snapshot-" + utils.RandStr() + testUpdateStrategyName = "updatestrategy-" + utils.RandStr() + testCROName = "cro-" + utils.RandStr() + updateRunNamespacedName = types.NamespacedName{Name: testUpdateRunName} + + updateRun = generateTestClusterStagedUpdateRun() + crp = generateTestClusterResourcePlacement() + policySnapshot = generateTestClusterSchedulingPolicySnapshot(1) + updateStrategy = generateTestClusterStagedUpdateStrategy() + clusterResourceOverride = generateTestClusterResourceOverride() + + resourceBindings = make([]*placementv1beta1.ClusterResourceBinding, numTargetClusters+numUnscheduledClusters) + targetClusters = 
make([]*clusterv1beta1.MemberCluster, numTargetClusters) + for i := range targetClusters { + // split the clusters into 2 regions + region := regionEastus + if i%2 == 0 { + region = regionWestus + } + // reserse the order of the clusters by index + targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region}) + resourceBindings[i] = generateTestClusterResourceBinding(policySnapshot.Name, targetClusters[i].Name) + } + + unscheduledCluster = make([]*clusterv1beta1.MemberCluster, numUnscheduledClusters) + for i := range unscheduledCluster { + unscheduledCluster[i] = generateTestMemberCluster(i, "unscheduled-cluster-"+strconv.Itoa(i), map[string]string{"group": "staging"}) + // update the policySnapshot name so that these clusters are considered to-be-deleted + resourceBindings[numTargetClusters+i] = generateTestClusterResourceBinding(policySnapshot.Name+"a", unscheduledCluster[i].Name) + } + + var err error + testNamespace, err = json.Marshal(corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-namespace", + Labels: map[string]string{ + "fleet.azure.com/name": "test-namespace", + }, + }, + }) + Expect(err).To(Succeed()) + resourceSnapshot = generateTestClusterResourceSnapshot() + + // Set smaller wait time for testing + stageUpdatingWaitTime = time.Second * 3 + clusterUpdatingWaitTime = time.Second * 2 + + By("Creating a new clusterResourcePlacement") + Expect(k8sClient.Create(ctx, crp)).To(Succeed()) + + By("Creating scheduling policy snapshot") + Expect(k8sClient.Create(ctx, policySnapshot)).To(Succeed()) + + By("Setting the latest policy snapshot condition as fully scheduled") + meta.SetStatusCondition(&policySnapshot.Status.Conditions, metav1.Condition{ + Type: string(placementv1beta1.PolicySnapshotScheduled), + Status: metav1.ConditionTrue, + ObservedGeneration: policySnapshot.Generation, + 
Reason: "scheduled", + }) + Expect(k8sClient.Status().Update(ctx, policySnapshot)).Should(Succeed(), "failed to update the policy snapshot condition") + + By("Creating the member clusters") + for _, cluster := range targetClusters { + Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) + } + for _, cluster := range unscheduledCluster { + Expect(k8sClient.Create(ctx, cluster)).To(Succeed()) + } + + By("Creating a bunch of ClusterResourceBindings") + for _, binding := range resourceBindings { + Expect(k8sClient.Create(ctx, binding)).To(Succeed()) + } + + By("Creating a clusterStagedUpdateStrategy") + Expect(k8sClient.Create(ctx, updateStrategy)).To(Succeed()) + + By("Creating a new resource snapshot") + Expect(k8sClient.Create(ctx, resourceSnapshot)).To(Succeed()) + + By("Creating a new cluster resource override") + Expect(k8sClient.Create(ctx, clusterResourceOverride)).To(Succeed()) + + By("Creating a new clusterStagedUpdateRun") + Expect(k8sClient.Create(ctx, updateRun)).To(Succeed()) + + By("Validating the initialization succeededand and the execution started") + initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride) + wantStatus = generateExecutionStartedStatus(updateRun, initialized) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + AfterEach(OncePerOrdered, func() { + By("Deleting the clusterStagedUpdateRun") + Expect(k8sClient.Delete(ctx, updateRun)).Should(Succeed()) + updateRun = nil + + By("Deleting the clusterResourcePlacement") + Expect(k8sClient.Delete(ctx, crp)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + crp = nil + + By("Deleting the clusterSchedulingPolicySnapshot") + Expect(k8sClient.Delete(ctx, policySnapshot)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + policySnapshot = nil + + By("Deleting the clusterResourceBindings") + for _, binding := range resourceBindings { + Expect(k8sClient.Delete(ctx, 
binding)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + } + resourceBindings = nil + + By("Deleting the member clusters") + for _, cluster := range targetClusters { + Expect(k8sClient.Delete(ctx, cluster)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + } + for _, cluster := range unscheduledCluster { + Expect(k8sClient.Delete(ctx, cluster)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + } + targetClusters, unscheduledCluster = nil, nil + + By("Deleting the clusterStagedUpdateStrategy") + Expect(k8sClient.Delete(ctx, updateStrategy)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + updateStrategy = nil + + By("Deleting the clusterResourceSnapshot") + Expect(k8sClient.Delete(ctx, resourceSnapshot)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + resourceSnapshot = nil + + By("Deleting the clusterResourceOverride") + Expect(k8sClient.Delete(ctx, clusterResourceOverride)).Should(SatisfyAny(Succeed(), utils.NotFoundMatcher{})) + clusterResourceOverride = nil + }) + + Context("Cluster staged update run should update clusters one by one", Ordered, func() { + It("Should mark the 1st cluster in the 1st stage as succeeded", func() { + By("Validating the 1st clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-1] // cluster-9 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 1st clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 1st cluster has succeeded and 2nd cluster has started") + wantStatus.StagesStatus[0].Clusters[0].Conditions = append(wantStatus.StagesStatus[0].Clusters[0].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + 
wantStatus.StagesStatus[0].Clusters[1].Conditions = append(wantStatus.StagesStatus[0].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Validating the 1st stage has startTime set") + Expect(updateRun.Status.StagesStatus[0].StartTime).ShouldNot(BeNil()) + }) + + It("Should mark the 2nd cluster in the 1st stage as succeeded", func() { + By("Validating the 2nd clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-3] // cluster-7 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 2nd clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 2nd cluster has succeeded and 3rd cluster has started") + wantStatus.StagesStatus[0].Clusters[1].Conditions = append(wantStatus.StagesStatus[0].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Clusters[2].Conditions = append(wantStatus.StagesStatus[0].Clusters[2].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 3rd cluster in the 1st stage as succeeded", func() { + By("Validating the 3rd clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-5] // cluster-5 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 3rd clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, 
placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 3rd cluster has succeeded and 4th cluster has started") + wantStatus.StagesStatus[0].Clusters[2].Conditions = append(wantStatus.StagesStatus[0].Clusters[2].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Clusters[3].Conditions = append(wantStatus.StagesStatus[0].Clusters[3].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 4th cluster in the 1st stage as succeeded", func() { + By("Validating the 4th clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-7] // cluster-3 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 4th clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 4th cluster has succeeded and 5th cluster has started") + wantStatus.StagesStatus[0].Clusters[3].Conditions = append(wantStatus.StagesStatus[0].Clusters[3].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Clusters[4].Conditions = append(wantStatus.StagesStatus[0].Clusters[4].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 5th cluster in the 1st stage as succeeded", func() { + By("Validating the 5th clusterResourceBinding is updated to Bound") + binding := 
resourceBindings[numTargetClusters-9] // cluster-1 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 5th clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 5th cluster has succeeded and stage waiting for AfterStageTask") + stageWaitingCondition := generateFalseCondition(updateRun, placementv1alpha1.StageUpdatingConditionProgressing) + stageWaitingCondition.Reason = condition.StageUpdatingWaitingReason + wantStatus.StagesStatus[0].Clusters[4].Conditions = append(wantStatus.StagesStatus[0].Clusters[4].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Conditions[0] = stageWaitingCondition // The progressing condition now becomes false with waiting reason. + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should complete the 1st stage after wait time passed and move on to the 2nd stage", func() { + By("Validating the waitTime after stage task has completed and 2nd stage has started") + // AfterStageTask completed. + wantStatus.StagesStatus[0].AfterStageTaskStatus[0].Conditions = append(wantStatus.StagesStatus[0].AfterStageTaskStatus[0].Conditions, + generateTrueCondition(updateRun, placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed)) + // 1st stage completed. + wantStatus.StagesStatus[0].Conditions = append(wantStatus.StagesStatus[0].Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionSucceeded)) + // 2nd stage started. + wantStatus.StagesStatus[1].Conditions = append(wantStatus.StagesStatus[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionProgressing)) + // 1st cluster in 2nd stage started. 
+ wantStatus.StagesStatus[1].Clusters[0].Conditions = append(wantStatus.StagesStatus[1].Clusters[0].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Validating the 1st stage has endTime set") + Expect(updateRun.Status.StagesStatus[0].EndTime).ShouldNot(BeNil()) + + By("Validating the waitTime after stage task only completes after the wait time") + waitStartTime := meta.FindStatusCondition(updateRun.Status.StagesStatus[0].Conditions, string(placementv1alpha1.StageUpdatingConditionProgressing)).LastTransitionTime.Time + waitEndTime := meta.FindStatusCondition(updateRun.Status.StagesStatus[0].AfterStageTaskStatus[0].Conditions, string(placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed)).LastTransitionTime.Time + // In this test, I set wait time to be 4 seconds, while stageClusterUpdatingWaitTime is 3 seconds. + // So it needs 2 rounds of reconcile to wait for the waitTime to elapse, waitEndTime - waitStartTime should be around 6 seconds. 
+ Expect(waitStartTime.Add(updateStrategy.Spec.Stages[0].AfterStageTasks[0].WaitTime.Duration).Before(waitEndTime)).Should(BeTrue()) + }) + + It("Should mark the 1st cluster in the 2nd stage as succeeded", func() { + By("Validating the 1st clusterResourceBinding is updated to Bound") + binding := resourceBindings[0] // cluster-0 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) + + By("Updating the 1st clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 1st cluster has succeeded and 2nd cluster has started") + wantStatus.StagesStatus[1].Clusters[0].Conditions = append(wantStatus.StagesStatus[1].Clusters[0].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[1].Clusters[1].Conditions = append(wantStatus.StagesStatus[1].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + + By("Validating the 2nd stage has startTime set") + Expect(updateRun.Status.StagesStatus[0].StartTime).ShouldNot(BeNil()) + }) + + It("Should mark the 2nd cluster in the 2nd stage as succeeded", func() { + By("Validating the 2nd clusterResourceBinding is updated to Bound") + binding := resourceBindings[2] // cluster-2 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) + + By("Updating the 2nd clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 2nd cluster has succeeded and 
3rd cluster has started") + wantStatus.StagesStatus[1].Clusters[1].Conditions = append(wantStatus.StagesStatus[1].Clusters[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[1].Clusters[2].Conditions = append(wantStatus.StagesStatus[1].Clusters[2].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 3rd cluster in the 2nd stage as succeeded", func() { + By("Validating the 3rd clusterResourceBinding is updated to Bound") + binding := resourceBindings[4] // cluster-4 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) + + By("Updating the 3rd clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 3rd cluster has succeeded and 4th cluster has started") + wantStatus.StagesStatus[1].Clusters[2].Conditions = append(wantStatus.StagesStatus[1].Clusters[2].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[1].Clusters[3].Conditions = append(wantStatus.StagesStatus[1].Clusters[3].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 4th cluster in the 2nd stage as succeeded", func() { + By("Validating the 4th clusterResourceBinding is updated to Bound") + binding := resourceBindings[6] // cluster-6 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) + + By("Updating the 4th clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, 
generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 4th cluster has succeeded and 5th cluster has started") + wantStatus.StagesStatus[1].Clusters[3].Conditions = append(wantStatus.StagesStatus[1].Clusters[3].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[1].Clusters[4].Conditions = append(wantStatus.StagesStatus[1].Clusters[4].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should mark the 5th cluster in the 2nd stage as succeeded", func() { + By("Validating the 5th clusterResourceBinding is updated to Bound") + binding := resourceBindings[8] // cluster-8 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) + + By("Updating the 5th clusterResourceBinding to Available") + meta.SetStatusCondition(&binding.Status.Conditions, generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the 5th cluster has succeeded and the stage waiting for AfterStageTask") + wantStatus.StagesStatus[1].Clusters[4].Conditions = append(wantStatus.StagesStatus[1].Clusters[4].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + stageWaitingCondition := generateFalseCondition(updateRun, placementv1alpha1.StageUpdatingConditionProgressing) + stageWaitingCondition.Reason = condition.StageUpdatingWaitingReason + wantStatus.StagesStatus[1].Conditions[0] = stageWaitingCondition // The progressing condition now becomes false with waiting reason. 
+ wantStatus.StagesStatus[1].AfterStageTaskStatus[0].Conditions = append(wantStatus.StagesStatus[1].AfterStageTaskStatus[0].Conditions, + generateTrueCondition(updateRun, placementv1alpha1.AfterStageTaskConditionApprovalRequestCreated)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should complete the 2nd stage after the ApprovalRequest is approved and move on to the delete stage", func() { + By("Validating the approvalRequest has been created") + approvalRequest := &placementv1alpha1.ClusterApprovalRequest{} + wantApprovalRequest := &placementv1alpha1.ClusterApprovalRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: updateRun.Status.StagesStatus[1].AfterStageTaskStatus[0].ApprovalRequestName, + Labels: map[string]string{ + placementv1alpha1.TargetUpdatingStageNameLabel: updateRun.Status.StagesStatus[1].StageName, + placementv1alpha1.TargetUpdateRunLabel: updateRun.Name, + placementv1alpha1.IsLatestUpdateRunApprovalLabel: "true", + }, + }, + Spec: placementv1alpha1.ApprovalRequestSpec{ + TargetUpdateRun: updateRun.Name, + TargetStage: updateRun.Status.StagesStatus[1].StageName, + }, + } + Eventually(func() error { + if err := k8sClient.Get(ctx, types.NamespacedName{Name: wantApprovalRequest.Name}, approvalRequest); err != nil { + return err + } + if diff := cmp.Diff(wantApprovalRequest.Spec, approvalRequest.Spec); diff != "" { + return fmt.Errorf("approvalRequest has different spec (-want +got):\n%s", diff) + } + if diff := cmp.Diff(wantApprovalRequest.Labels, approvalRequest.Labels); diff != "" { + return fmt.Errorf("approvalRequest has different labels (-want +got):\n%s", diff) + } + return nil + }, timeout, interval).Should(Succeed(), "failed to validate the approvalRequest") + + By("Approving the approvalRequest") + meta.SetStatusCondition(&approvalRequest.Status.Conditions, generateTrueCondition(approvalRequest, placementv1alpha1.ApprovalRequestConditionApproved)) + Expect(k8sClient.Status().Update(ctx, 
approvalRequest)).Should(Succeed(), "failed to update the approvalRequest status") + + By("Validating the 2nd stage has completed and the delete stage has started") + wantStatus.StagesStatus[1].AfterStageTaskStatus[0].Conditions = append(wantStatus.StagesStatus[1].AfterStageTaskStatus[0].Conditions, + generateTrueCondition(updateRun, placementv1alpha1.AfterStageTaskConditionApprovalRequestApproved)) + wantStatus.StagesStatus[1].Conditions = append(wantStatus.StagesStatus[1].Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionSucceeded)) + + wantStatus.DeletionStageStatus.Conditions = append(wantStatus.DeletionStageStatus.Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionProgressing)) + for i := range wantStatus.DeletionStageStatus.Clusters { + wantStatus.DeletionStageStatus.Clusters[i].Conditions = append(wantStatus.DeletionStageStatus.Clusters[i].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)) + } + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + + It("Should delete all the clusterResourceBindings in the delete stage and complete the update run", func() { + By("Validating the to-be-deleted bindings are all deleted") + Eventually(func() error { + for i := numTargetClusters; i < numTargetClusters+numUnscheduledClusters; i++ { + binding := &placementv1beta1.ClusterResourceBinding{} + err := k8sClient.Get(ctx, types.NamespacedName{Name: resourceBindings[i].Name}, binding) + if err == nil { + return fmt.Errorf("binding %s is not deleted", binding.Name) + } + if !apierrors.IsNotFound(err) { + return fmt.Errorf("Get binding %s does not return a not-found error: %w", binding.Name, err) + } + } + return nil + }, timeout, interval).Should(Succeed(), "failed to validate the deletion of the to-be-deleted bindings") + + By("Validating the delete stage and the clusterStagedUpdateRun has completed") + for i := range 
wantStatus.DeletionStageStatus.Clusters { + wantStatus.DeletionStageStatus.Clusters[i].Conditions = append(wantStatus.DeletionStageStatus.Clusters[i].Conditions, generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + } + wantStatus.DeletionStageStatus.Conditions = append(wantStatus.DeletionStageStatus.Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionSucceeded)) + wantStatus.Conditions = append(wantStatus.Conditions, generateTrueCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionSucceeded)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + }) + + Context("Cluster staged update run should abort the execution within a failed updating stage", Ordered, func() { + It("Should keep waiting for the 1st cluster while it's not available", func() { + By("Validating the 1st clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-1] // cluster-9 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 1st clusterResourceBinding to ApplyFailed") + meta.SetStatusCondition(&binding.Status.Conditions, generateFalseCondition(binding, placementv1beta1.ResourceBindingApplied)) + Expect(k8sClient.Status().Update(ctx, binding)).Should(Succeed(), "failed to update the binding status") + + By("Validating the updateRun is stuck in the 1st cluster of the 1st stage") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + validateClusterStagedUpdateRunStatusConsistently(ctx, updateRun, wantStatus, "") + }) + + It("Should abort the execution if the binding has unexpected state", func() { + By("Validating the 1st clusterResourceBinding is updated to Bound") + binding := resourceBindings[numTargetClusters-1] // cluster-9 + validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) + + By("Updating the 1st clusterResourceBinding's state to Scheduled (from Bound)") + binding.Spec.State 
= placementv1beta1.BindingStateScheduled + Expect(k8sClient.Update(ctx, binding)).Should(Succeed(), "failed to update the binding state") + + By("Validating the updateRun has failed") + wantStatus.StagesStatus[0].Clusters[0].Conditions = append(wantStatus.StagesStatus[0].Clusters[0].Conditions, generateFalseCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionSucceeded)) + wantStatus.StagesStatus[0].Conditions = append(wantStatus.StagesStatus[0].Conditions, generateFalseCondition(updateRun, placementv1alpha1.StageUpdatingConditionSucceeded)) + wantStatus.Conditions = append(wantStatus.Conditions, generateFalseCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionSucceeded)) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") + }) + }) +}) + +// validateBindingState polls with Eventually (bounded by timeout/interval) until the binding, +// re-fetched into the passed-in object on every attempt, is in the Bound state, references +// resourceSnapshotName, and carries the override snapshots and apply strategy recorded in the +// updateRun status for the stage at index `stage`. Expected values are passed to cmp.Diff first +// so that the reported diff really reads (-want +got). +// NOTE(review): the expected override snapshots are always read from Clusters[0] of the stage, +// whichever cluster the binding belongs to; confirm this is intended. +func validateBindingState(ctx context.Context, binding *placementv1beta1.ClusterResourceBinding, resourceSnapshotName string, updateRun *placementv1alpha1.ClusterStagedUpdateRun, stage int) { + Eventually(func() error { + if err := k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name}, binding); err != nil { + return err + } + + if binding.Spec.State != placementv1beta1.BindingStateBound { + return fmt.Errorf("binding %s is not in Bound state, got %s", binding.Name, binding.Spec.State) + } + if binding.Spec.ResourceSnapshotName != resourceSnapshotName { + return fmt.Errorf("binding %s has different resourceSnapshot name, got %s, want %s", binding.Name, binding.Spec.ResourceSnapshotName, resourceSnapshotName) + } + if diff := cmp.Diff(updateRun.Status.StagesStatus[stage].Clusters[0].ResourceOverrideSnapshots, binding.Spec.ResourceOverrideSnapshots); diff != "" { + return fmt.Errorf("binding %s has different resourceOverrideSnapshots (-want +got):\n%s", binding.Name, diff) + } + if diff := cmp.Diff(updateRun.Status.StagesStatus[stage].Clusters[0].ClusterResourceOverrideSnapshots, binding.Spec.ClusterResourceOverrideSnapshots); diff != "" { + return fmt.Errorf("binding %s has different 
clusterResourceOverrideSnapshots (-want +got):\n%s", binding.Name, diff) + } + if diff := cmp.Diff(updateRun.Status.ApplyStrategy, binding.Spec.ApplyStrategy); diff != "" { + return fmt.Errorf("binding %s has different applyStrategy (-want +got):\n%s", binding.Name, diff) + } + return nil + }, timeout, interval).Should(Succeed(), "failed to validate the binding state") +} diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go new file mode 100644 index 000000000..6023ab52f --- /dev/null +++ b/pkg/controllers/updaterun/execution_test.go @@ -0,0 +1,334 @@ +/* +Copyright (c) Microsoft Corporation. +Licensed under the MIT license. +*/ + +package updaterun + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1" + placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + "go.goms.io/fleet/pkg/utils/condition" +) + +func TestIsBindingSyncedWithClusterStatus(t *testing.T) { + tests := []struct { + name string + updateRun *placementv1alpha1.ClusterStagedUpdateRun + binding *placementv1beta1.ClusterResourceBinding + cluster *placementv1alpha1.ClusterUpdatingStatus + wantEqual bool + }{ + { + name: "isBindingSyncedWithClusterStatus should return false if binding and updateRun have different resourceSnapshot", + updateRun: &placementv1alpha1.ClusterStagedUpdateRun{ + Spec: placementv1alpha1.StagedUpdateRunSpec{ + ResourceSnapshotIndex: "test-snapshot", + }, + }, + binding: &placementv1beta1.ClusterResourceBinding{ + Spec: placementv1beta1.ResourceBindingSpec{ + ResourceSnapshotName: "test-snapshot-1", + }, + }, + wantEqual: false, + }, + { + name: "isBindingSyncedWithClusterStatus should return false if binding and cluster status have different resourceOverrideSnapshot list", + updateRun: &placementv1alpha1.ClusterStagedUpdateRun{ + Spec: placementv1alpha1.StagedUpdateRunSpec{ + ResourceSnapshotIndex: 
"test-snapshot", + }, + }, + binding: &placementv1beta1.ClusterResourceBinding{ + Spec: placementv1beta1.ResourceBindingSpec{ + ResourceSnapshotName: "test-snapshot", + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + { + Name: "ro2", + Namespace: "ns2", + }, + { + Name: "ro1", + Namespace: "ns1", + }, + }, + }, + }, + cluster: &placementv1alpha1.ClusterUpdatingStatus{ + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + { + Name: "ro1", + Namespace: "ns1", + }, + { + Name: "ro2", + Namespace: "ns2", + }, + }, + }, + wantEqual: false, + }, + { + name: "isBindingSyncedWithClusterStatus should return false if binding and cluster status have different clusterResourceOverrideSnapshot list", + updateRun: &placementv1alpha1.ClusterStagedUpdateRun{ + Spec: placementv1alpha1.StagedUpdateRunSpec{ + ResourceSnapshotIndex: "test-snapshot", + }, + }, + binding: &placementv1beta1.ClusterResourceBinding{ + Spec: placementv1beta1.ResourceBindingSpec{ + ResourceSnapshotName: "test-snapshot", + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1", "cr2"}, + }, + }, + cluster: &placementv1alpha1.ClusterUpdatingStatus{ + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1"}, + }, + wantEqual: false, + }, + { + name: "isBindingSyncedWithClusterStatus should return false if binding and updateRun have different applyStrategy", + updateRun: &placementv1alpha1.ClusterStagedUpdateRun{ + Spec: placementv1alpha1.StagedUpdateRunSpec{ + ResourceSnapshotIndex: "test-snapshot", + }, + Status: placementv1alpha1.StagedUpdateRunStatus{ + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeClientSideApply, + }, + }, + }, + binding: 
&placementv1beta1.ClusterResourceBinding{ + Spec: placementv1beta1.ResourceBindingSpec{ + ResourceSnapshotName: "test-snapshot", + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1", "cr2"}, + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + }, + cluster: &placementv1alpha1.ClusterUpdatingStatus{ + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1", "cr2"}, + }, + wantEqual: false, + }, + { + name: "isBindingSyncedWithClusterStatus should return true if resourceSnapshot, applyStrategy, and override lists are all deep equal", + updateRun: &placementv1alpha1.ClusterStagedUpdateRun{ + Spec: placementv1alpha1.StagedUpdateRunSpec{ + ResourceSnapshotIndex: "test-snapshot", + }, + Status: placementv1alpha1.StagedUpdateRunStatus{ + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + }, + binding: &placementv1beta1.ClusterResourceBinding{ + Spec: placementv1beta1.ResourceBindingSpec{ + ResourceSnapshotName: "test-snapshot", + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1", "cr2"}, + ApplyStrategy: &placementv1beta1.ApplyStrategy{ + Type: placementv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + }, + cluster: &placementv1alpha1.ClusterUpdatingStatus{ + ResourceOverrideSnapshots: []placementv1beta1.NamespacedName{ + {Name: "ro1", Namespace: "ns1"}, + {Name: "ro2", Namespace: "ns2"}, + }, + ClusterResourceOverrideSnapshots: []string{"cr1", "cr2"}, + }, + wantEqual: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t 
*testing.T) { + got := isBindingSyncedWithClusterStatus(test.updateRun, test.binding, test.cluster) + if got != test.wantEqual { + t.Fatalf("isBindingSyncedWithClusterStatus() got %v; want %v", got, test.wantEqual) + } + }) + } +} + +func TestCheckClusterUpdateResult(t *testing.T) { + updatingStage := &placementv1alpha1.StageUpdatingStatus{ + StageName: "test-stage", + } + updateRun := &placementv1alpha1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Generation: 1, + }, + } + tests := []struct { + name string + binding *placementv1beta1.ClusterResourceBinding + clusterStatus *placementv1alpha1.ClusterUpdatingStatus + wantSucceeded bool + wantErr bool + }{ + { + name: "checkClusterUpdateResult should return true if the binding has available condition", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingAvailable), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.AvailableReason, + }, + }, + }, + }, + clusterStatus: &placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: true, + wantErr: false, + }, + { + name: "checkClusterUpdateResult should return false and error if the binding has false overridden condition", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingOverridden), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.OverriddenFailedReason, + }, + }, + }, + }, + clusterStatus: &placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: false, + wantErr: true, + }, + { + name: "checkClusterUpdateResult should return false and error if the binding has false 
workSynchronized condition", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingWorkSynchronized), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.WorkNotSynchronizedYetReason, + }, + }, + }, + }, + clusterStatus: &placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: false, + wantErr: true, + }, + { + name: "checkClusterUpdateResult should return false and error if the binding has false applied condition", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingApplied), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.ApplyFailedReason, + }, + }, + }, + }, + clusterStatus: &placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: false, + wantErr: true, + }, + { + name: "checkClusterUpdateResult should return false but no error if the binding is not available yet", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingOverridden), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.OverriddenSucceededReason, + }, + { + Type: string(placementv1beta1.ResourceBindingWorkSynchronized), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.WorkSynchronizedReason, + }, + { + Type: string(placementv1beta1.ResourceBindingApplied), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.ApplySucceededReason, + }, + }, + }, + }, + clusterStatus: 
&placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: false, + wantErr: false, + }, + { + name: "checkClusterUpdateResult should return false but no error if the binding does not have any conditions", + binding: &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{Generation: 1}, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{}, + }, + }, + clusterStatus: &placementv1alpha1.ClusterUpdatingStatus{ClusterName: "test-cluster"}, + wantSucceeded: false, + wantErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotSucceeded, gotErr := checkClusterUpdateResult(test.binding, test.clusterStatus, updatingStage, updateRun) + if gotSucceeded != test.wantSucceeded { + t.Fatalf("checkClusterUpdateResult() got %v; want %v", gotSucceeded, test.wantSucceeded) + } + if (gotErr != nil) != test.wantErr { + t.Fatalf("checkClusterUpdateResult() got error %v; want error %v", gotErr, test.wantErr) + } + if test.wantSucceeded { + if !condition.IsConditionStatusTrue(meta.FindStatusCondition(test.clusterStatus.Conditions, string(placementv1alpha1.ClusterUpdatingConditionSucceeded)), updateRun.Generation) { + t.Fatalf("checkClusterUpdateResult() failed to set ClusterUpdatingConditionSucceeded condition") + } + } + }) + } +} diff --git a/pkg/controllers/updaterun/initialization_integration_test.go b/pkg/controllers/updaterun/initialization_integration_test.go index 97b931818..0f86f5d1a 100644 --- a/pkg/controllers/updaterun/initialization_integration_test.go +++ b/pkg/controllers/updaterun/initialization_integration_test.go @@ -32,9 +32,15 @@ var ( cmpOptions = []cmp.Option{ cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), cmpopts.IgnoreFields(metav1.Condition{}, "Message"), + cmpopts.IgnoreFields(placementv1alpha1.StageUpdatingStatus{}, "StartTime", "EndTime"), } ) +const ( + regionEastus = "eastus" + regionWestus = "westus" +) + var _ = 
Describe("Updaterun initialization tests", func() { var updateRun *placementv1alpha1.ClusterStagedUpdateRun var crp *placementv1beta1.ClusterResourcePlacement @@ -64,9 +70,9 @@ var _ = Describe("Updaterun initialization tests", func() { targetClusters = make([]*clusterv1beta1.MemberCluster, numTargetClusters) for i := range targetClusters { // split the clusters into 2 regions - region := "eastus" + region := regionEastus if i%2 == 0 { - region = "westus" + region = regionWestus } // reserse the order of the clusters by index targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region}) @@ -97,7 +103,8 @@ var _ = Describe("Updaterun initialization tests", func() { resourceSnapshot = generateTestClusterResourceSnapshot() // Set smaller wait time for testing - stageUpdatingWaitTime = time.Second * 2 + stageUpdatingWaitTime = time.Second * 3 + clusterUpdatingWaitTime = time.Second * 2 }) AfterEach(func() { @@ -620,29 +627,12 @@ var _ = Describe("Updaterun initialization tests", func() { Expect(k8sClient.Create(ctx, updateRun)).To(Succeed()) By("Validating the clusterStagedUpdateRun stats") - want := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride) - Eventually(func() error { - if err := k8sClient.Get(ctx, updateRunNamespacedName, updateRun); err != nil { - return err - } - - if diff := cmp.Diff(*want, updateRun.Status, cmpOptions...); diff != "" { - return fmt.Errorf("status mismatch: (-want +got):\n%s", diff) - } - - return nil - }, timeout, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun initialized successfully") + initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride) + want := generateExecutionStartedStatus(updateRun, initialized) + validateClusterStagedUpdateRunStatus(ctx, updateRun, want, "") By("Validating the 
clusterStagedUpdateRun initialized consistently") - Consistently(func() error { - if err := k8sClient.Get(ctx, updateRunNamespacedName, updateRun); err != nil { - return err - } - if diff := cmp.Diff(*want, updateRun.Status, cmpOptions...); diff != "" { - return fmt.Errorf("status mismatch: (-want +got):\n%s", diff) - } - return nil - }, duration, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun initialized consistently") + validateClusterStagedUpdateRunStatusConsistently(ctx, updateRun, want, "") }) }) }) @@ -721,3 +711,16 @@ func generateSucceededInitializationStatus( }, } } + +func generateExecutionStartedStatus( + updateRun *placementv1alpha1.ClusterStagedUpdateRun, + initialized *placementv1alpha1.StagedUpdateRunStatus, +) *placementv1alpha1.StagedUpdateRunStatus { + // Mark updateRun execution has started. + initialized.Conditions = append(initialized.Conditions, generateTrueCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionProgressing)) + // Mark updateRun 1st stage has started. + initialized.StagesStatus[0].Conditions = append(initialized.StagesStatus[0].Conditions, generateTrueCondition(updateRun, placementv1alpha1.StageUpdatingConditionProgressing)) + // Mark updateRun 1st cluster in the 1st stage has started. 
+ initialized.StagesStatus[0].Clusters[0].Conditions = []metav1.Condition{generateTrueCondition(updateRun, placementv1alpha1.ClusterUpdatingConditionStarted)} + return initialized +} diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index 412498970..bff1086f9 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -11,7 +11,6 @@ import ( "reflect" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1" @@ -305,30 +304,6 @@ func validateDeleteStageStatus( klog.ErrorS(failedErr, "The delete stage has failed", "stageCond", deleteStageFinishedCond, "clusterStagedUpdateRun", updateRunRef) return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) } - // The delete stage is still updating. - if condition.IsConditionStatusTrue(deleteStageProgressingCond, updateRun.Generation) { - klog.InfoS("The delete stage is updating", "clusterStagedUpdateRun", updateRunRef) - return totalStages, nil - } - // All stages have finished, but the delete stage is not active or finished. - unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the delete stage is not active, but all stages finished")) - klog.ErrorS(unexpectedErr, "There is no stage active", "clusterStagedUpdateRun", updateRunRef) - return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) -} - -// recordUpdateRunFailed records the failed condition in the ClusterStagedUpdateRun status. 
-func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, message string) error { - meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{ - Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded), - Status: metav1.ConditionFalse, - ObservedGeneration: updateRun.Generation, - Reason: condition.UpdateRunFailedReason, - Message: message, - }) - if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil { - klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as failed", "clusterStagedUpdateRun", klog.KObj(updateRun)) - // updateErr can be retried. - return controller.NewUpdateIgnoreConflictError(updateErr) - } - return nil + // The delete stage is still updating or just to start. + return totalStages, nil } diff --git a/pkg/controllers/updaterun/validation_integration_test.go b/pkg/controllers/updaterun/validation_integration_test.go index de5043c78..2d2efd03e 100644 --- a/pkg/controllers/updaterun/validation_integration_test.go +++ b/pkg/controllers/updaterun/validation_integration_test.go @@ -57,9 +57,9 @@ var _ = Describe("UpdateRun validation tests", func() { targetClusters = make([]*clusterv1beta1.MemberCluster, numTargetClusters) for i := range targetClusters { // split the clusters into 2 regions - region := "eastus" + region := regionEastus if i%2 == 0 { - region = "westus" + region = regionWestus } // reserse the order of the clusters by index targetClusters[i] = generateTestMemberCluster(numTargetClusters-1-i, "cluster-"+strconv.Itoa(i), map[string]string{"group": "prod", "region": region}) @@ -90,7 +90,8 @@ var _ = Describe("UpdateRun validation tests", func() { resourceSnapshot = generateTestClusterResourceSnapshot() // Set smaller wait time for testing - stageUpdatingWaitTime = time.Second * 2 + stageUpdatingWaitTime = time.Second * 3 + clusterUpdatingWaitTime = time.Second * 2 By("Creating a new clusterResourcePlacement") 
Expect(k8sClient.Create(ctx, crp)).To(Succeed()) @@ -133,8 +134,9 @@ var _ = Describe("UpdateRun validation tests", func() { Expect(k8sClient.Create(ctx, updateRun)).To(Succeed()) By("Validating the initialization succeeded") - wantStatus = generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride) - validateSucceededInitializationStatus(ctx, updateRun, wantStatus) + initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride) + wantStatus = generateExecutionStartedStatus(updateRun, initialized) + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) AfterEach(func() { @@ -185,7 +187,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "parent clusterResourcePlacement not found") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "parent clusterResourcePlacement not found") }) It("Should fail to validate if CRP does not have external rollout strategy type", func() { @@ -195,7 +197,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "parent clusterResourcePlacement does not have an external rollout strategy") }) @@ -206,7 +208,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the applyStrategy in the clusterStagedUpdateRun is outdated") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the applyStrategy in the 
clusterStagedUpdateRun is outdated") }) }) @@ -217,7 +219,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "no latest policy snapshot associated") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "no latest policy snapshot associated") }) It("Should fail to validate if the latest policySnapshot has changed", func() { @@ -239,7 +241,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the policy snapshot index used in the clusterStagedUpdateRun is outdated") By("Deleting the new policySnapshot") @@ -253,7 +255,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the cluster count initialized in the clusterStagedUpdateRun is outdated") }) }) @@ -267,7 +269,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) wantStatus.StagedUpdateStrategySnapshot = nil - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil stagedUpdateStrategySnapshot") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil stagedUpdateStrategySnapshot") }) It("Should fail to validate if the StagesStatus is nil", func() { @@ -278,7 +280,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation 
failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) wantStatus.StagesStatus = nil - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil stagesStatus") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil stagesStatus") }) It("Should fail to validate if the DeletionStageStatus is nil", func() { @@ -289,7 +291,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) wantStatus.DeletionStageStatus = nil - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil deletionStageStatus") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the clusterStagedUpdateRun has nil deletionStageStatus") }) It("Should fail to validate if the number of stages has changed", func() { @@ -316,7 +318,7 @@ var _ = Describe("UpdateRun validation tests", func() { }, }, }) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the number of stages in the clusterStagedUpdateRun has changed") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the number of stages in the clusterStagedUpdateRun has changed") }) It("Should fail to validate if stage name has changed", func() { @@ -327,7 +329,7 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) wantStatus.StagedUpdateStrategySnapshot.Stages[0].Name = "stage3" - validateFailedValidationStatus(ctx, updateRun, wantStatus, "index `0` stage name in the clusterStagedUpdateRun has changed") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "index `0` stage name in the clusterStagedUpdateRun has changed") }) It("Should fail to validate if the number of clusters has changed in a stage", func() { @@ -336,7 +338,7 @@ var _ = 
Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the number of clusters in index `1` stage has changed") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the number of clusters in index `1` stage has changed") }) It("Should fail to validate if the cluster name has changed in a stage", func() { @@ -349,15 +351,16 @@ var _ = Describe("UpdateRun validation tests", func() { By("Validating the validation failed") wantStatus = generateFailedValidationStatus(updateRun, wantStatus) - validateFailedValidationStatus(ctx, updateRun, wantStatus, "the `3` cluster in the `0` stage has changed") + validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "the `3` cluster in the `0` stage has changed") }) }) }) -func validateSucceededInitializationStatus( +func validateClusterStagedUpdateRunStatus( ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, want *placementv1alpha1.StagedUpdateRunStatus, + message string, ) { Eventually(func() error { if err := k8sClient.Get(ctx, updateRunNamespacedName, updateRun); err != nil { @@ -367,17 +370,23 @@ func validateSucceededInitializationStatus( if diff := cmp.Diff(*want, updateRun.Status, cmpOptions...); diff != "" { return fmt.Errorf("status mismatch: (-want +got):\n%s", diff) } + if message != "" { + succeedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1alpha1.StagedUpdateRunConditionSucceeded)) + if !strings.Contains(succeedCond.Message, message) { + return fmt.Errorf("condition message mismatch: got %s, want %s", succeedCond.Message, message) + } + } return nil - }, timeout, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun initialized successfully") + }, timeout, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun status") } -func 
validateFailedValidationStatus( +func validateClusterStagedUpdateRunStatusConsistently( ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, want *placementv1alpha1.StagedUpdateRunStatus, message string, ) { - Eventually(func() error { + Consistently(func() error { if err := k8sClient.Get(ctx, updateRunNamespacedName, updateRun); err != nil { return err } @@ -385,12 +394,14 @@ func validateFailedValidationStatus( if diff := cmp.Diff(*want, updateRun.Status, cmpOptions...); diff != "" { return fmt.Errorf("status mismatch: (-want +got):\n%s", diff) } - succeedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1alpha1.StagedUpdateRunConditionSucceeded)) - if !strings.Contains(succeedCond.Message, message) { - return fmt.Errorf("condition message mismatch: got %s, want %s", succeedCond.Message, message) + if message != "" { + succeedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1alpha1.StagedUpdateRunConditionSucceeded)) + if !strings.Contains(succeedCond.Message, message) { + return fmt.Errorf("condition message mismatch: got %s, want %s", succeedCond.Message, message) + } } return nil - }, timeout, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun failed to validate") + }, duration, interval).Should(Succeed(), "failed to validate the clusterStagedUpdateRun status consistently") } func generateFailedValidationStatus( diff --git a/pkg/controllers/updaterun/validation_test.go b/pkg/controllers/updaterun/validation_test.go index c2330e74d..ffe05cb1a 100644 --- a/pkg/controllers/updaterun/validation_test.go +++ b/pkg/controllers/updaterun/validation_test.go @@ -384,7 +384,7 @@ func TestValidateDeleteStageStatus(t *testing.T) { wantUpdatingStageIndex: -1, }, { - name: "validateDeleteStageStatus should return totalStaged if the delete stage is still running", + name: "validateDeleteStageStatus should return totalStages if the delete stage is still running", 
updatingStageIndex: -1, lastFinishedStageIndex: totalStages - 1, deleteStageStatus: &placementv1alpha1.StageUpdatingStatus{ @@ -395,12 +395,12 @@ func TestValidateDeleteStageStatus(t *testing.T) { wantUpdatingStageIndex: totalStages, }, { - name: "validateDeleteStageStatus should return error if all updating stages have finished but the delete stage is not active or finished", + name: "validateDeleteStageStatus should return totalStages if all updating stages have finished but the delete stage is not active or finished", updatingStageIndex: -1, lastFinishedStageIndex: totalStages - 1, deleteStageStatus: &placementv1alpha1.StageUpdatingStatus{StageName: "delete-stage"}, - wantErr: wrapErr(true, fmt.Errorf("the delete stage is not active, but all stages finished")), - wantUpdatingStageIndex: -1, + wantErr: nil, + wantUpdatingStageIndex: totalStages, }, } for _, test := range tests { From a369ee53baa50526525417bb8893ba565f3c9fe4 Mon Sep 17 00:00:00 2001 From: Wantong Jiang Date: Fri, 27 Dec 2024 01:06:21 +0000 Subject: [PATCH 2/2] fix comments --- pkg/controllers/updaterun/execution.go | 2 ++ .../updaterun/execution_integration_test.go | 18 +++++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 3c2d6693a..77c02fc9c 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -35,6 +35,8 @@ var ( ) // execute executes the update run by updating the clusters in the updating stage specified by updatingStageIndex. +// It returns a boolean indicating if the clusterStagedUpdateRun execution is completed, +// the time to wait before rechecking the cluster update status, and any error encountered. 
func (r *Reconciler) execute( ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, diff --git a/pkg/controllers/updaterun/execution_integration_test.go b/pkg/controllers/updaterun/execution_integration_test.go index 62cedadb5..0ed3f4bea 100644 --- a/pkg/controllers/updaterun/execution_integration_test.go +++ b/pkg/controllers/updaterun/execution_integration_test.go @@ -182,7 +182,7 @@ var _ = Describe("UpdateRun execution tests", func() { }) Context("Cluster staged update run should update clusters one by one", Ordered, func() { - It("Should mark the 1st cluster in the 1st stage as succeeded", func() { + It("Should mark the 1st cluster in the 1st stage as succeeded after marking the binding available", func() { By("Validating the 1st clusterResourceBinding is updated to Bound") binding := resourceBindings[numTargetClusters-1] // cluster-9 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) @@ -200,7 +200,7 @@ var _ = Describe("UpdateRun execution tests", func() { Expect(updateRun.Status.StagesStatus[0].StartTime).ShouldNot(BeNil()) }) - It("Should mark the 2nd cluster in the 1st stage as succeeded", func() { + It("Should mark the 2nd cluster in the 1st stage as succeeded after marking the binding available", func() { By("Validating the 2nd clusterResourceBinding is updated to Bound") binding := resourceBindings[numTargetClusters-3] // cluster-7 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) @@ -230,7 +230,7 @@ var _ = Describe("UpdateRun execution tests", func() { validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) - It("Should mark the 4th cluster in the 1st stage as succeeded", func() { + It("Should mark the 4th cluster in the 1st stage as succeeded after marking the binding available", func() { By("Validating the 4th clusterResourceBinding is updated to Bound") binding := resourceBindings[numTargetClusters-7] // cluster-3 validateBindingState(ctx, binding, 
resourceSnapshot.Name, updateRun, 0) @@ -245,7 +245,7 @@ var _ = Describe("UpdateRun execution tests", func() { validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) - It("Should mark the 5th cluster in the 1st stage as succeeded", func() { + It("Should mark the 5th cluster in the 1st stage as succeeded after marking the binding available", func() { By("Validating the 5th clusterResourceBinding is updated to Bound") binding := resourceBindings[numTargetClusters-9] // cluster-1 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 0) @@ -286,7 +286,7 @@ var _ = Describe("UpdateRun execution tests", func() { Expect(waitStartTime.Add(updateStrategy.Spec.Stages[0].AfterStageTasks[0].WaitTime.Duration).Before(waitEndTime)).Should(BeTrue()) }) - It("Should mark the 1st cluster in the 2nd stage as succeeded", func() { + It("Should mark the 1st cluster in the 2nd stage as succeeded after marking the binding available", func() { By("Validating the 1st clusterResourceBinding is updated to Bound") binding := resourceBindings[0] // cluster-0 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) @@ -304,7 +304,7 @@ var _ = Describe("UpdateRun execution tests", func() { Expect(updateRun.Status.StagesStatus[0].StartTime).ShouldNot(BeNil()) }) - It("Should mark the 2nd cluster in the 2nd stage as succeeded", func() { + It("Should mark the 2nd cluster in the 2nd stage as succeeded after marking the binding available", func() { By("Validating the 2nd clusterResourceBinding is updated to Bound") binding := resourceBindings[2] // cluster-2 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) @@ -319,7 +319,7 @@ var _ = Describe("UpdateRun execution tests", func() { validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) - It("Should mark the 3rd cluster in the 2nd stage as succeeded", func() { + It("Should mark the 3rd cluster in the 2nd stage as succeeded after marking the binding available", func() 
{ By("Validating the 3rd clusterResourceBinding is updated to Bound") binding := resourceBindings[4] // cluster-4 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) @@ -334,7 +334,7 @@ var _ = Describe("UpdateRun execution tests", func() { validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) - It("Should mark the 4th cluster in the 2nd stage as succeeded", func() { + It("Should mark the 4th cluster in the 2nd stage as succeeded after marking the binding available", func() { By("Validating the 4th clusterResourceBinding is updated to Bound") binding := resourceBindings[6] // cluster-6 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1) @@ -349,7 +349,7 @@ var _ = Describe("UpdateRun execution tests", func() { validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "") }) - It("Should mark the 5th cluster in the 2nd stage as succeeded", func() { + It("Should mark the 5th cluster in the 2nd stage as succeeded after marking the binding available", func() { By("Validating the 5th clusterResourceBinding is updated to Bound") binding := resourceBindings[8] // cluster-8 validateBindingState(ctx, binding, resourceSnapshot.Name, updateRun, 1)