Skip to content

Commit

Permalink
Merge pull request #173 from limhawjia/refactor-scheduler
Browse files Browse the repository at this point in the history
refactor(scheduler): adopt unified types
  • Loading branch information
limhawjia authored Jul 28, 2023
2 parents d28244f + ab72f36 commit e1e35d5
Show file tree
Hide file tree
Showing 57 changed files with 1,687 additions and 1,261 deletions.
8 changes: 5 additions & 3 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ import (
const (
FederatedClusterControllerName = "cluster"
FederateControllerName = "federate"
MonitorControllerName = "monitor"
FollowerControllerName = "follower"
PolicyRCControllerName = "policyrc"
OverrideControllerName = "overridepolicy"
OverrideControllerName = "override"
NamespaceAutoPropagationControllerName = "nsautoprop"
StatusControllerName = "status"
SchedulerName = "scheduler"
)

var knownControllers = map[string]controllermanager.StartControllerFunc{
Expand All @@ -51,9 +51,11 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
OverrideControllerName: startOverridePolicyController,
NamespaceAutoPropagationControllerName: startNamespaceAutoPropagationController,
StatusControllerName: startStatusController,
FederatedClusterControllerName: startFederatedClusterController,
SchedulerName: startScheduler,
}

var controllersDisabledByDefault = sets.New(MonitorControllerName)
var controllersDisabledByDefault = sets.New[string]()

// Run starts the controller manager according to the given options.
func Run(ctx context.Context, opts *options.Options) {
Expand Down
55 changes: 55 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc"
"github.com/kubewharf/kubeadmiral/pkg/controllers/status"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
)

func startFederateController(
Expand Down Expand Up @@ -152,3 +154,56 @@ func startStatusController(

return statusController, nil
}

func startFederatedClusterController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
federatedClusterController, err := federatedcluster.NewFederatedClusterController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FederatedInformerManager,
controllerCtx.Metrics,
klog.Background(),
controllerCtx.ComponentConfig.ClusterJoinTimeout,
controllerCtx.WorkerCount,
controllerCtx.FedSystemNamespace,
)
if err != nil {
return nil, fmt.Errorf("error creating federate controller: %w", err)
}

go federatedClusterController.Run(ctx)

return federatedClusterController, nil
}

