From 64ac0c2b82f0493c1ca990f2d817447ef84ff5ef Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Tue, 2 Jan 2024 14:02:30 +0100 Subject: [PATCH 1/3] Seperate pending update check from health check during roll restart Refactor the code to seperate each steps clearly. In the first loop there was 2 operations: - Check if there is a pending update - Check if all pods are ready However, we can break in some conditions and skip the health checking. To simplify the maintability of the scope, it is now split in 2 separate loops: - first check if restarts are pending - then check that all pools are healthy Signed-off-by: Geoffrey Beausire --- .../pkg/reconcilers/rollingRestart.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index 748c06d8..ad8f2744 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -67,13 +67,17 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { status := r.findStatus() var pendingUpdate bool - // Check that all nodes are ready before doing work - // Also check if there are pending updates for all nodes. + statefulSets := []appsv1.StatefulSet{} for _, nodePool := range r.instance.Spec.NodePools { sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace) if err != nil { return ctrl.Result{}, err } + statefulSets = append(statefulSets, sts) + } + + // Check if there are pending updates for all nodes. + for _, sts := range statefulSets { if sts.Status.UpdateRevision != "" && sts.Status.UpdatedReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { pendingUpdate = true @@ -118,6 +122,16 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { return ctrl.Result{}, nil } + // Check that all nodes of all pools are ready before doing work + for _, sts := range statefulSets { + if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 10 * time.Second, + }, nil + } + } + // Skip a rolling restart if the cluster hasn't finished initializing if !r.instance.Status.Initialized { return ctrl.Result{ From c2a3df214da417e5f49575c9cb4b977eb01a6601 Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Tue, 2 Jan 2024 14:07:43 +0100 Subject: [PATCH 2/3] Separate the rolling update loop from the pod stuck restart Refactor the order or restart to simplify the reconcile function and make it more deterministic. Before we could maybe restart stuck pods then perform roll restart. Now, stuck pods are always restarted first. Also, stuck pods are restarted before checking the number of ready replicas, because if we check before we will never reach this point. Finally, we don't process a rolling restart if any pods stuck was deleted, to avoid performing any dangerous actions. Signed-off-by: Geoffrey Beausire --- opensearch-operator/pkg/helpers/helpers.go | 9 +++--- .../pkg/reconcilers/rollingRestart.go | 32 +++++++++++-------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index 7795f260..68d2d836 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -455,16 +455,17 @@ func WorkingPodForRollingRestart(k8sClient k8s.K8sClient, sts *appsv1.StatefulSe } // DeleteStuckPodWithOlderRevision deletes the crashed pod only if there is any update in StatefulSet. -func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.StatefulSet) error { +// Return true if a pod was restarded +func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.StatefulSet) (bool, error) { podWithOlderRevision, err := GetPodWithOlderRevision(k8sClient, sts) if err != nil { - return err + return false, err } if podWithOlderRevision != nil { for _, container := range podWithOlderRevision.Status.ContainerStatuses { // If any container is getting crashed, restart it by deleting the pod so that new update in sts can take place. if !container.Ready && container.State.Waiting != nil && container.State.Waiting.Reason == "CrashLoopBackOff" { - return k8sClient.DeletePod(&corev1.Pod{ + return true, k8sClient.DeletePod(&corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podWithOlderRevision.Name, Namespace: sts.Namespace, @@ -473,7 +474,7 @@ func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.Statef } } } - return nil + return false, nil } // GetPodWithOlderRevision fetches the pod that is not having the updated revision. diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index ad8f2744..81e8be41 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -95,12 +95,6 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { } } - if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { - return ctrl.Result{ - Requeue: true, - RequeueAfter: 10 * time.Second, - }, nil - } } if !pendingUpdate { @@ -122,6 +116,24 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { return ctrl.Result{}, nil } + // Check if there is any crashed pod. Delete it if there is any update in sts. + any_restarted_pod := false + for _, sts := range statefulSets { + restared_pod, err := helpers.DeleteStuckPodWithOlderRevision(r.client, &sts) + if err != nil { + return ctrl.Result{}, err + } + if restared_pod { + any_restarted_pod = true + } + } + if any_restarted_pod { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 10 * time.Second, + }, nil + } + // Check that all nodes of all pools are ready before doing work for _, sts := range statefulSets { if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { @@ -153,8 +165,7 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { return ctrl.Result{}, err } - // Restart StatefulSet pod. Order is not important So we just pick the first we find - + // Restart a single pod of a StatefulSet. Order is not important so we just pick the first we find for _, nodePool := range r.instance.Spec.NodePools { sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace) if err != nil { @@ -168,11 +179,6 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name)) return r.restartStatefulSetPod(&sts) } - } else { // Check if there is any crashed pod. Delete it if there is any update in sts. - err = helpers.DeleteStuckPodWithOlderRevision(r.client, &sts) - if err != nil { - return ctrl.Result{}, err - } } } } From 2543804200eded15d02cd0b374666770d9f82d9e Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Tue, 2 Jan 2024 14:34:47 +0100 Subject: [PATCH 3/3] Alleviate race conditions in roll restart reconciler When a Roll Restart is triggered with at least 2 pools, a race condition can trigger the roll restart of a pods in each pools. This can lead to a red cluster. Normally to prevent this from happening, there are 3 checks: - check the status.ReadyReplicas of all sts before moving forward. - for each nodePool, check that that all replicas are ready by listing pods directly - before deleting a pod, a check is made on the OpenSearch to see if the cluster is healthy In practice, it is not enough. Considering the rollRestart of 2 nodePool: - data - masterData The following sequence can happen: - a rollRestart is triggered - reconcile function is called - data and masterData have all their pods ready - data pod is deleted; pods is terminating (NOT terminated yet) - reconcile function is recalled - data and masterData have all their pods ready from the status.ReadyReplicas point of view (because it didn't see the change yet) - data is seen as unhealthy thanks to CountRunningPodsForNodePool - masterData is seen as healthy because all its pods are ready - Opensearch is still healthy, because the deleted pod is not terminated yet - A pod in masterData is restarted - Cluster is red! This commit make sure we check readiness of all nodePool using CountRunningPodsForNodePool before trying to restart any pools. Signed-off-by: Geoffrey Beausire --- opensearch-operator/pkg/helpers/helpers.go | 3 +- .../pkg/helpers/helpers_suite_test.go | 67 +++++++++++++++++++ .../pkg/reconcilers/rollingRestart.go | 25 ++++--- 3 files changed, 86 insertions(+), 9 deletions(-) diff --git a/opensearch-operator/pkg/helpers/helpers.go b/opensearch-operator/pkg/helpers/helpers.go index 68d2d836..86979673 100644 --- a/opensearch-operator/pkg/helpers/helpers.go +++ b/opensearch-operator/pkg/helpers/helpers.go @@ -4,11 +4,12 @@ import ( "encoding/json" "errors" "fmt" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "reflect" "sort" "time" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + policyv1 "k8s.io/api/policy/v1" opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1" diff --git a/opensearch-operator/pkg/helpers/helpers_suite_test.go b/opensearch-operator/pkg/helpers/helpers_suite_test.go index 7ab2c7be..2e90cc84 100644 --- a/opensearch-operator/pkg/helpers/helpers_suite_test.go +++ b/opensearch-operator/pkg/helpers/helpers_suite_test.go @@ -3,11 +3,78 @@ package helpers import ( "testing" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/mocks/github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestHelpers(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Helpers Suite") } + +var _ = Describe("Helpers", func() { + When("A STS has a pod stuck with the same revision", func() { + It("Should do nothing", func() { + mockClient := k8s.NewMockK8sClient(GinkgoT()) + + pod := corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Labels: map[string]string{ + stsRevisionLabel: "foo", + }, + }, + } + mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil) + sts := &appsv1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{Name: "foo"}, + Status: appsv1.StatefulSetStatus{UpdateRevision: "foo"}, + } + ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).NotTo(BeTrue()) + }) + }) + + When("A STS has a pod stuck with a different revision", func() { + It("Should delete it", func() { + mockClient := k8s.NewMockK8sClient(GinkgoT()) + + pod := corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "foo-0", + Labels: map[string]string{ + stsRevisionLabel: "foo", + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + State: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"}, + }, + }, + }, + }, + } + deletedPod := corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Name: "foo-0", + Namespace: "", + }} + mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil) + mockClient.EXPECT().DeletePod(&deletedPod).Return(nil) + sts := &appsv1.StatefulSet{ + ObjectMeta: v1.ObjectMeta{Name: "foo"}, + Status: appsv1.StatefulSetStatus{UpdateRevision: "bar"}, + } + ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts) + Expect(err).NotTo(HaveOccurred()) + Expect(ok).To(BeTrue()) + + }) + }) +}) diff --git a/opensearch-operator/pkg/reconcilers/rollingRestart.go b/opensearch-operator/pkg/reconcilers/rollingRestart.go index 81e8be41..70c07121 100644 --- a/opensearch-operator/pkg/reconcilers/rollingRestart.go +++ b/opensearch-operator/pkg/reconcilers/rollingRestart.go @@ -135,13 +135,26 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { } // Check that all nodes of all pools are ready before doing work - for _, sts := range statefulSets { + for _, nodePool := range r.instance.Spec.NodePools { + sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace) + if err != nil { + return ctrl.Result{}, err + } if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { return ctrl.Result{ Requeue: true, RequeueAfter: 10 * time.Second, }, nil } + // CountRunningPodsForNodePool provides a more consistent view of the sts. Status of statefulset + // is eventually consistent. Listing pods is more consistent. + numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool) + if err != nil || numReadyPods != int(pointer.Int32Deref(sts.Spec.Replicas, 1)) { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 10 * time.Second, + }, nil + } } // Skip a rolling restart if the cluster hasn't finished initializing @@ -173,13 +186,9 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) { } if sts.Status.UpdateRevision != "" && sts.Status.UpdatedReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) { - // Only restart pods if not all pods are updated and the sts is healthy with no pods terminating - if sts.Status.ReadyReplicas == pointer.Int32Deref(sts.Spec.Replicas, 1) { - if numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool); err == nil && numReadyPods == int(pointer.Int32Deref(sts.Spec.Replicas, 1)) { - lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name)) - return r.restartStatefulSetPod(&sts) - } - } + + lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name)) + return r.restartStatefulSetPod(&sts) } }