Alleviate race conditions in roll restart reconciler
When a Roll Restart is triggered with at least 2 pools, a race
condition can trigger the roll restart of a pod in each pool.
This can lead to a red cluster.
Normally, to prevent this from happening, there are 3 checks:
- check the status.ReadyReplicas of all sts before moving forward.
- for each nodePool, check that all replicas are ready by listing
  pods directly
- before deleting a pod, a check is made against OpenSearch to see
  if the cluster is healthy

In practice, this is not enough. Consider the rollRestart of 2 nodePools:
- data
- masterData

The following sequence can happen:
- a rollRestart is triggered
- reconcile function is called
- data and masterData have all their pods ready
- a data pod is deleted; the pod is terminating (NOT terminated yet)
- the reconcile function is called again
- data and masterData have all their pods ready from the status.ReadyReplicas
  point of view (because the status has not caught up with the change yet)
- data is seen as unhealthy thanks to CountRunningPodsForNodePool
- masterData is seen as healthy because all its pods are ready
- OpenSearch is still healthy, because the deleted pod is not terminated yet
- A pod in masterData is restarted
- Cluster is red!

This commit makes sure we check the readiness of all nodePools using
CountRunningPodsForNodePool before trying to restart any pool.

Signed-off-by: Geoffrey Beausire <[email protected]>
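
Listing pods gives a more current view than sts.Status.ReadyReplicas because
the StatefulSet status is only updated asynchronously by its controller, while
a pod List reflects the API server's current state, so a terminating pod drops
out of the count right away. Below is a rough, hypothetical sketch of what such
a pod-listing readiness count can look like; the function name countRunningPods
and its signature are illustrative only, and the operator's real
CountRunningPodsForNodePool helper may differ in detail.

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// countRunningPods is a hypothetical, simplified illustration of a
// pod-listing readiness check: it lists the pods of one node pool and
// counts only those that are Running, Ready and not being deleted, so a
// terminating pod drops out of the count immediately.
func countRunningPods(ctx context.Context, c client.Client, namespace string, selector map[string]string) (int, error) {
	podList := &corev1.PodList{}
	if err := c.List(ctx, podList,
		client.InNamespace(namespace),
		client.MatchingLabels(selector),
	); err != nil {
		return 0, err
	}

	ready := 0
	for _, pod := range podList.Items {
		if pod.DeletionTimestamp != nil {
			continue // terminating pods are not counted
		}
		if pod.Status.Phase != corev1.PodRunning {
			continue
		}
		for _, cond := range pod.Status.Conditions {
			if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
				ready++
				break
			}
		}
	}
	return ready, nil
}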
geobeau committed Apr 8, 2024
1 parent c2a3df2 commit 2543804
Showing 3 changed files with 86 additions and 9 deletions.
opensearch-operator/pkg/helpers/helpers.go (3 changes: 2 additions & 1 deletion)
@@ -4,11 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"reflect"
"sort"
"time"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

policyv1 "k8s.io/api/policy/v1"

opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
opensearch-operator/pkg/helpers/helpers_suite_test.go (67 changes: 67 additions & 0 deletions)
@@ -3,11 +3,78 @@ package helpers
import (
"testing"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/mocks/github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestHelpers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Helpers Suite")
}

var _ = Describe("Helpers", func() {
When("A STS has a pod stuck with the same revision", func() {
It("Should do nothing", func() {
mockClient := k8s.NewMockK8sClient(GinkgoT())

pod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
stsRevisionLabel: "foo",
},
},
}
mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil)
sts := &appsv1.StatefulSet{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: appsv1.StatefulSetStatus{UpdateRevision: "foo"},
}
ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts)
Expect(err).NotTo(HaveOccurred())
Expect(ok).NotTo(BeTrue())
})
})

When("A STS has a pod stuck with a different revision", func() {
It("Should delete it", func() {
mockClient := k8s.NewMockK8sClient(GinkgoT())

pod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "foo-0",
Labels: map[string]string{
stsRevisionLabel: "foo",
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"},
},
},
},
},
}
deletedPod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "foo-0",
Namespace: "",
}}
mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil)
mockClient.EXPECT().DeletePod(&deletedPod).Return(nil)
sts := &appsv1.StatefulSet{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: appsv1.StatefulSetStatus{UpdateRevision: "bar"},
}
ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())

})
})
})
opensearch-operator/pkg/reconcilers/rollingRestart.go (25 changes: 17 additions & 8 deletions)
@@ -135,13 +135,26 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
}

// Check that all nodes of all pools are ready before doing work
for _, sts := range statefulSets {
for _, nodePool := range r.instance.Spec.NodePools {
sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace)
if err != nil {
return ctrl.Result{}, err
}
if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
// CountRunningPodsForNodePool provides a more consistent view of the sts. Status of statefulset
// is eventually consistent. Listing pods is more consistent.
numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool)
if err != nil || numReadyPods != int(pointer.Int32Deref(sts.Spec.Replicas, 1)) {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
}

// Skip a rolling restart if the cluster hasn't finished initializing
@@ -173,13 +173,9 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
}
if sts.Status.UpdateRevision != "" &&
sts.Status.UpdatedReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
// Only restart pods if not all pods are updated and the sts is healthy with no pods terminating
if sts.Status.ReadyReplicas == pointer.Int32Deref(sts.Spec.Replicas, 1) {
if numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool); err == nil && numReadyPods == int(pointer.Int32Deref(sts.Spec.Replicas, 1)) {
lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name))
return r.restartStatefulSetPod(&sts)
}
}

lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name))
return r.restartStatefulSetPod(&sts)
}
}