func startScheduler(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
scheduler, err := scheduler.NewScheduler(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.DynamicClientset,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating scheduler: %w", err)
}

go scheduler.Run(ctx)

return scheduler, nil
}
11 changes: 4 additions & 7 deletions cmd/controller-manager/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/kubewharf/kubeadmiral/cmd/controller-manager/app/options"
Expand Down Expand Up @@ -115,19 +116,15 @@ func createControllerContext(opts *options.Options) (*controllercontext.Context,
nil,
)
federatedInformerManager := informermanager.NewFederatedInformerManager(
informermanager.ClusterClientGetter{
informermanager.ClusterClientHelper{
ConnectionHash: informermanager.DefaultClusterConnectionHash,
ClientGetter: func(cluster *fedcorev1a1.FederatedCluster) (dynamic.Interface, error) {
restConfig, err := clusterutil.BuildClusterConfig(
RestConfigGetter: func(cluster *fedcorev1a1.FederatedCluster) (*rest.Config, error) {
return clusterutil.BuildClusterConfig(
cluster,
kubeClientset,
restConfig,
common.DefaultFedSystemNamespace,
)
if err != nil {
return nil, err
}
return dynamic.NewForConfig(restConfig)
},
},
fedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(),
Expand Down
79 changes: 77 additions & 2 deletions pkg/apis/core/v1alpha1/extensions_federatedobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func (spec *GenericFederatedObjectSpec) GetControllerPlacement(controller string
return nil
}

// SetControllerPlacement sets the ClusterPlacements for a given controller. If clusterNames is nil or empty, the previous
// placement for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject has changed.
// SetControllerPlacement sets the cluster placements for a given controller. If clusterNames is nil or empty, the
// previous placement for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject
// has changed.
func (spec *GenericFederatedObjectSpec) SetControllerPlacement(controller string, clusterNames []string) bool {
if len(clusterNames) == 0 {
return spec.DeleteControllerPlacement(controller)
Expand Down Expand Up @@ -140,6 +141,79 @@ func (spec *GenericFederatedObjectSpec) DeleteControllerPlacement(controller str
return true
}

// Overrides extensions

func (spec *GenericFederatedObjectSpec) GetControllerOverrides(controller string) []ClusterReferenceWithPatches {
for _, overrides := range spec.Overrides {
if overrides.Controller == controller {
return overrides.Override
}
}
return nil
}

// SetControllerOverrides sets the cluster overrides for a given controller. If clusterNames is nil or empty, the
// previous overrides for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject
// has changed.
func (spec *GenericFederatedObjectSpec) SetControllerOverrides(
controller string,
clusterOverrides []ClusterReferenceWithPatches,
) bool {
if len(clusterOverrides) == 0 {
return spec.DeleteControllerOverrides(controller)
}

// sort the clusters by name for readability and to avoid unnecessary updates
sort.Slice(clusterOverrides, func(i, j int) bool {
return clusterOverrides[i].Cluster < clusterOverrides[j].Cluster
})

oldOverridesWithControllerIdx := -1
for i := range spec.Overrides {
if spec.Overrides[i].Controller == controller {
oldOverridesWithControllerIdx = i
break
}
}

newOverridesWithController := OverrideWithController{
Controller: controller,
Override: clusterOverrides,
}
if oldOverridesWithControllerIdx == -1 {
spec.Overrides = append(spec.Overrides, newOverridesWithController)
return true
}
if !reflect.DeepEqual(newOverridesWithController, spec.Overrides[oldOverridesWithControllerIdx]) {
spec.Overrides[oldOverridesWithControllerIdx] = newOverridesWithController
return true
}

return false
}

// DeleteControllerOverrides deletes a controller's overrides, returning a bool to indicate if the
// GenericFederatedObject has changed.
func (spec *GenericFederatedObjectSpec) DeleteControllerOverrides(controller string) bool {
oldOverridesIdx := -1
for i := range spec.Overrides {
if spec.Overrides[i].Controller == controller {
oldOverridesIdx = i
break
}
}

if oldOverridesIdx == -1 {
return false
}

spec.Overrides = append(spec.Overrides[:oldOverridesIdx], spec.Overrides[(oldOverridesIdx+1):]...)
return true
}

// Template extensions

// GetTemplateAsUnstructured returns the FederatedObject's template unmarshalled into an *unstructured.Unstructured.
func (spec *GenericFederatedObjectSpec) GetTemplateAsUnstructured() (*unstructured.Unstructured, error) {
template := &unstructured.Unstructured{}
if err := template.UnmarshalJSON(spec.Template.Raw); err != nil {
Expand All @@ -148,6 +222,7 @@ func (spec *GenericFederatedObjectSpec) GetTemplateAsUnstructured() (*unstructur
return template, nil
}

// GetTemplateGVK returns the GVK of the FederatedObject's source object by parsing the FederatedObject's template.
func (spec *GenericFederatedObjectSpec) GetTemplateGVK() (schema.GroupVersionKind, error) {
type partialTypeMetadata struct {
metav1.TypeMeta `json:",inline"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strings"

"k8s.io/apimachinery/pkg/api/meta"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// QualifiedName comprises a resource name with an optional namespace.
Expand All @@ -39,7 +39,7 @@ type QualifiedName struct {
Name string
}

func NewQualifiedName(obj pkgruntime.Object) QualifiedName {
func NewQualifiedName(obj metav1.Object) QualifiedName {
accessor, err := meta.Accessor(obj)
if err != nil {
// This should never happen, but if it does, the
Expand Down
102 changes: 52 additions & 50 deletions pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -112,7 +111,6 @@ func NewFederateController(
c.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, FederateControllerName, 6)
c.worker = worker.NewReconcileWorker[workerKey](
FederateControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand All @@ -130,14 +128,15 @@ func NewFederateController(
uns := obj.(*unstructured.Unstructured)
return uns.GetNamespace() != fedSystemNamespace
},
Handler: eventhandlers.NewTriggerOnAllChanges(func(obj runtime.Object) {
uns := obj.(*unstructured.Unstructured)
c.worker.Enqueue(workerKey{
name: uns.GetName(),
namespace: uns.GetNamespace(),
gvk: ftc.GetSourceTypeGVK(),
})
}),
Handler: eventhandlers.NewTriggerOnAllChanges(
func(uns *unstructured.Unstructured) {
c.worker.Enqueue(workerKey{
name: uns.GetName(),
namespace: uns.GetNamespace(),
gvk: ftc.GetSourceTypeGVK(),
})
},
),
}
},
}); err != nil {
Expand All @@ -152,47 +151,53 @@ func NewFederateController(
fedObj := obj.(*fedcorev1a1.FederatedObject)
return fedObj.Namespace != fedSystemNamespace
},
Handler: eventhandlers.NewTriggerOnAllChanges(func(o runtime.Object) {
fedObj := o.(*fedcorev1a1.FederatedObject)
logger := c.logger.WithValues("federated-object", common.NewQualifiedName(fedObj))

srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(err, "Failed to get source object's metadata from FederatedObject")
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
Handler: eventhandlers.NewTriggerOnAllChanges(
func(fedObj *fedcorev1a1.FederatedObject) {
srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
c.logger.Error(
err,
"Failed to get source object's metadata from FederatedObject",
"object",
common.NewQualifiedName(fedObj),
)
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
}); err != nil {
return nil, err
}

if _, err := clusterFedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChanges(func(o runtime.Object) {
fedObj := o.(*fedcorev1a1.ClusterFederatedObject)
logger := c.logger.WithValues("cluster-federated-object", common.NewQualifiedName(fedObj))

srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(err, "Failed to get source object's metadata from ClusterFederatedObject")
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
eventhandlers.NewTriggerOnAllChanges(
func(fedObj *fedcorev1a1.ClusterFederatedObject) {
srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(
err,
"Failed to get source object's metadata from ClusterFederatedObject",
"object",
common.NewQualifiedName(fedObj),
)
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,8 +229,7 @@ func (c *FederateController) HasSynced() bool {

func (c *FederateController) reconcile(ctx context.Context, key workerKey) (status worker.Result) {
_ = c.metrics.Rate("federate.throughput", 1)
ctx, logger := logging.InjectLogger(ctx, c.logger)
ctx, logger = logging.InjectLoggerValues(ctx, "source-object", key.QualifiedName().String(), "gvk", key.gvk)
ctx, logger := logging.InjectLoggerValues(ctx, "source-object", key.QualifiedName().String(), "gvk", key.gvk)

startTime := time.Now()

Expand Down Expand Up @@ -396,8 +400,6 @@ func (c *FederateController) reconcile(ctx context.Context, key workerKey) (stat
"Federated object updated: %s",
fedObject.GetName(),
)
} else {
logger.V(3).Info("No updates required to the federated object")
}

return worker.StatusAllOK
Expand Down
Loading

0 comments on commit e1e35d5

Please sign in to comment.