Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(scheduler): adopt unified types #173

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e10bbe8
feat(federated-informer-manager): integrate pod informer
limhawjia Jul 24, 2023
513a40d
fix(federated-informer-manager): fix cluster deletion
limhawjia Jul 25, 2023
18f2623
refactor(cluster-controller): adopt unified types
limhawjia Jul 25, 2023
f87a860
refactor(cluster-controller): bootstrap controller-manager with clust…
limhawjia Jul 25, 2023
a54768f
refactor(scheduler): adopt unified types
limhawjia Jul 22, 2023
89c3720
fix(scheduler): trigger schedulingprofile informer start
limhawjia Jul 22, 2023
937a2bd
refactor(eventhandlers): refactor event handlers
limhawjia Jul 28, 2023
94ca742
refactor(core): change override controller name
limhawjia Jul 28, 2023
338ce5f
feat(override): deprecate override util package
limhawjia Jul 28, 2023
d4038e8
fix(policyrc): fix imports
limhawjia Jul 28, 2023
ed50baf
fix(cluster-controller): status update on cache not synced
limhawjia Jul 28, 2023
cfcaea9
fix(cluster-controller): cluster controller synced
limhawjia Jul 28, 2023
4c9895a
fix(scheduler): typos and unit test
limhawjia Jul 28, 2023
abe213f
fix(policyrc): nil pointer on controller initialization
limhawjia Jul 28, 2023
5285e18
fix(scheduler): nil pointer in cluster join
limhawjia Jul 28, 2023
44367f0
fix(scheduler): nil pointer in enqueue
limhawjia Jul 28, 2023
8c75b1c
fix(worker): remove enqueueObject and keyFunc
limhawjia Jul 28, 2023
fb44559
chore(cluster-controller): adjust healthcheck period
limhawjia Jul 28, 2023
af546b3
feat(scheduler): use label selectors for listing
limhawjia Jul 28, 2023
1b642db
fix(scheduler): use label selectors for listing
limhawjia Jul 28, 2023
76e5baa
fix(scheduler): fix extra error check and double reenque
limhawjia Jul 28, 2023
ab72f36
chore(scheduler): fix formatting
limhawjia Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
FederateControllerName = "federate"
FollowerControllerName = "follower"
PolicyRCControllerName = "policyrc"
OverrideControllerName = "overridepolicy"
OverrideControllerName = "override"
NamespaceAutoPropagationControllerName = "nsautoprop"
StatusControllerName = "status"
SchedulerName = "scheduler"
Expand Down
3 changes: 0 additions & 3 deletions pkg/controllers/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ const (
// TemplateGeneratorMergePatchAnnotation indicates the merge patch document capable of converting
// the source object to the template object.
TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch"

PropagationPolicyNameLabel = DefaultPrefix + "propagation-policy-name"
ClusterPropagationPolicyNameLabel = DefaultPrefix + "cluster-propagation-policy-name"
)

// PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present
Expand Down
1 change: 0 additions & 1 deletion pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func NewFederateController(
c.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, FederateControllerName, 6)
c.worker = worker.NewReconcileWorker[workerKey](
FederateControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/federate/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ var (
)

federatedLabelSet = sets.New[string](
common.PropagationPolicyNameLabel,
common.ClusterPropagationPolicyNameLabel,
scheduler.PropagationPolicyNameLabel,
scheduler.ClusterPropagationPolicyNameLabel,
override.OverridePolicyNameLabel,
JackZxj marked this conversation as resolved.
Show resolved Hide resolved
override.ClusterOverridePolicyNameLabel,
)
Expand Down
28 changes: 17 additions & 11 deletions pkg/controllers/federatedcluster/clusterstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/discovery"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -68,24 +69,14 @@
if !exists {
return 0, fmt.Errorf("failed to get cluster client: FederatedInformerManager not yet up-to-date")
}

podLister, podsSynced, exists := c.federatedInformerManager.GetPodLister(cluster.Name)
if !exists {
return 0, fmt.Errorf("failed to get pod lister: FederatedInformerManager not yet up-to-date")
}
if !podsSynced() {
logger.V(3).Info("Pod informer not synced, will reenqueue")
return 100 * time.Millisecond, nil
}

nodeLister, nodesSynced, exists := c.federatedInformerManager.GetNodeLister(cluster.Name)
if !exists {
return 0, fmt.Errorf("failed to get node lister: FederatedInformerManager not yet up-to-date")
}
if !nodesSynced() {
logger.V(3).Info("Pod informer not synced, will reenqueue")
return 100 * time.Millisecond, nil
}

discoveryClient := clusterKubeClient.Discovery()

Expand All @@ -108,7 +99,14 @@

// We skip updating cluster resources and api resources if cluster is not ready
if readyStatus == corev1.ConditionTrue {
if err := updateClusterResources(ctx, &cluster.Status, podLister, nodeLister); err != nil {
if err := updateClusterResources(
ctx,
&cluster.Status,
podLister,
podsSynced,
nodeLister,
nodesSynced,
); err != nil {
logger.Error(err, "Failed to update cluster resources")
readyStatus = corev1.ConditionFalse
readyReason = ClusterResourceCollectionFailedReason
Expand Down Expand Up @@ -174,8 +172,16 @@
ctx context.Context,
clusterStatus *fedcorev1a1.FederatedClusterStatus,
podLister corev1listers.PodLister,
podsSynced cache.InformerSynced,
nodeLister corev1listers.NodeLister,
nodesSynced cache.InformerSynced,
) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), podsSynced, nodesSynced) {

Check failure on line 181 in pkg/controllers/federatedcluster/clusterstatus.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/kubewharf/kubeadmiral) --custom-order (gci)

Check failure on line 181 in pkg/controllers/federatedcluster/clusterstatus.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
JackZxj marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("timeout waiting for node and pod informer sync")
}

