Skip to content

Commit

Permalink
Merge pull request #148 from mrlihanbo/refactor/follow-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-lgy authored Aug 1, 2023
2 parents 9952582 + 3473b46 commit 87282d2
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 462 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ output
vendor
.vscode
*debug*
.DS_Store

# log files
*.log
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
FederatedClusterControllerName: startFederatedClusterController,
SchedulerName: startScheduler,
SyncControllerName: startSyncController,
FollowerControllerName: startFollowerController,
}

var controllersDisabledByDefault = sets.New[string]()
Expand Down
24 changes: 24 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federate"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/follower"
"github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc"
Expand Down Expand Up @@ -237,3 +238,26 @@ func startSyncController(

return syncController, nil
}

func startFollowerController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
followerController, err := follower.NewFollowerController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating follower controller: %w", err)
}

go followerController.Run(ctx)

return followerController, nil
}
1 change: 0 additions & 1 deletion config/sample/extra/pod-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
fields:
- metadata.creationTimestamp
Expand Down
5 changes: 0 additions & 5 deletions config/sample/host/01-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
pathDefinition:
labelSelector: spec.selector
replicasSpec: spec.replicas
Expand Down Expand Up @@ -264,7 +263,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -288,7 +286,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -309,7 +306,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -330,7 +326,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand Down
117 changes: 0 additions & 117 deletions pkg/client/generic/genericclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ package generic
import (
"context"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubewharf/kubeadmiral/pkg/client/generic/scheme"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/history"
)

type Client interface {
Expand All @@ -48,7 +42,6 @@ type Client interface {
List(ctx context.Context, obj client.ObjectList, namespace string) error
UpdateStatus(ctx context.Context, obj client.Object) error
Patch(ctx context.Context, obj client.Object, patch client.Patch) error
Rollback(ctx context.Context, obj client.Object, toRevision int64) error
DeleteHistory(ctx context.Context, obj client.Object) error

ListWithOptions(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error
Expand Down Expand Up @@ -121,91 +114,6 @@ func (c *genericClient) Patch(ctx context.Context, obj client.Object, patch clie
return c.client.Patch(ctx, obj, patch)
}

// Rollback rollbacks federated Object such as FederatedDeployment
func (c *genericClient) Rollback(ctx context.Context, obj client.Object, toRevision int64) error {
if toRevision < 0 {
return fmt.Errorf("unable to find specified revision %v in history", toRevision)
}
if toRevision == 0 {
// try to get last revision from annotations, fallback to list all revisions on error
if err := c.rollbackToLastRevision(ctx, obj); err == nil {
return nil
}
}

history, err := c.controlledHistory(ctx, obj)
if err != nil {
return fmt.Errorf("failed to list history: %s", err)
}
if toRevision == 0 && len(history) <= 1 {
return fmt.Errorf("no last revision to roll back to")
}

toHistory := findHistory(toRevision, history)
if toHistory == nil {
return fmt.Errorf("unable to find specified revision %v in history", toHistory)
}

// Restore revision
if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, toHistory.Data.Raw)); err != nil {
return fmt.Errorf("failed restoring revision %d: %v", toRevision, err)
}
return nil
}

func (c *genericClient) rollbackToLastRevision(ctx context.Context, obj client.Object) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
lastRevisionNameWithHash := accessor.GetAnnotations()[common.LastRevisionAnnotation]
if len(lastRevisionNameWithHash) == 0 {
return fmt.Errorf("annotation: %s not found", common.LastRevisionAnnotation)
}

lastRevisionName, err := c.checkLastRevisionNameWithHash(lastRevisionNameWithHash, obj)
if err != nil {
return fmt.Errorf("failed to check last revision name, err: %v", err)
}

latestRevision := &appsv1.ControllerRevision{}
if err := c.Get(ctx, latestRevision, accessor.GetNamespace(), lastRevisionName); err != nil {
return err
}

// restore latest revision
if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, latestRevision.Data.Raw)); err != nil {
return fmt.Errorf("failed restoring latest revision: %v", err)
}
return nil
}

func (c *genericClient) checkLastRevisionNameWithHash(lastRevisionNameWithHash string, obj client.Object) (string, error) {
parts := strings.Split(lastRevisionNameWithHash, "|")
if len(parts) != 2 {
return "", fmt.Errorf("invalid lastRevisionNameWithHash: %s", lastRevisionNameWithHash)
}
lastRevisionName, hash := parts[0], parts[1]

utdObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return "", err
}

template, ok, err := unstructured.NestedMap(utdObj, "spec", "template", "spec", "template")
if err != nil {
return "", err
}
if !ok {
return "", fmt.Errorf("spec.template.spec.template is not found, fedResource: %+v", obj)
}

if templateHash := history.HashObject(template); templateHash != hash {
return "", fmt.Errorf("pod template hash: %s, last revision name suffix: %s, they should be equal", templateHash, hash)
}
return lastRevisionName, nil
}

// controlledHistories returns all ControllerRevisions in namespace that selected by selector and owned by accessor
func (c *genericClient) controlledHistory(ctx context.Context, obj client.Object) ([]*appsv1.ControllerRevision, error) {
accessor, err := meta.Accessor(obj)
Expand Down Expand Up @@ -246,28 +154,3 @@ func (c *genericClient) DeleteHistory(ctx context.Context, obj client.Object) er
}
return nil
}

// findHistory returns a controllerrevision of a specific revision from the given controllerrevisions.
// It returns nil if no such controllerrevision exists.
// If toRevision is 0, the last previously used history is returned.
func findHistory(toRevision int64, allHistory []*appsv1.ControllerRevision) *appsv1.ControllerRevision {
if toRevision == 0 && len(allHistory) <= 1 {
return nil
}

// Find the history to rollback to
var toHistory *appsv1.ControllerRevision
if toRevision == 0 {
// If toRevision == 0, find the latest revision (2nd max)
history.SortControllerRevisions(allHistory)
toHistory = allHistory[len(allHistory)-2]
} else {
for _, h := range allHistory {
if h.Revision == toRevision {
// If toRevision != 0, find the history with matching revision
return h
}
}
}
return toHistory
}
5 changes: 2 additions & 3 deletions pkg/controllers/follower/bidirectional_cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//go:build exclude
/*
Copyright 2023 The KubeAdmiral Authors.
Expand Down Expand Up @@ -40,13 +39,13 @@ type bidirectionalCache[V1, V2 comparable] struct {
func (c *bidirectionalCache[V1, V2]) lookup(key V1) sets.Set[V2] {
c.RLock()
defer c.RUnlock()
return c.cache[key]
return c.cache[key].Clone()
}

func (c *bidirectionalCache[V1, V2]) reverseLookup(key V2) sets.Set[V1] {
c.RLock()
defer c.RUnlock()
return c.reverseCache[key]
return c.reverseCache[key].Clone()
}

func (c *bidirectionalCache[V1, V2]) update(key V1, newValues sets.Set[V2]) {
Expand Down
Loading

0 comments on commit 87282d2

Please sign in to comment.