Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: follower controller #148

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ output
vendor
.vscode
*debug*
.DS_Store

# log files
*.log
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
FederatedClusterControllerName: startFederatedClusterController,
SchedulerName: startScheduler,
SyncControllerName: startSyncController,
FollowerControllerName: startFollowerController,
}

var controllersDisabledByDefault = sets.New[string]()
Expand Down
24 changes: 24 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federate"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/follower"
"github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc"
Expand Down Expand Up @@ -237,3 +238,26 @@ func startSyncController(

return syncController, nil
}

func startFollowerController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
followerController, err := follower.NewFollowerController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating follower controller: %w", err)
}

go followerController.Run(ctx)

return followerController, nil
}
1 change: 0 additions & 1 deletion config/sample/extra/pod-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
fields:
- metadata.creationTimestamp
Expand Down
5 changes: 0 additions & 5 deletions config/sample/host/01-ftc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
pathDefinition:
labelSelector: spec.selector
replicasSpec: spec.replicas
Expand Down Expand Up @@ -264,7 +263,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -288,7 +286,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -309,7 +306,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand All @@ -330,7 +326,6 @@ spec:
controllers:
- - kubeadmiral.io/global-scheduler
- - kubeadmiral.io/overridepolicy-controller
- - kubeadmiral.io/follower-controller
statusCollection:
enabled: true
fields:
Expand Down
117 changes: 0 additions & 117 deletions pkg/client/generic/genericclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ package generic
import (
"context"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubewharf/kubeadmiral/pkg/client/generic/scheme"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/history"
)

type Client interface {
Expand All @@ -48,7 +42,6 @@ type Client interface {
List(ctx context.Context, obj client.ObjectList, namespace string) error
UpdateStatus(ctx context.Context, obj client.Object) error
Patch(ctx context.Context, obj client.Object, patch client.Patch) error
Rollback(ctx context.Context, obj client.Object, toRevision int64) error
DeleteHistory(ctx context.Context, obj client.Object) error

ListWithOptions(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error
Expand Down Expand Up @@ -121,91 +114,6 @@ func (c *genericClient) Patch(ctx context.Context, obj client.Object, patch clie
return c.client.Patch(ctx, obj, patch)
}

// Rollback rollbacks federated Object such as FederatedDeployment
func (c *genericClient) Rollback(ctx context.Context, obj client.Object, toRevision int64) error {
if toRevision < 0 {
return fmt.Errorf("unable to find specified revision %v in history", toRevision)
}
if toRevision == 0 {
// try to get last revision from annotations, fallback to list all revisions on error
if err := c.rollbackToLastRevision(ctx, obj); err == nil {
return nil
}
}

history, err := c.controlledHistory(ctx, obj)
if err != nil {
return fmt.Errorf("failed to list history: %s", err)
}
if toRevision == 0 && len(history) <= 1 {
return fmt.Errorf("no last revision to roll back to")
}

toHistory := findHistory(toRevision, history)
if toHistory == nil {
return fmt.Errorf("unable to find specified revision %v in history", toHistory)
}

// Restore revision
if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, toHistory.Data.Raw)); err != nil {
return fmt.Errorf("failed restoring revision %d: %v", toRevision, err)
}
return nil
}

func (c *genericClient) rollbackToLastRevision(ctx context.Context, obj client.Object) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
lastRevisionNameWithHash := accessor.GetAnnotations()[common.LastRevisionAnnotation]
if len(lastRevisionNameWithHash) == 0 {
return fmt.Errorf("annotation: %s not found", common.LastRevisionAnnotation)
}

lastRevisionName, err := c.checkLastRevisionNameWithHash(lastRevisionNameWithHash, obj)
if err != nil {
return fmt.Errorf("failed to check last revision name, err: %v", err)
}

latestRevision := &appsv1.ControllerRevision{}
if err := c.Get(ctx, latestRevision, accessor.GetNamespace(), lastRevisionName); err != nil {
return err
}

// restore latest revision
if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, latestRevision.Data.Raw)); err != nil {
return fmt.Errorf("failed restoring latest revision: %v", err)
}
return nil
}

func (c *genericClient) checkLastRevisionNameWithHash(lastRevisionNameWithHash string, obj client.Object) (string, error) {
parts := strings.Split(lastRevisionNameWithHash, "|")
if len(parts) != 2 {
return "", fmt.Errorf("invalid lastRevisionNameWithHash: %s", lastRevisionNameWithHash)
}
lastRevisionName, hash := parts[0], parts[1]

utdObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return "", err
}

template, ok, err := unstructured.NestedMap(utdObj, "spec", "template", "spec", "template")
if err != nil {
return "", err
}
if !ok {
return "", fmt.Errorf("spec.template.spec.template is not found, fedResource: %+v", obj)
}

if templateHash := history.HashObject(template); templateHash != hash {
return "", fmt.Errorf("pod template hash: %s, last revision name suffix: %s, they should be equal", templateHash, hash)
}
return lastRevisionName, nil
}

// controlledHistories returns all ControllerRevisions in namespace that selected by selector and owned by accessor
func (c *genericClient) controlledHistory(ctx context.Context, obj client.Object) ([]*appsv1.ControllerRevision, error) {
accessor, err := meta.Accessor(obj)
Expand Down Expand Up @@ -246,28 +154,3 @@ func (c *genericClient) DeleteHistory(ctx context.Context, obj client.Object) er
}
return nil
}

// findHistory returns a controllerrevision of a specific revision from the given controllerrevisions.
// It returns nil if no such controllerrevision exists.
// If toRevision is 0, the last previously used history is returned.
func findHistory(toRevision int64, allHistory []*appsv1.ControllerRevision) *appsv1.ControllerRevision {
if toRevision == 0 && len(allHistory) <= 1 {
return nil
}

// Find the history to rollback to
var toHistory *appsv1.ControllerRevision
if toRevision == 0 {
// If toRevision == 0, find the latest revision (2nd max)
history.SortControllerRevisions(allHistory)
toHistory = allHistory[len(allHistory)-2]
} else {
for _, h := range allHistory {
if h.Revision == toRevision {
// If toRevision != 0, find the history with matching revision
return h
}
}
}
return toHistory
}
5 changes: 2 additions & 3 deletions pkg/controllers/follower/bidirectional_cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//go:build exclude
/*
Copyright 2023 The KubeAdmiral Authors.
Expand Down Expand Up @@ -40,13 +39,13 @@ type bidirectionalCache[V1, V2 comparable] struct {
func (c *bidirectionalCache[V1, V2]) lookup(key V1) sets.Set[V2] {
c.RLock()
defer c.RUnlock()
return c.cache[key]
return c.cache[key].Clone()
}

func (c *bidirectionalCache[V1, V2]) reverseLookup(key V2) sets.Set[V1] {
c.RLock()
defer c.RUnlock()
return c.reverseCache[key]
return c.reverseCache[key].Clone()
}

func (c *bidirectionalCache[V1, V2]) update(key V1, newValues sets.Set[V2]) {
Expand Down
Loading
Loading