Skip to content

Commit

Permalink
fix(status): make it compilable
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-lgy committed Jul 27, 2023
1 parent 0d02223 commit b0035e0
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 103 deletions.
105 changes: 7 additions & 98 deletions pkg/controllers/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -153,7 +152,7 @@ func NewStatusController(
// Build queue for triggering cluster reconciliations.
s.clusterQueue = workqueue.NewNamedDelayingQueue("status-controller-cluster-queue")

fedObjectHandler := util.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
fedObjectHandler := eventhandlers.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
s.enqueueEnableCollectedStatusObject(common.NewQualifiedName(o), 0)
})

Expand All @@ -165,13 +164,13 @@ func NewStatusController(
return nil, err
}

if _, err := s.collectedStatusInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
if _, err := s.collectedStatusInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
s.worker.Enqueue(common.NewQualifiedName(o))
})); err != nil {
return nil, err
}

if _, err := s.clusterCollectedStatusInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
if _, err := s.clusterCollectedStatusInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChanges(func(o pkgruntime.Object) {
s.worker.Enqueue(common.NewQualifiedName(o))
})); err != nil {
return nil, err
Expand Down Expand Up @@ -396,7 +395,7 @@ func (s *StatusController) reconcile(ctx context.Context, qualifiedName common.Q
if existingStatus != nil {
hasRSDigestsAnnotation, err = annotation.HasAnnotationKeyValue(
existingStatus,
util.LatestReplicasetDigestsAnnotation,
common.LatestReplicasetDigestsAnnotation,
rsDigestsAnnotation,
)
if err != nil {
Expand All @@ -407,22 +406,7 @@ func (s *StatusController) reconcile(ctx context.Context, qualifiedName common.Q
collectedStatus := newCollectedStatusObject(fedObject, clusterStatuses)

if rsDigestsAnnotation != "" {
collectedStatus.SetAnnotations(map[string]string{util.LatestReplicasetDigestsAnnotation: rsDigestsAnnotation})
}
replicasAnnotationUpdated := false
if targetIsDeployment {
replicasAnnotationUpdated, err = s.setReplicasAnnotations(
ctx,
collectedStatus,
fedObject,
clusterNames,
templateQualifiedName,
templateGVK,
typeConfig,
)
if err != nil {
keyedLogger.Error(err, "Failed to set annotations about replicas")
}
collectedStatus.SetAnnotations(map[string]string{common.LatestReplicasetDigestsAnnotation: rsDigestsAnnotation})
}

if existingStatus == nil {
Expand All @@ -437,7 +421,6 @@ func (s *StatusController) reconcile(ctx context.Context, qualifiedName common.Q
}
} else if !reflect.DeepEqual(existingStatus.GetGenericCollectedStatus().Clusters, collectedStatus.GetGenericCollectedStatus().Clusters) ||
!reflect.DeepEqual(collectedStatus.GetLabels(), existingStatus.GetLabels()) ||
replicasAnnotationUpdated ||
(rsDigestsAnnotation != "" && !hasRSDigestsAnnotation) {
collectedStatus.GetLastUpdateTime().Time = time.Now()
existingStatus.GetGenericCollectedStatus().Clusters = collectedStatus.GetGenericCollectedStatus().Clusters
Expand Down Expand Up @@ -486,11 +469,12 @@ func (s *StatusController) enqueueEnableCollectedStatusObject(qualifiedName comm
return
}

templateGVK, err := fedObject.GetSpec().GetTemplateGVK()
templateMetadata, err := fedObject.GetSpec().GetTemplateMetadata()
if err != nil {
keyedLogger.Error(err, "Failed to get template gvk")
return
}
templateGVK := templateMetadata.GroupVersionKind()

typeConfig, exists := s.ftcManager.GetResourceFTC(templateGVK)
if !exists || typeConfig == nil {
Expand Down Expand Up @@ -665,81 +649,6 @@ func (s *StatusController) latestReplicasetDigests(
return digests, nil
}

func (s *StatusController) realUpdatedReplicas(
ctx context.Context,
clusterNames []string,
targetQualifiedName common.QualifiedName,
targetGVK schema.GroupVersionKind,
typeConfig *fedcorev1a1.FederatedTypeConfig,
revision string,
) (string, error) {
key := targetQualifiedName.String()
var updatedReplicas int64
targetKind := typeConfig.Spec.SourceType.Kind
keyedLogger := klog.FromContext(ctx)

for _, clusterName := range clusterNames {
clusterObj, exist, err := informermanager.GetClusterObject(
ctx,
s.ftcManager,
s.fedInformerManager,
clusterName,
targetQualifiedName,
targetGVK,
)
if err != nil {
return "", errors.Wrapf(err, "Failed to get %s %q from cluster %q", targetKind, key, clusterName)
}
if !exist {
continue
}
// ignore digest errors for now since we want to try the best to collect the status
digest, err := util.ReplicaSetDigestFromObject(clusterObj)
if err != nil {
keyedLogger.WithValues("cluster-name", clusterName).Error(err, "Failed to get latestreplicaset digest")
continue
}
keyedLogger.WithValues("cluster-name", clusterName, "replicas-digest", digest).V(4).Info("Got latestreplicaset digest")
if digest.CurrentRevision != revision {
continue
}
if digest.ObservedGeneration < digest.Generation {
continue
}
updatedReplicas += digest.UpdatedReplicas
}
return strconv.FormatInt(updatedReplicas, 10), nil
}

func (s *StatusController) setReplicasAnnotations(
ctx context.Context,
collectedStatus fedcorev1a1.GenericCollectedStatusObject,
fedObject fedcorev1a1.GenericFederatedObject,
clusterNames []string,
qualifedName common.QualifiedName,
targetGVK schema.GroupVersionKind,
typeConfig *fedcorev1a1.FederatedTypeConfig,
) (bool, error) {
revision, ok := fedObject.GetAnnotations()[common.CurrentRevisionAnnotation]
if !ok {
return false, nil
}
updatedReplicas, err := s.realUpdatedReplicas(ctx, clusterNames, qualifedName, targetGVK, typeConfig, revision)
if err != nil {
return false, err
}

collectedStatusAnno := collectedStatus.GetAnnotations()
if collectedStatusAnno == nil {
collectedStatusAnno = make(map[string]string)
}
collectedStatusAnno[util.AggregatedUpdatedReplicas] = updatedReplicas
collectedStatusAnno[common.CurrentRevisionAnnotation] = revision

collectedStatus.SetAnnotations(collectedStatusAnno)
return true, nil
}

func newCollectedStatusObject(
fedObj fedcorev1a1.GenericFederatedObject,
clusterStatus []fedcorev1a1.CollectedFieldsWithCluster,
Expand Down
5 changes: 0 additions & 5 deletions pkg/controllers/util/federatedstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ const (
LatestReplicasetObservedGenerationAnnotation = "latestreplicaset.kubeadmiral.io/observed-generation"
)

const (
// annotations for federatedDeploymentStatus
AggregatedUpdatedReplicas = common.DefaultPrefix + "aggregated-updated-replicas"
)

// FederatedResource is a generic representation of a federated type
type FederatedResource struct {
metav1.TypeMeta `json:",inline"`
Expand Down

0 comments on commit b0035e0

Please sign in to comment.