nodes, err := nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
Expand Down
14 changes: 6 additions & 8 deletions pkg/controllers/federatedcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@
logger klog.Logger,
clusterJoinTimeout time.Duration,
workerCount int,
fedsystemNamespace string,
fedSystemNamespace string,
) (*FederatedClusterController, error) {
c := &FederatedClusterController{
clusterInformer: clusterInformer,
federatedInformerManager: federatedInformerManager,
kubeClient: kubeClient,
fedClient: fedClient,
fedSystemNamespace: fedsystemNamespace,
fedSystemNamespace: fedSystemNamespace,
clusterHealthCheckConfig: &ClusterHealthCheckConfig{
Period: time.Minute,
Period: time.Second*30,

Check failure on line 108 in pkg/controllers/federatedcluster/controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/kubewharf/kubeadmiral) --custom-order (gci)

Check failure on line 108 in pkg/controllers/federatedcluster/controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
},
clusterJoinTimeout: clusterJoinTimeout,
metrics: metrics,
Expand All @@ -124,7 +124,6 @@

c.worker = worker.NewReconcileWorker[common.QualifiedName](
FederatedClusterControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand All @@ -133,7 +132,6 @@

c.statusCollectWorker = worker.NewReconcileWorker[common.QualifiedName](
FederatedClusterControllerName,
nil,
c.collectClusterStatus,
worker.RateLimiterOptions{
InitialDelay: 50 * time.Millisecond,
Expand Down Expand Up @@ -164,7 +162,7 @@
}

func (c *FederatedClusterController) HasSynced() bool {
return c.clusterInformer.Informer().HasSynced()
return c.clusterInformer.Informer().HasSynced() && c.federatedInformerManager.HasSynced()
}

func (c *FederatedClusterController) IsControllerReady() bool {
Expand Down Expand Up @@ -292,7 +290,7 @@

// Trigger initial status collection if successfully joined
if joined, alreadyFailed := isClusterJoined(&cluster.Status); joined && !alreadyFailed {
c.statusCollectWorker.EnqueueObject(cluster)
c.statusCollectWorker.Enqueue(common.NewQualifiedName(cluster))
}

return worker.StatusAllOK
Expand Down Expand Up @@ -492,7 +490,7 @@

for _, cluster := range clusters {
if clusterutil.IsClusterJoined(&cluster.Status) {
c.statusCollectWorker.EnqueueObject(cluster)
c.statusCollectWorker.Enqueue(common.NewQualifiedName(cluster))
}
}
}
3 changes: 1 addition & 2 deletions pkg/controllers/nsautoprop/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func NewNamespaceAutoPropagationController(

c.worker = worker.NewReconcileWorker[common.QualifiedName](
NamespaceAutoPropagationControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand All @@ -136,7 +135,7 @@ func NewNamespaceAutoPropagationController(
func(obj *fedcorev1a1.ClusterFederatedObject) {
srcMeta, err := obj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(
c.logger.Error(
err,
"Failed to get source object's metadata from ClusterFederatedObject",
"object",
Expand Down
14 changes: 4 additions & 10 deletions pkg/controllers/override/overridepolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func NewOverridePolicyController(
c.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, ControllerName, 4)
c.worker = worker.NewReconcileWorker[common.QualifiedName](
ControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand Down Expand Up @@ -235,7 +234,7 @@ func (c *Controller) enqueueFedObjectsUsingPolicy(policy fedcorev1a1.GenericOver

func (c *Controller) reconcileOnClusterChange(cluster *fedcorev1a1.FederatedCluster) {
logger := c.logger.WithValues("federated-cluster", cluster.GetName())
logger.V(2).Info("observed a cluster change")
logger.V(2).Info("Observed a cluster change")

opRequirement, _ := labels.NewRequirement(OverridePolicyNameLabel, selection.Exists, nil)
copRequirement, _ := labels.NewRequirement(ClusterOverridePolicyNameLabel, selection.Exists, nil)
Expand Down Expand Up @@ -339,7 +338,7 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi
return worker.StatusError
}

var overrides util.OverridesMap
var overrides overridesMap
// Apply overrides from each policy in order
for _, policy := range policies {
newOverrides, err := parseOverrides(policy, placedClusters)
Expand All @@ -358,16 +357,11 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi
overrides = mergeOverrides(overrides, newOverrides)
}

currentOverrides, err := util.GetOverrides(fedObject, PrefixedControllerName)
if err != nil {
keyedLogger.Error(err, "Failed to get overrides")
return worker.StatusError
}

currentOverrides := fedObject.GetSpec().GetControllerOverrides(PrefixedControllerName)
needsUpdate := !equality.Semantic.DeepEqual(overrides, currentOverrides)

if needsUpdate {
err = util.SetOverrides(fedObject, PrefixedControllerName, overrides)
err = setOverrides(fedObject, overrides)
if err != nil {
keyedLogger.Error(err, "Failed to set overrides")
return worker.StatusError
Expand Down
37 changes: 32 additions & 5 deletions pkg/controllers/override/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedcorev1a1listers "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
"github.com/kubewharf/kubeadmiral/pkg/util/clusterselector"
)

type overridesMap map[string]fedcorev1a1.OverridePatches

/*
lookForMatchedPolicies looks for OverridePolicy and/or ClusterOverridePolicy
that match the obj in the stores.
Expand Down Expand Up @@ -90,8 +91,8 @@ func lookForMatchedPolicies(
func parseOverrides(
policy fedcorev1a1.GenericOverridePolicy,
clusters []*fedcorev1a1.FederatedCluster,
) (util.OverridesMap, error) {
overridesMap := make(util.OverridesMap)
) (overridesMap, error) {
overridesMap := make(overridesMap)

for _, cluster := range clusters {
patches := make(fedcorev1a1.OverridePatches, 0)
Expand Down Expand Up @@ -130,9 +131,9 @@ func parseOverrides(
return overridesMap, nil
}

func mergeOverrides(dest, src util.OverridesMap) util.OverridesMap {
func mergeOverrides(dest, src overridesMap) overridesMap {
if dest == nil {
dest = make(util.OverridesMap)
dest = make(overridesMap)
}

for clusterName, srcOverrides := range src {
Expand Down Expand Up @@ -229,3 +230,29 @@ func policyJsonPatchOverriderToOverridePatch(

return overridePatch, nil
}

func setOverrides(federatedObj fedcorev1a1.GenericFederatedObject, overridesMap overridesMap) error {
for clusterName, clusterOverrides := range overridesMap {
if len(clusterOverrides) == 0 {
delete(overridesMap, clusterName)
}
}

if len(overridesMap) == 0 {
federatedObj.GetSpec().DeleteControllerOverrides(PrefixedControllerName)
return nil
}

overrides := []fedcorev1a1.ClusterReferenceWithPatches{}

for clusterName, clusterOverrides := range overridesMap {
overrides = append(overrides, fedcorev1a1.ClusterReferenceWithPatches{
Cluster: clusterName,
Patches: clusterOverrides,
})
}

federatedObj.GetSpec().SetControllerOverrides(PrefixedControllerName, overrides)

return nil
}
1 change: 0 additions & 1 deletion pkg/controllers/override/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"k8s.io/client-go/tools/cache"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
)

Expand Down Expand Up @@ -299,7 +298,7 @@
}
}

foundPolicies, needsRecheckOnError, err := lookForMatchedPolicies(obj, isNamespaced, overridePolicyStore, clusterOverridePolicyStore)

Check failure on line 301 in pkg/controllers/override/util_test.go

View workflow job for this annotation

GitHub Actions / test (1.19)

cannot use obj (variable of type *unstructured.Unstructured) as type "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1".GenericFederatedObject in argument to lookForMatchedPolicies:

Check failure on line 301 in pkg/controllers/override/util_test.go

View workflow job for this annotation

GitHub Actions / test (1.19)

cannot use overridePolicyStore (variable of type "k8s.io/client-go/tools/cache".Store) as type "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1".OverridePolicyLister in argument to lookForMatchedPolicies:

Check failure on line 301 in pkg/controllers/override/util_test.go

View workflow job for this annotation

GitHub Actions / test (1.20)

cannot use obj (variable of type *unstructured.Unstructured) as "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1".GenericFederatedObject value in argument to lookForMatchedPolicies: *unstructured.Unstructured does not implement "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1".GenericFederatedObject (missing method DeepCopyGenericFederatedObject)

Check failure on line 301 in pkg/controllers/override/util_test.go

View workflow job for this annotation

GitHub Actions / test (1.20)

cannot use overridePolicyStore (variable of type "k8s.io/client-go/tools/cache".Store) as "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1".OverridePolicyLister value in argument to lookForMatchedPolicies: "k8s.io/client-go/tools/cache".Store does not implement "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1".OverridePolicyLister (wrong type for method List)
if (err != nil) != testCase.isErrorExpected {
t.Fatalf("err = %v, but isErrorExpected = %v", err, testCase.isErrorExpected)
}
Expand Down
35 changes: 17 additions & 18 deletions pkg/controllers/policyrc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers"
"github.com/kubewharf/kubeadmiral/pkg/util/fedobjectadapters"
Expand Down Expand Up @@ -82,23 +83,8 @@
logger: logger.WithValues("controller", ControllerName),
}

if _, err := c.fedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform(
common.NewQualifiedName,
c.countWorker.Enqueue,
)); err != nil {
return nil, err
}

if _, err := c.clusterFedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform(
common.NewQualifiedName,
c.countWorker.Enqueue,
)); err != nil {
return nil, err
}

c.countWorker = worker.NewReconcileWorker[common.QualifiedName](
"policyrc-controller-count-worker",
nil,
c.reconcileCount,
worker.RateLimiterOptions{},
1, // currently only one worker is meaningful due to the global mutex
Expand All @@ -107,7 +93,6 @@

c.persistPpWorker = worker.NewReconcileWorker[common.QualifiedName](
"policyrc-controller-persist-worker",
nil,
func(ctx context.Context, qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist(
ctx,
Expand All @@ -125,7 +110,6 @@

c.persistOpWorker = worker.NewReconcileWorker[common.QualifiedName](
"policyrc-controller-persist-worker",
nil,
func(ctx context.Context, qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist(
ctx,
Expand All @@ -141,6 +125,21 @@
metrics,
)

if _, err := c.fedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform(
common.NewQualifiedName,
c.countWorker.Enqueue,
)); err != nil {
return nil, err
}

if _, err := c.clusterFedObjectInformer.Informer().AddEventHandler(eventhandlers.NewTriggerOnAllChangesWithTransform(
common.NewQualifiedName,
c.countWorker.Enqueue,
)); err != nil {
return nil, err
}


Check failure on line 142 in pkg/controllers/policyrc/controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/kubewharf/kubeadmiral) --custom-order (gci)

Check failure on line 142 in pkg/controllers/policyrc/controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
if _, err := c.propagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChangesWithTransform(common.NewQualifiedName, c.persistPpWorker.Enqueue),
); err != nil {
Expand Down Expand Up @@ -236,7 +235,7 @@

var newPps []PolicyKey
if fedObj != nil {
newPolicy, newHasPolicy := fedobjectadapters.MatchedPolicyKey(fedObj, fedObj.GetNamespace() != "")
newPolicy, newHasPolicy := scheduler.GetMatchedPolicyKey(fedObj)
if newHasPolicy {
newPps = []PolicyKey{PolicyKey(newPolicy)}
}
Expand Down
Loading
Loading