Alleviate race conditions in roll restart reconciler #694

Open · wants to merge 3 commits into base: main
12 changes: 7 additions & 5 deletions opensearch-operator/pkg/helpers/helpers.go
@@ -4,11 +4,12 @@ import (
"encoding/json"
"errors"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"reflect"
"sort"
"time"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

policyv1 "k8s.io/api/policy/v1"

opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
@@ -455,16 +456,17 @@ func WorkingPodForRollingRestart(k8sClient k8s.K8sClient, sts *appsv1.StatefulSe
}

// DeleteStuckPodWithOlderRevision deletes the crashed pod only if there is any update in StatefulSet.
func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.StatefulSet) error {
// Return true if a pod was restarted
func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.StatefulSet) (bool, error) {
podWithOlderRevision, err := GetPodWithOlderRevision(k8sClient, sts)
if err != nil {
return err
return false, err
}
if podWithOlderRevision != nil {
for _, container := range podWithOlderRevision.Status.ContainerStatuses {
// If any container is getting crashed, restart it by deleting the pod so that new update in sts can take place.
if !container.Ready && container.State.Waiting != nil && container.State.Waiting.Reason == "CrashLoopBackOff" {
return k8sClient.DeletePod(&corev1.Pod{
return true, k8sClient.DeletePod(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podWithOlderRevision.Name,
Namespace: sts.Namespace,
@@ -473,7 +475,7 @@ func DeleteStuckPodWithOlderRevision(k8sClient k8s.K8sClient, sts *appsv1.Statef
}
}
}
return nil
return false, nil
}

// GetPodWithOlderRevision fetches the pod that is not having the updated revision.
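For context on the tests added below: they exercise GetPodWithOlderRevision, which compares each pod's controller revision label against the StatefulSet's UpdateRevision. A minimal sketch of a plausible shape for that helper, inferred from the doc comment above and the test expectations; the k8s client import path is taken from the mocks path in the test file, and the repository's actual implementation may differ:

package helpers

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/pointer"

	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
)

// Sketch only: fetch the first pod whose revision label lags the sts update revision.
func getPodWithOlderRevisionSketch(k8sClient k8s.K8sClient, sts *appsv1.StatefulSet) (*corev1.Pod, error) {
	for i := int32(0); i < pointer.Int32Deref(sts.Spec.Replicas, 1); i++ {
		pod, err := k8sClient.GetPod(fmt.Sprintf("%s-%d", sts.Name, i), sts.Namespace)
		if err != nil {
			return nil, err
		}
		// A pod is "older" when its controller revision label does not match
		// the StatefulSet's current update revision.
		if pod.Labels[stsRevisionLabel] != sts.Status.UpdateRevision {
			return &pod, nil
		}
	}
	return nil, nil
}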
67 changes: 67 additions & 0 deletions opensearch-operator/pkg/helpers/helpers_suite_test.go
@@ -3,11 +3,78 @@ package helpers
import (
"testing"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/mocks/github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestHelpers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Helpers Suite")
}

var _ = Describe("Helpers", func() {
Collaborator: This is not the correct file for the tests; they should go in a file helpers_test.go. The helpers_suite_test.go should only contain the init/start code for the tests of the entire package.

When("A STS has a pod stuck with the same revision", func() {
It("Should do nothing", func() {
mockClient := k8s.NewMockK8sClient(GinkgoT())

pod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
stsRevisionLabel: "foo",
},
},
}
mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil)
sts := &appsv1.StatefulSet{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: appsv1.StatefulSetStatus{UpdateRevision: "foo"},
}
ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts)
Expect(err).NotTo(HaveOccurred())
Expect(ok).NotTo(BeTrue())
})
})

When("A STS has a pod stuck with a different revision", func() {
It("Should delete it", func() {
mockClient := k8s.NewMockK8sClient(GinkgoT())

pod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "foo-0",
Labels: map[string]string{
stsRevisionLabel: "foo",
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
State: corev1.ContainerState{
Waiting: &corev1.ContainerStateWaiting{Reason: "CrashLoopBackOff"},
},
},
},
},
}
deletedPod := corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "foo-0",
Namespace: "",
}}
mockClient.EXPECT().GetPod("foo-0", "").Return(pod, nil)
mockClient.EXPECT().DeletePod(&deletedPod).Return(nil)
sts := &appsv1.StatefulSet{
ObjectMeta: v1.ObjectMeta{Name: "foo"},
Status: appsv1.StatefulSetStatus{UpdateRevision: "bar"},
}
ok, err := DeleteStuckPodWithOlderRevision(mockClient, sts)
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(BeTrue())

})
})
})
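If the specs are moved into a separate helpers_test.go as the reviewer suggests, the suite file would shrink back to just the Ginkgo bootstrap. A minimal sketch of what would remain (this is the suggested follow-up, not code from the PR):

package helpers

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

// Suite bootstrap only; all specs would live in helpers_test.go.
func TestHelpers(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Helpers Suite")
}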
73 changes: 51 additions & 22 deletions opensearch-operator/pkg/reconcilers/rollingRestart.go
@@ -67,13 +67,17 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
status := r.findStatus()
var pendingUpdate bool

// Check that all nodes are ready before doing work
// Also check if there are pending updates for all nodes.
statefulSets := []appsv1.StatefulSet{}
for _, nodePool := range r.instance.Spec.NodePools {
sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace)
if err != nil {
return ctrl.Result{}, err
}
statefulSets = append(statefulSets, sts)
}

// Check if there are pending updates for all nodes.
for _, sts := range statefulSets {
if sts.Status.UpdateRevision != "" &&
sts.Status.UpdatedReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
pendingUpdate = true
@@ -91,12 +95,6 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
}

}
if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
}

if !pendingUpdate {
@@ -118,6 +116,47 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// Check if there is any crashed pod. Delete it if there is any update in sts.
any_restarted_pod := false
Collaborator: Please use camelCase for any variables.

for _, sts := range statefulSets {
restared_pod, err := helpers.DeleteStuckPodWithOlderRevision(r.client, &sts)
Collaborator: Typo.
Suggested change:
restared_pod, err := helpers.DeleteStuckPodWithOlderRevision(r.client, &sts)
restarted_pod, err := helpers.DeleteStuckPodWithOlderRevision(r.client, &sts)

if err != nil {
return ctrl.Result{}, err
}
if restared_pod {
any_restarted_pod = true
}
}
if any_restarted_pod {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}

// Check that all nodes of all pools are ready before doing work
for _, nodePool := range r.instance.Spec.NodePools {
sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace)
if err != nil {
return ctrl.Result{}, err
}
if sts.Status.ReadyReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
return ctrl.Result{
Collaborator: Please add a log line (can be debug) so it is visible that the operator is waiting on pods being ready.

Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
// CountRunningPodsForNodePool provides a more consistent view of the sts. Status of statefulset
// is eventually consistent. Listing pods is more consistent.
numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool)
if err != nil || numReadyPods != int(pointer.Int32Deref(sts.Spec.Replicas, 1)) {
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
}

// Skip a rolling restart if the cluster hasn't finished initializing
if !r.instance.Status.Initialized {
return ctrl.Result{
Expand All @@ -139,27 +178,17 @@ func (r *RollingRestartReconciler) Reconcile() (ctrl.Result, error) {
return ctrl.Result{}, err
}

// Restart StatefulSet pod. Order is not important So we just pick the first we find

// Restart a single pod of a StatefulSet. Order is not important so we just pick the first we find
for _, nodePool := range r.instance.Spec.NodePools {
sts, err := r.client.GetStatefulSet(builders.StsName(r.instance, &nodePool), r.instance.Namespace)
if err != nil {
return ctrl.Result{}, err
}
if sts.Status.UpdateRevision != "" &&
sts.Status.UpdatedReplicas != pointer.Int32Deref(sts.Spec.Replicas, 1) {
// Only restart pods if not all pods are updated and the sts is healthy with no pods terminating
if sts.Status.ReadyReplicas == pointer.Int32Deref(sts.Spec.Replicas, 1) {
if numReadyPods, err := helpers.CountRunningPodsForNodePool(r.client, r.instance, &nodePool); err == nil && numReadyPods == int(pointer.Int32Deref(sts.Spec.Replicas, 1)) {
lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name))
return r.restartStatefulSetPod(&sts)
}
} else { // Check if there is any crashed pod. Delete it if there is any update in sts.
err = helpers.DeleteStuckPodWithOlderRevision(r.client, &sts)
if err != nil {
return ctrl.Result{}, err
}
}

lg.Info(fmt.Sprintf("Starting rolling restart of the StatefulSet %s", sts.Name))
return r.restartStatefulSetPod(&sts)
}
}

Expand Down