Skip to content

Commit

Permalink
feat: add propagation policy controller
Browse files Browse the repository at this point in the history
  • Loading branch information
JackZxj committed Aug 1, 2023
1 parent cbb3666 commit cda0878
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 10 deletions.
2 changes: 2 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
StatusControllerName = "status"
SchedulerName = "scheduler"
SyncControllerName = "sync"
PropagationPolicyControllerName = "propagation-policy-controller"
)

var knownControllers = map[string]controllermanager.StartControllerFunc{
Expand All @@ -56,6 +57,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
SchedulerName: startScheduler,
SyncControllerName: startSyncController,
FollowerControllerName: startFollowerController,
PropagationPolicyControllerName: startPropagationPolicyController,
}

var controllersDisabledByDefault = sets.New[string]()
Expand Down
22 changes: 22 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc"
"github.com/kubewharf/kubeadmiral/pkg/controllers/propagationpolicy"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
"github.com/kubewharf/kubeadmiral/pkg/controllers/status"
"github.com/kubewharf/kubeadmiral/pkg/controllers/sync"
Expand Down Expand Up @@ -261,3 +262,24 @@ func startFollowerController(

return followerController, nil
}

func startPropagationPolicyController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
ppController, err := propagationpolicy.NewPropagationPolicyController(
controllerCtx.FedClientset,
controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating propagation policy controller: %w", err)
}

go ppController.Run(ctx)

return ppController, nil
}
194 changes: 194 additions & 0 deletions pkg/controllers/propagationpolicy/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
Copyright 2023 The KubeAdmiral Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package propagationpolicy

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

corev1alpha1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"

Check failure on line 27 in pkg/controllers/propagationpolicy/controller.go

View workflow job for this annotation

GitHub Actions / lint

import "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" imported as "corev1alpha1" but must be "fedcorev1a1" according to config (importas)
fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers"
"github.com/kubewharf/kubeadmiral/pkg/util/logging"
"github.com/kubewharf/kubeadmiral/pkg/util/worker"
)

const (
PropagationPolicyControllerName = "propagation-policy-controller"
)

type Controller struct {
propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer
clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer

// updates the local counter upon fed object updates
worker worker.ReconcileWorker[common.QualifiedName]

fedClient fedclient.Interface
metrics stats.Metrics
logger klog.Logger
}

func NewPropagationPolicyController(
fedClient fedclient.Interface,
propagationPolicyInformer fedcorev1a1informers.PropagationPolicyInformer,
clusterPropagationPolicyInformer fedcorev1a1informers.ClusterPropagationPolicyInformer,
metrics stats.Metrics,
logger klog.Logger,
workerCount int,
) (*Controller, error) {
c := &Controller{
propagationPolicyInformer: propagationPolicyInformer,
clusterPropagationPolicyInformer: clusterPropagationPolicyInformer,
fedClient: fedClient,
metrics: metrics,
logger: logger.WithValues("controller", PropagationPolicyControllerName),
}

c.worker = worker.NewReconcileWorker[common.QualifiedName](
PropagationPolicyControllerName,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
metrics,
)

if _, err := c.propagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChangesWithTransform(common.NewQualifiedName, c.worker.Enqueue),
); err != nil {
return nil, err
}

if _, err := c.clusterPropagationPolicyInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChangesWithTransform(common.NewQualifiedName, c.worker.Enqueue),
); err != nil {
return nil, err
}

return c, nil
}

func (c *Controller) Run(ctx context.Context) {
ctx, logger := logging.InjectLogger(ctx, c.logger)

logger.Info("Starting controller")
defer c.logger.Info("Stopping controller")

if !cache.WaitForNamedCacheSync(PropagationPolicyControllerName, ctx.Done(), c.HasSynced) {
logger.Error(nil, "Timed out waiting for caches to sync")
return
}
logger.Info("Caches are synced")
c.worker.Run(ctx)
<-ctx.Done()
}

func (c *Controller) HasSynced() bool {
return c.propagationPolicyInformer.Informer().HasSynced() &&
c.clusterPropagationPolicyInformer.Informer().HasSynced()
}

func (c *Controller) IsControllerReady() bool {
return c.HasSynced()
}

func (c *Controller) reconcile(ctx context.Context, key common.QualifiedName) (status worker.Result) {
ctx, logger := logging.InjectLoggerValues(ctx, "object", key.String())

c.metrics.Rate("api-migration-controller.throughput", 1)
logger.V(3).Info("Starting to reconcile")
startTime := time.Now()
defer func() {
c.metrics.Duration("api-migration-controller.latency", startTime)
logger.V(3).WithValues("duration", time.Since(startTime), "status", status.String()).
Info("Finished reconciling")
}()

var policy corev1alpha1.GenericPropagationPolicy
var err error
if key.Namespace == "" {
policy, err = c.fedClient.CoreV1alpha1().ClusterPropagationPolicies().Get(ctx, key.Name, metav1.GetOptions{})
} else {
policy, err = c.fedClient.CoreV1alpha1().PropagationPolicies(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})
}
if err != nil {
logger.Error(err, "Failed to get policy")
return worker.StatusError
}

policySpec := policy.GetSpec()
policyChanged := false

