Skip to content

Commit

Permalink
Fix bug where new revision might not be saved when updating cluster s…
Browse files Browse the repository at this point in the history
…tatus.
  • Loading branch information
SaaldjorMike committed Sep 9, 2024
1 parent 5b7b807 commit 02db873
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 195 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/humiocluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ type HumioNodePoolStatus struct {
Name string `json:"name,omitempty"`
// State will be empty before the cluster is bootstrapped. From there it can be "Running", "Upgrading", "Restarting" or "Pending"
State string `json:"state,omitempty"`
// DesiredPodRevision holds the desired pod revision for pods of the given node pool.
DesiredPodRevision int `json:"desiredPodRevision,omitempty"`
}

// HumioClusterStatus defines the observed state of HumioCluster
Expand Down
4 changes: 4 additions & 0 deletions charts/humio-operator/crds/core.humio.com_humioclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15050,6 +15050,10 @@ spec:
items:
description: HumioNodePoolStatus shows the status of each node pool
properties:
desiredPodRevision:
description: DesiredPodRevision holds the desired pod revision
for pods of the given node pool.
type: integer
name:
description: Name is the name of the node pool
type: string
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/core.humio.com_humioclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15050,6 +15050,10 @@ spec:
items:
description: HumioNodePoolStatus shows the status of each node pool
properties:
desiredPodRevision:
description: DesiredPodRevision holds the desired pod revision
for pods of the given node pool.
type: integer
name:
description: Name is the name of the node pool
type: string
Expand Down
31 changes: 0 additions & 31 deletions controllers/humiocluster_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,9 @@ limitations under the License.
package controllers

import (
"context"
"fmt"
"strconv"

k8serrors "k8s.io/apimachinery/pkg/api/errors"

"k8s.io/client-go/util/retry"

corev1 "k8s.io/api/core/v1"

humiov1alpha1 "github.com/humio/humio-operator/api/v1alpha1"
)

const (
Expand All @@ -38,29 +30,6 @@ const (
pvcHashAnnotation = "humio_pvc_hash"
)

func (r *HumioClusterReconciler) incrementHumioClusterPodRevision(ctx context.Context, hc *humiov1alpha1.HumioCluster, hnp *HumioNodePool) (int, error) {
revisionKey, revisionValue := hnp.GetHumioClusterNodePoolRevisionAnnotation()
revisionValue++
r.Log.Info(fmt.Sprintf("setting cluster pod revision %s=%d", revisionKey, revisionValue))
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.getLatestHumioCluster(ctx, hc)
if err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
}
if hc.Annotations == nil {
hc.Annotations = map[string]string{}
}
hc.Annotations[revisionKey] = strconv.Itoa(revisionValue)
return r.Update(ctx, hc)
})
if err != nil {
return revisionValue, fmt.Errorf("unable to set annotation %s on HumioCluster: %w", revisionKey, err)
}
return revisionValue, nil
}

func (r *HumioClusterReconciler) setPodRevision(pod *corev1.Pod, newRevision int) {
pod.Annotations[PodRevisionAnnotation] = strconv.Itoa(newRevision)
}
73 changes: 19 additions & 54 deletions controllers/humiocluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -125,17 +124,17 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err := r.setImageFromSource(ctx, pool); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(err.Error()).
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName()))
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName(), pool.GetDesiredPodRevision()))
}
if err := r.ensureValidHumioVersion(pool); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(err.Error()).
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName()))
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName(), pool.GetDesiredPodRevision()))
}
if err := r.ensureValidStorageConfiguration(pool); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(err.Error()).
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName()))
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName(), pool.GetDesiredPodRevision()))
}
}

Expand All @@ -161,7 +160,7 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

defer func(ctx context.Context, humioClient humio.Client, hc *humiov1alpha1.HumioCluster) {
defer func(ctx context.Context, hc *humiov1alpha1.HumioCluster) {
opts := statusOptions()
podStatusList, err := r.getPodStatusList(ctx, hc, humioNodePools.Filter(NodePoolFilterHasNode))
if err != nil {
Expand All @@ -170,7 +169,7 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
_, _ = r.updateStatus(ctx, r.Client.Status(), hc, opts.
withPods(podStatusList).
withNodeCount(len(podStatusList)))
}(ctx, r.HumioClient, hc)
}(ctx, hc)

for _, pool := range humioNodePools.Items {
if err := r.ensureOrphanedPvcsAreDeleted(ctx, hc, pool); err != nil {
Expand All @@ -194,7 +193,7 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
if err := r.validateInitialPodSpec(pool); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(err.Error()).
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName()))
withNodePoolState(humiov1alpha1.HumioClusterStateConfigError, pool.GetNodePoolName(), pool.GetDesiredPodRevision()))
}
}

