feat: implement clusterStagedUpdateRun execution #1000

Merged
merged 2 commits into from
Dec 30, 2024
80 changes: 72 additions & 8 deletions pkg/controllers/updaterun/controller.go
@@ -13,6 +13,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@@ -40,10 +41,6 @@
// errInitializedFailed is the error when the ClusterStagedUpdateRun fails to initialize.
// It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step.
errInitializedFailed = fmt.Errorf("%w: failed to initialize the clusterStagedUpdateRun", errStagedUpdatedAborted)

// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
// Put it as a variable for convenient testing.
stageUpdatingWaitTime = 60 * time.Second
)

// Reconciler reconciles a ClusterStagedUpdateRun object.
@@ -127,10 +124,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runObjRef)
}

// TODO(wantjian): execute the clusterStagedUpdateRun and fix the requeue time.
klog.V(2).InfoS("Executing the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef, "updatingStageIndex", updatingStageIndex,
"toBeUpdatedBindings count", len(toBeUpdatedBindings), "toBeDeletedBindings count", len(toBeDeletedBindings))
return runtime.Result{RequeueAfter: stageUpdatingWaitTime}, nil
// The previous run is completed but the update to the status failed.
if updatingStageIndex == -1 {
klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
}

// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the clusterStagedUpdateRun", "updatingStageIndex", updatingStageIndex, "clusterStagedUpdateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, &updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if execErr != nil && errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, execErr.Error())
}

if finished {
klog.V(2).InfoS("The clusterStagedUpdateRun is completed", "clusterStagedUpdateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, &updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, &updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The clusterStagedUpdateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "clusterStagedUpdateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
}

// handleDelete handles the deletion of the clusterStagedUpdateRun object.
@@ -162,6 +184,48 @@ func (r *Reconciler) ensureFinalizer(ctx context.Context, updateRun *placementv1
return r.Update(ctx, updateRun, client.FieldOwner(utils.UpdateRunControllerFieldManagerName))
}

// recordUpdateRunSucceeded records the succeeded condition in the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunSucceeded(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded),
Status: metav1.ConditionTrue,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunSucceededReason,
})
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as succeeded", "clusterStagedUpdateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// recordUpdateRunFailed records the failed condition in the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun, message string) error {
meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
Type: string(placementv1alpha1.StagedUpdateRunConditionSucceeded),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.Generation,
Reason: condition.UpdateRunFailedReason,
Message: message,
})
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as failed", "clusterStagedUpdateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// recordUpdateRunStatus records the ClusterStagedUpdateRun status.
func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun *placementv1alpha1.ClusterStagedUpdateRun) error {
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status", "clusterStagedUpdateRun", klog.KObj(updateRun))
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr runtime.Manager) error {
r.recorder = mgr.GetEventRecorderFor("clusterresource-stagedupdaterun-controller")
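The "cannot be retried" branch above hinges on Go's error wrapping: errInitializedFailed wraps errStagedUpdatedAborted via %w, so errors.Is matches the sentinel anywhere in the chain and Reconcile can route such failures straight to recordUpdateRunFailed instead of requeuing. A minimal, self-contained sketch of that pattern follows; the error messages are illustrative, not copied from the repository.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	// Illustrative sentinel; the real message lives in the controller package.
	errStagedUpdatedAborted = errors.New("cannot continue the ClusterStagedUpdateRun")
	// Wraps the sentinel with %w, mirroring errInitializedFailed in the diff above.
	errInitializedFailed = fmt.Errorf("%w: failed to initialize the clusterStagedUpdateRun", errStagedUpdatedAborted)
)

func main() {
	// A hypothetical execution error that wraps the initialization error.
	execErr := fmt.Errorf("stage 0: %w", errInitializedFailed)

	// errors.Is walks the %w chain, so both checks report true and the
	// caller can treat the error as non-retriable.
	fmt.Println(errors.Is(execErr, errInitializedFailed))    // true
	fmt.Println(errors.Is(execErr, errStagedUpdatedAborted)) // true
}
```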
39 changes: 34 additions & 5 deletions pkg/controllers/updaterun/controller_integration_test.go
@@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
@@ -327,7 +328,7 @@ func generateTestClusterStagedUpdateStrategy() *placementv1alpha1.ClusterStagedU
{
Type: placementv1alpha1.AfterStageTaskTypeTimedWait,
WaitTime: metav1.Duration{
Duration: time.Minute * 10,
Duration: time.Second * 4,
},
},
},
@@ -469,7 +470,7 @@ func validateApprovalRequestCount(ctx context.Context, count int) {
}, timeout, interval).Should(Equal(count), "approval requests count mismatch")
}

func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
func generateTrueCondition(obj client.Object, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
@@ -498,16 +499,38 @@ func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun,
reason = condition.ClusterUpdatingSucceededReason
}
typeStr = string(cond)
case placementv1alpha1.AfterStageTaskConditionType:
switch cond {
case placementv1alpha1.AfterStageTaskConditionWaitTimeElapsed:
reason = condition.AfterStageTaskWaitTimeElapsedReason
case placementv1alpha1.AfterStageTaskConditionApprovalRequestCreated:
reason = condition.AfterStageTaskApprovalRequestCreatedReason
case placementv1alpha1.AfterStageTaskConditionApprovalRequestApproved:
reason = condition.AfterStageTaskApprovalRequestApprovedReason
}
typeStr = string(cond)
case placementv1alpha1.ApprovalRequestConditionType:
switch cond {
case placementv1alpha1.ApprovalRequestConditionApproved:
reason = "LGTM"
}
typeStr = string(cond)
case placementv1beta1.ResourceBindingConditionType:
switch cond {
case placementv1beta1.ResourceBindingAvailable:
reason = condition.AvailableReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionTrue,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
ObservedGeneration: obj.GetGeneration(),
Reason: reason,
}
}

func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
@@ -530,11 +553,17 @@ func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun,
reason = condition.ClusterUpdatingFailedReason
}
typeStr = string(cond)
case placementv1beta1.ResourceBindingConditionType:
switch cond {
case placementv1beta1.ResourceBindingApplied:
reason = condition.ApplyFailedReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionFalse,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
ObservedGeneration: obj.GetGeneration(),
Reason: reason,
}
}
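For context, here is a hypothetical fragment showing what switching the helpers to client.Object buys: the same function now builds expected conditions for both the update run and a resource binding, each stamped with that object's own generation via GetGeneration(). The ClusterResourceBinding type name, the fleet import paths, and the package name are assumptions, not taken from this diff.

```go
package updaterun

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// expectedConditions is a hypothetical helper that reuses the generalized
// generateTrueCondition/generateFalseCondition from this test package for two
// different object kinds.
func expectedConditions(updateRun *placementv1alpha1.ClusterStagedUpdateRun, binding *placementv1beta1.ClusterResourceBinding) []metav1.Condition {
	return []metav1.Condition{
		// Succeeded=True on the run; ObservedGeneration comes from updateRun.GetGeneration().
		generateTrueCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionSucceeded),
		// Available=True on the binding; ObservedGeneration comes from binding.GetGeneration().
		generateTrueCondition(binding, placementv1beta1.ResourceBindingAvailable),
		// Applied=False on the binding, e.g. to simulate a failed apply in a test.
		generateFalseCondition(binding, placementv1beta1.ResourceBindingApplied),
	}
}
```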