if policySpec.ReschedulePolicy == nil {
policySpec.ReschedulePolicy = &corev1alpha1.ReschedulePolicy{
ReplicaRescheduling: policySpec.ReplicaRescheduling,
Trigger: &corev1alpha1.RescheduleTrigger{
PolicyContentChanged: true,
},
}
if policySpec.StickyCluster {
policySpec.ReschedulePolicy = &corev1alpha1.ReschedulePolicy{
DisableRescheduling: true,
}
}
policyChanged = true
} else {
if policySpec.ReschedulePolicy.DisableRescheduling {
if !policySpec.StickyCluster {
policySpec.StickyCluster = true
policyChanged = true
}
} else {
if policySpec.ReschedulePolicy.ReplicaRescheduling == nil {
if policySpec.ReplicaRescheduling != nil {
policySpec.ReschedulePolicy.ReplicaRescheduling = policySpec.ReplicaRescheduling
policyChanged = true
}
} else {
if policySpec.ReplicaRescheduling == nil ||
policySpec.ReplicaRescheduling.AvoidDisruption != policySpec.ReschedulePolicy.ReplicaRescheduling.AvoidDisruption {
policySpec.ReplicaRescheduling = policySpec.ReschedulePolicy.ReplicaRescheduling
policyChanged = true
}
}
}
}
if !policyChanged {
return worker.StatusAllOK
}

if key.Namespace == "" {
p := policy.(*corev1alpha1.ClusterPropagationPolicy)
_, err = c.fedClient.CoreV1alpha1().ClusterPropagationPolicies().Update(ctx, p, metav1.UpdateOptions{})
} else {
p := policy.(*corev1alpha1.PropagationPolicy)
_, err = c.fedClient.CoreV1alpha1().PropagationPolicies(key.Namespace).Update(ctx, p, metav1.UpdateOptions{})
}
if err != nil {
logger.Error(err, "Failed to update policy")
return worker.StatusError
}

return worker.StatusAllOK
}
14 changes: 8 additions & 6 deletions pkg/controllers/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ func TestGetSchedulingUnit(t *testing.T) {
AutoMigration: &fedcorev1a1.AutoMigration{
KeepUnschedulableReplicas: false,
},
ReplicaRescheduling: &fedcorev1a1.ReplicaRescheduling{
AvoidDisruption: false,
ReschedulePolicy: &fedcorev1a1.ReschedulePolicy{
ReplicaRescheduling: &fedcorev1a1.ReplicaRescheduling{
AvoidDisruption: false,
},
},
},
}
Expand Down Expand Up @@ -194,10 +196,10 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
name: "sticky cluster override",
policy: &fedcorev1a1.PropagationPolicy{
Spec: fedcorev1a1.PropagationPolicySpec{
StickyCluster: true,
ClusterSelector: map[string]string{
"label": "value1",
},
ReschedulePolicy: &fedcorev1a1.ReschedulePolicy{DisableRescheduling: true},
},
},
annotations: map[string]string{
Expand All @@ -215,10 +217,10 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
name: "Cluster selector override",
policy: &fedcorev1a1.PropagationPolicy{
Spec: fedcorev1a1.PropagationPolicySpec{
StickyCluster: true,
ClusterSelector: map[string]string{
"label": "value1",
},
ReschedulePolicy: &fedcorev1a1.ReschedulePolicy{DisableRescheduling: true},
},
},
annotations: map[string]string{
Expand All @@ -237,10 +239,10 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
policy: &fedcorev1a1.PropagationPolicy{
Spec: fedcorev1a1.PropagationPolicySpec{
SchedulingMode: fedcorev1a1.SchedulingModeDuplicate,
StickyCluster: true,
ClusterSelector: map[string]string{
"label": "value1",
},
ReschedulePolicy: &fedcorev1a1.ReschedulePolicy{DisableRescheduling: true},
},
},
annotations: map[string]string{
Expand Down Expand Up @@ -291,7 +293,6 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
name: "Tolerations override",
policy: &fedcorev1a1.PropagationPolicy{
Spec: fedcorev1a1.PropagationPolicySpec{
StickyCluster: true,
ClusterSelector: map[string]string{
"label": "value1",
},
Expand All @@ -302,6 +303,7 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
Effect: corev1.TaintEffectNoExecute,
},
},
ReschedulePolicy: &fedcorev1a1.ReschedulePolicy{DisableRescheduling: true},
},
},
annotations: map[string]string{
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/scheduler/schedulingunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func schedulingUnitForFedObject(
}
}

if replicaRescheduling := policy.GetSpec().ReplicaRescheduling; replicaRescheduling != nil {
schedulingUnit.AvoidDisruption = replicaRescheduling.AvoidDisruption
if reschedulePolicy := policy.GetSpec().ReschedulePolicy; reschedulePolicy != nil &&
reschedulePolicy.ReplicaRescheduling != nil {
schedulingUnit.AvoidDisruption = reschedulePolicy.ReplicaRescheduling.AvoidDisruption
}

schedulingUnit.SchedulingMode = schedulingMode
Expand Down Expand Up @@ -240,7 +241,7 @@ func getAutoMigrationInfo(fedObject fedcorev1a1.GenericFederatedObject) (*framew
}

func getIsStickyClusterFromPolicy(policy fedcorev1a1.GenericPropagationPolicy) bool {
return policy.GetSpec().StickyCluster
return policy.GetSpec().ReschedulePolicy != nil && policy.GetSpec().ReschedulePolicy.DisableRescheduling
}

func getIsStickyClusterFromObject(object fedcorev1a1.GenericFederatedObject) (bool, bool) {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/framework/policies/propagationpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func PropagationPolicyForClustersWithPlacements(
},
Spec: fedcorev1a1.PropagationPolicySpec{
SchedulingMode: fedcorev1a1.SchedulingModeDuplicate,
StickyCluster: false,
Placements: []fedcorev1a1.ClusterReference{},
},
}
Expand Down

0 comments on commit cda0878

Please sign in to comment.