Expand All @@ -205,21 +204,12 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

if hc.Status.State == "" {
// TODO: migrate to updateStatus()
err := r.setState(ctx, humiov1alpha1.HumioClusterStateRunning, hc)
if err != nil {
return reconcile.Result{}, r.logErrorAndReturn(err, "unable to set cluster state")
}
}

for _, pool := range humioNodePools.Filter(NodePoolFilterHasNode) {
if clusterState, err := r.ensurePodRevisionAnnotation(ctx, hc, pool); err != nil || clusterState != hc.Status.State {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(err.Error()).
withNodePoolState(clusterState, pool.GetNodePoolName()))
}
}

for _, fun := range []ctxHumioClusterFunc{
r.ensureValidCAIssuer,
r.ensureHumioClusterCACertBundle,
Expand Down Expand Up @@ -253,23 +243,21 @@ func (r *HumioClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request

for _, pool := range humioNodePools.Filter(NodePoolFilterHasNode) {
if issueRestart, err := r.ensureHumioServiceAccountAnnotations(ctx, pool); err != nil || issueRestart {
opts := statusOptions()
desiredPodRevision := pool.GetDesiredPodRevision()
if issueRestart {
_, err = r.incrementHumioClusterPodRevision(ctx, hc, pool)
}
if err != nil {
opts.withMessage(err.Error())
desiredPodRevision++
}
_, _ = r.updateStatus(ctx, r.Client.Status(), hc, opts.withState(hc.Status.State))
return reconcile.Result{Requeue: true}, nil
_, err = r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withNodePoolState(hc.Status.State, pool.GetNodePoolName(), desiredPodRevision))
return reconcile.Result{Requeue: true}, err
}
}

for _, pool := range humioNodePools.Filter(NodePoolFilterHasNode) {
if err := r.ensurePersistentVolumeClaimsExist(ctx, hc, pool); err != nil {
opts := statusOptions()
if hc.Status.State != humiov1alpha1.HumioClusterStateRestarting && hc.Status.State != humiov1alpha1.HumioClusterStateUpgrading {
opts.withNodePoolState(humiov1alpha1.HumioClusterStatePending, pool.GetNodePoolName())
opts.withNodePoolState(humiov1alpha1.HumioClusterStatePending, pool.GetNodePoolName(), pool.GetDesiredPodRevision())
}
return r.updateStatus(ctx, r.Client.Status(), hc, opts.
withMessage(err.Error()))
Expand Down Expand Up @@ -434,24 +422,6 @@ func (r *HumioClusterReconciler) hasNoUnusedNodePoolStatus(hc *humiov1alpha1.Hum
return true, 0
}

func (r *HumioClusterReconciler) ensurePodRevisionAnnotation(ctx context.Context, hc *humiov1alpha1.HumioCluster, hnp *HumioNodePool) (string, error) {
revisionKey, revisionValue := hnp.GetHumioClusterNodePoolRevisionAnnotation()
if revisionValue == 0 {
revisionValue = 1
r.Log.Info(fmt.Sprintf("setting cluster pod revision %s=%d", revisionKey, revisionValue))
if hc.Annotations == nil {
hc.Annotations = map[string]string{}
}
hc.Annotations[revisionKey] = strconv.Itoa(revisionValue)
hnp.SetHumioClusterNodePoolRevisionAnnotation(revisionValue)

if err := r.Update(ctx, hc); err != nil {
return humiov1alpha1.HumioClusterStatePending, r.logErrorAndReturn(err, fmt.Sprintf("unable to set pod revision annotation %s", revisionKey))
}
}
return hc.Status.State, nil
}

func (r *HumioClusterReconciler) validateInitialPodSpec(hnp *HumioNodePool) error {
if _, err := ConstructPod(hnp, "", &podAttachments{}); err != nil {
return r.logErrorAndReturn(err, "failed to validate pod spec")
Expand Down Expand Up @@ -2038,27 +2008,22 @@ func (r *HumioClusterReconciler) ensureMismatchedPodsAreDeleted(ctx context.Cont
// PodRestartPolicyRecreate == HumioClusterStateUpgrading
// PodRestartPolicyRolling == HumioClusterStateRestarting
if hc.Status.State == humiov1alpha1.HumioClusterStateRunning || hc.Status.State == humiov1alpha1.HumioClusterStateConfigError {
podRevision := hnp.GetDesiredPodRevision()
podRevision++
if desiredLifecycleState.WantsUpgrade() {
r.Log.Info(fmt.Sprintf("changing cluster state from %s to %s", hc.Status.State, humiov1alpha1.HumioClusterStateUpgrading))
r.Log.Info(fmt.Sprintf("changing cluster state from %s to %s with pod revision %d for node pool %s", hc.Status.State, humiov1alpha1.HumioClusterStateUpgrading, podRevision, hnp.GetNodePoolName()))
if result, err := r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withNodePoolState(humiov1alpha1.HumioClusterStateUpgrading, hnp.GetNodePoolName())); err != nil {
withNodePoolState(humiov1alpha1.HumioClusterStateUpgrading, hnp.GetNodePoolName(), podRevision)); err != nil {
return result, err
}
if revision, err := r.incrementHumioClusterPodRevision(ctx, hc, hnp); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(r.logErrorAndReturn(err, fmt.Sprintf("failed to increment pod revision to %d", revision)).Error()))
}
return reconcile.Result{Requeue: true}, nil
}
if !desiredLifecycleState.WantsUpgrade() && desiredLifecycleState.WantsRestart() {
r.Log.Info(fmt.Sprintf("changing cluster state from %s to %s with pod revision %d for node pool %s", hc.Status.State, humiov1alpha1.HumioClusterStateRestarting, podRevision, hnp.GetNodePoolName()))
if result, err := r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withNodePoolState(humiov1alpha1.HumioClusterStateRestarting, hnp.GetNodePoolName())); err != nil {
withNodePoolState(humiov1alpha1.HumioClusterStateRestarting, hnp.GetNodePoolName(), podRevision)); err != nil {
return result, err
}
if revision, err := r.incrementHumioClusterPodRevision(ctx, hc, hnp); err != nil {
return r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withMessage(r.logErrorAndReturn(err, fmt.Sprintf("failed to increment pod revision to %d", revision)).Error()))
}
return reconcile.Result{Requeue: true}, nil
}
}
Expand Down Expand Up @@ -2118,7 +2083,7 @@ func (r *HumioClusterReconciler) ensureMismatchedPodsAreDeleted(ctx context.Cont
if hc.Status.State == humiov1alpha1.HumioClusterStateRestarting || hc.Status.State == humiov1alpha1.HumioClusterStateUpgrading || hc.Status.State == humiov1alpha1.HumioClusterStateConfigError {
r.Log.Info(fmt.Sprintf("no longer deleting pods. changing cluster state from %s to %s", hc.Status.State, humiov1alpha1.HumioClusterStateRunning))
if result, err := r.updateStatus(ctx, r.Client.Status(), hc, statusOptions().
withNodePoolState(humiov1alpha1.HumioClusterStateRunning, hnp.GetNodePoolName())); err != nil {
withNodePoolState(humiov1alpha1.HumioClusterStateRunning, hnp.GetNodePoolName(), hnp.GetDesiredPodRevision())); err != nil {
return result, err
}
}
Expand Down
45 changes: 23 additions & 22 deletions controllers/humiocluster_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,18 @@ type HumioNodePool struct {
path string
ingress humiov1alpha1.HumioClusterIngressSpec
clusterAnnotations map[string]string
desiredPodRevision int
}

func NewHumioNodeManagerFromHumioCluster(hc *humiov1alpha1.HumioCluster) *HumioNodePool {
desiredPodRevision := 0
for _, status := range hc.Status.NodePoolStatus {
if status.Name == hc.Name {
desiredPodRevision = status.DesiredPodRevision
break
}
}

return &HumioNodePool{
namespace: hc.Namespace,
clusterName: hc.Name,
Expand Down Expand Up @@ -147,10 +156,19 @@ func NewHumioNodeManagerFromHumioCluster(hc *humiov1alpha1.HumioCluster) *HumioN
path: hc.Spec.Path,
ingress: hc.Spec.Ingress,
clusterAnnotations: hc.Annotations,
desiredPodRevision: desiredPodRevision,
}
}

func NewHumioNodeManagerFromHumioNodePool(hc *humiov1alpha1.HumioCluster, hnp *humiov1alpha1.HumioNodePoolSpec) *HumioNodePool {
desiredPodRevision := 0
for _, status := range hc.Status.NodePoolStatus {
if status.Name == strings.Join([]string{hc.Name, hnp.Name}, "-") {
desiredPodRevision = status.DesiredPodRevision
break
}
}

return &HumioNodePool{
namespace: hc.Namespace,
clusterName: hc.Name,
Expand Down Expand Up @@ -209,6 +227,7 @@ func NewHumioNodeManagerFromHumioNodePool(hc *humiov1alpha1.HumioCluster, hnp *h
path: hc.Spec.Path,
ingress: hc.Spec.Ingress,
clusterAnnotations: hc.Annotations,
desiredPodRevision: desiredPodRevision,
}
}

Expand Down Expand Up @@ -289,29 +308,11 @@ func (hnp *HumioNodePool) GetDigestPartitionsCount() int {
return digestPartitionsCount
}

func (hnp *HumioNodePool) SetHumioClusterNodePoolRevisionAnnotation(newRevision int) {
if hnp.clusterAnnotations == nil {
hnp.clusterAnnotations = map[string]string{}
}
revisionKey, _ := hnp.GetHumioClusterNodePoolRevisionAnnotation()
hnp.clusterAnnotations[revisionKey] = strconv.Itoa(newRevision)
}

func (hnp *HumioNodePool) GetHumioClusterNodePoolRevisionAnnotation() (string, int) {
annotations := map[string]string{}
if len(hnp.clusterAnnotations) > 0 {
annotations = hnp.clusterAnnotations
}
podAnnotationKey := strings.Join([]string{PodRevisionAnnotation, hnp.GetNodePoolName()}, "-")
revision, ok := annotations[podAnnotationKey]
if !ok {
revision = "0"
}
existingRevision, err := strconv.Atoi(revision)
if err != nil {
return "", -1
func (hnp *HumioNodePool) GetDesiredPodRevision() int {
if hnp.desiredPodRevision == 0 {
return 1
}
return podAnnotationKey, existingRevision
return hnp.desiredPodRevision
}

func (hnp *HumioNodePool) GetIngress() humiov1alpha1.HumioClusterIngressSpec {
Expand Down
2 changes: 1 addition & 1 deletion controllers/humiocluster_pod_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (r *HumioClusterReconciler) getPodsStatus(ctx context.Context, hc *humiov1a
}
}
}
r.Log.Info(fmt.Sprintf("pod status readyCount=%d notReadyCount=%d podsReady=%s podsNotReady=%s", status.readyCount, status.notReadyCount, podsReady, podsNotReady))
r.Log.Info(fmt.Sprintf("pod status nodePoolName=%s readyCount=%d notReadyCount=%d podsReady=%s podsNotReady=%s", hnp.GetNodePoolName(), status.readyCount, status.notReadyCount, podsReady, podsNotReady))
// collect ready pods and not ready pods in separate lists and just print the lists here instead of a log entry per host
return &status, nil
}
Expand Down
7 changes: 3 additions & 4 deletions controllers/humiocluster_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (r *HumioClusterReconciler) createPod(ctx context.Context, hc *humiov1alpha
pod.Annotations[certHashAnnotation] = podNameAndCertHash.certificateHash
}

_, podRevision := hnp.GetHumioClusterNodePoolRevisionAnnotation()
podRevision := hnp.GetDesiredPodRevision()
r.setPodRevision(pod, podRevision)

r.Log.Info(fmt.Sprintf("creating pod %s with revision %d", pod.Name, podRevision))
Expand Down Expand Up @@ -874,8 +874,8 @@ func (r *HumioClusterReconciler) podsMatch(hnp *HumioNodePool, pod corev1.Pod, d
var certHasAnnotationMatches bool

desiredPodHash := podSpecAsSHA256(hnp, desiredPod)
_, existingPodRevision := hnp.GetHumioClusterNodePoolRevisionAnnotation()
r.setPodRevision(&desiredPod, existingPodRevision)
desiredPodRevision := hnp.GetDesiredPodRevision()
r.setPodRevision(&desiredPod, desiredPodRevision)
if pod.Annotations[PodHashAnnotation] == desiredPodHash {
specMatches = true
}
Expand Down Expand Up @@ -1131,7 +1131,6 @@ func (r *HumioClusterReconciler) getPodStatusList(ctx context.Context, hc *humio
}
}
sort.Sort(podStatusList)
r.Log.Info(fmt.Sprintf("updating pod status with %+v", podStatusList))
return podStatusList, nil
}

Expand Down
Loading

0 comments on commit 02db873

Please sign in to comment.