Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(scheduler): adopt unified types #173

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e10bbe8
feat(federated-informer-manager): integrate pod informer
limhawjia Jul 24, 2023
513a40d
fix(federated-informer-manager): fix cluster deletion
limhawjia Jul 25, 2023
18f2623
refactor(cluster-controller): adopt unified types
limhawjia Jul 25, 2023
f87a860
refactor(cluster-controller): bootstrap controller-manager with clust…
limhawjia Jul 25, 2023
a54768f
refactor(scheduler): adopt unified types
limhawjia Jul 22, 2023
89c3720
fix(scheduler): trigger schedulingprofile informer start
limhawjia Jul 22, 2023
937a2bd
refactor(eventhandlers): refactor event handlers
limhawjia Jul 28, 2023
94ca742
refactor(core): change override controller name
limhawjia Jul 28, 2023
338ce5f
feat(override): deprecate override util package
limhawjia Jul 28, 2023
d4038e8
fix(policyrc): fix imports
limhawjia Jul 28, 2023
ed50baf
fix(cluster-controller): status update on cache not synced
limhawjia Jul 28, 2023
cfcaea9
fix(cluster-controller): cluster controller synced
limhawjia Jul 28, 2023
4c9895a
fix(scheduler): typos and unit test
limhawjia Jul 28, 2023
abe213f
fix(policyrc): nil pointer on controller initialization
limhawjia Jul 28, 2023
5285e18
fix(scheduler): nil pointer in cluster join
limhawjia Jul 28, 2023
44367f0
fix(scheduler): nil pointer in enqueue
limhawjia Jul 28, 2023
8c75b1c
fix(worker): remove enqueueObject and keyFunc
limhawjia Jul 28, 2023
fb44559
chore(cluster-controller): adjust healthcheck period
limhawjia Jul 28, 2023
af546b3
feat(scheduler): use label selectors for listing
limhawjia Jul 28, 2023
1b642db
fix(scheduler): use label selectors for listing
limhawjia Jul 28, 2023
76e5baa
fix(scheduler): fix extra error check and double reenque
limhawjia Jul 28, 2023
ab72f36
chore(scheduler): fix formatting
limhawjia Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ import (
const (
FederatedClusterControllerName = "cluster"
FederateControllerName = "federate"
MonitorControllerName = "monitor"
FollowerControllerName = "follower"
PolicyRCControllerName = "policyrc"
OverrideControllerName = "overridepolicy"
OverrideControllerName = "override"
NamespaceAutoPropagationControllerName = "nsautoprop"
StatusControllerName = "status"
SchedulerName = "scheduler"
)

var knownControllers = map[string]controllermanager.StartControllerFunc{
Expand All @@ -51,9 +51,11 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
OverrideControllerName: startOverridePolicyController,
NamespaceAutoPropagationControllerName: startNamespaceAutoPropagationController,
StatusControllerName: startStatusController,
FederatedClusterControllerName: startFederatedClusterController,
SchedulerName: startScheduler,
}

var controllersDisabledByDefault = sets.New(MonitorControllerName)
var controllersDisabledByDefault = sets.New[string]()

// Run starts the controller manager according to the given options.
func Run(ctx context.Context, opts *options.Options) {
Expand Down
55 changes: 55 additions & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc"
"github.com/kubewharf/kubeadmiral/pkg/controllers/status"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"

Check failure on line 32 in cmd/controller-manager/app/core.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"

Check failure on line 33 in cmd/controller-manager/app/core.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/kubewharf/kubeadmiral) --custom-order (gci)
)

func startFederateController(
Expand Down Expand Up @@ -152,3 +154,56 @@

return statusController, nil
}

func startFederatedClusterController(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
federatedClusterController, err := federatedcluster.NewFederatedClusterController(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FederatedInformerManager,
controllerCtx.Metrics,
klog.Background(),
controllerCtx.ComponentConfig.ClusterJoinTimeout,
controllerCtx.WorkerCount,
controllerCtx.FedSystemNamespace,
)
if err != nil {
return nil, fmt.Errorf("error creating federate controller: %w", err)
}

go federatedClusterController.Run(ctx)

return federatedClusterController, nil
}

func startScheduler(
ctx context.Context,
controllerCtx *controllercontext.Context,
) (controllermanager.Controller, error) {
scheduler, err := scheduler.NewScheduler(
controllerCtx.KubeClientset,
controllerCtx.FedClientset,
controllerCtx.DynamicClientset,
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
controllerCtx.FedInformerFactory.Core().V1alpha1().PropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
controllerCtx.InformerManager,
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
controllerCtx.Metrics,
klog.Background(),
controllerCtx.WorkerCount,
)
if err != nil {
return nil, fmt.Errorf("error creating scheduler: %w", err)
}

go scheduler.Run(ctx)

return scheduler, nil
}
11 changes: 4 additions & 7 deletions cmd/controller-manager/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/kubewharf/kubeadmiral/cmd/controller-manager/app/options"
Expand Down Expand Up @@ -115,19 +116,15 @@ func createControllerContext(opts *options.Options) (*controllercontext.Context,
nil,
)
federatedInformerManager := informermanager.NewFederatedInformerManager(
informermanager.ClusterClientGetter{
informermanager.ClusterClientHelper{
ConnectionHash: informermanager.DefaultClusterConnectionHash,
ClientGetter: func(cluster *fedcorev1a1.FederatedCluster) (dynamic.Interface, error) {
restConfig, err := clusterutil.BuildClusterConfig(
RestConfigGetter: func(cluster *fedcorev1a1.FederatedCluster) (*rest.Config, error) {
return clusterutil.BuildClusterConfig(
cluster,
kubeClientset,
restConfig,
common.DefaultFedSystemNamespace,
)
if err != nil {
return nil, err
}
return dynamic.NewForConfig(restConfig)
},
},
fedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(),
Expand Down
79 changes: 77 additions & 2 deletions pkg/apis/core/v1alpha1/extensions_federatedobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func (spec *GenericFederatedObjectSpec) GetControllerPlacement(controller string
return nil
}

// SetControllerPlacement sets the ClusterPlacements for a given controller. If clusterNames is nil or empty, the previous
// placement for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject has changed.
// SetControllerPlacement sets the cluster placements for a given controller. If clusterNames is nil or empty, the
// previous placement for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject
// has changed.
func (spec *GenericFederatedObjectSpec) SetControllerPlacement(controller string, clusterNames []string) bool {
if len(clusterNames) == 0 {
return spec.DeleteControllerPlacement(controller)
Expand Down Expand Up @@ -140,6 +141,79 @@ func (spec *GenericFederatedObjectSpec) DeleteControllerPlacement(controller str
return true
}

// Overrides extensions

func (spec *GenericFederatedObjectSpec) GetControllerOverrides(controller string) []ClusterReferenceWithPatches {
for _, overrides := range spec.Overrides {
if overrides.Controller == controller {
return overrides.Override
}
}
return nil
}

// SetControllerOverrides sets the cluster overrides for a given controller. If clusterNames is nil or empty, the
// previous overrides for the given controller will be deleted. Returns a bool indicating if the GenericFederatedObject
// has changed.
func (spec *GenericFederatedObjectSpec) SetControllerOverrides(
controller string,
clusterOverrides []ClusterReferenceWithPatches,
) bool {
if len(clusterOverrides) == 0 {
return spec.DeleteControllerOverrides(controller)
}

// sort the clusters by name for readability and to avoid unnecessary updates
sort.Slice(clusterOverrides, func(i, j int) bool {
return clusterOverrides[i].Cluster < clusterOverrides[j].Cluster
})

oldOverridesWithControllerIdx := -1
for i := range spec.Overrides {
if spec.Overrides[i].Controller == controller {
oldOverridesWithControllerIdx = i
break
}
}

newOverridesWithController := OverrideWithController{
Controller: controller,
Override: clusterOverrides,
}
if oldOverridesWithControllerIdx == -1 {
spec.Overrides = append(spec.Overrides, newOverridesWithController)
return true
}
if !reflect.DeepEqual(newOverridesWithController, spec.Overrides[oldOverridesWithControllerIdx]) {
spec.Overrides[oldOverridesWithControllerIdx] = newOverridesWithController
return true
}

return false
}

// DeleteControllerOverrides deletes a controller's overrides, returning a bool to indicate if the
// GenericFederatedObject has changed.
func (spec *GenericFederatedObjectSpec) DeleteControllerOverrides(controller string) bool {
oldOverridesIdx := -1
for i := range spec.Overrides {
if spec.Overrides[i].Controller == controller {
oldOverridesIdx = i
break
}
}

if oldOverridesIdx == -1 {
return false
}

spec.Overrides = append(spec.Overrides[:oldOverridesIdx], spec.Overrides[(oldOverridesIdx+1):]...)
return true
}

// Template extensions

// GetTemplateAsUnstructured returns the FederatedObject's template unmarshalled into an *unstructured.Unstructured.
func (spec *GenericFederatedObjectSpec) GetTemplateAsUnstructured() (*unstructured.Unstructured, error) {
template := &unstructured.Unstructured{}
if err := template.UnmarshalJSON(spec.Template.Raw); err != nil {
Expand All @@ -148,6 +222,7 @@ func (spec *GenericFederatedObjectSpec) GetTemplateAsUnstructured() (*unstructur
return template, nil
}

// GetTemplateGVK returns the GVK of the FederatedObject's source object by parsing the FederatedObject's template.
func (spec *GenericFederatedObjectSpec) GetTemplateGVK() (schema.GroupVersionKind, error) {
type partialTypeMetadata struct {
metav1.TypeMeta `json:",inline"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"strings"

"k8s.io/apimachinery/pkg/api/meta"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// QualifiedName comprises a resource name with an optional namespace.
Expand All @@ -39,7 +39,7 @@ type QualifiedName struct {
Name string
}

func NewQualifiedName(obj pkgruntime.Object) QualifiedName {
func NewQualifiedName(obj metav1.Object) QualifiedName {
accessor, err := meta.Accessor(obj)
if err != nil {
// This should never happen, but if it does, the
Expand Down
102 changes: 52 additions & 50 deletions pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -112,7 +111,6 @@ func NewFederateController(
c.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, FederateControllerName, 6)
c.worker = worker.NewReconcileWorker[workerKey](
FederateControllerName,
nil,
c.reconcile,
worker.RateLimiterOptions{},
workerCount,
Expand All @@ -130,14 +128,15 @@ func NewFederateController(
uns := obj.(*unstructured.Unstructured)
return uns.GetNamespace() != fedSystemNamespace
},
Handler: eventhandlers.NewTriggerOnAllChanges(func(obj runtime.Object) {
uns := obj.(*unstructured.Unstructured)
c.worker.Enqueue(workerKey{
name: uns.GetName(),
namespace: uns.GetNamespace(),
gvk: ftc.GetSourceTypeGVK(),
})
}),
Handler: eventhandlers.NewTriggerOnAllChanges(
func(uns *unstructured.Unstructured) {
c.worker.Enqueue(workerKey{
name: uns.GetName(),
namespace: uns.GetNamespace(),
gvk: ftc.GetSourceTypeGVK(),
})
},
),
}
},
}); err != nil {
Expand All @@ -152,47 +151,53 @@ func NewFederateController(
fedObj := obj.(*fedcorev1a1.FederatedObject)
return fedObj.Namespace != fedSystemNamespace
},
Handler: eventhandlers.NewTriggerOnAllChanges(func(o runtime.Object) {
fedObj := o.(*fedcorev1a1.FederatedObject)
logger := c.logger.WithValues("federated-object", common.NewQualifiedName(fedObj))

srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(err, "Failed to get source object's metadata from FederatedObject")
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
Handler: eventhandlers.NewTriggerOnAllChanges(
func(fedObj *fedcorev1a1.FederatedObject) {
srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
c.logger.Error(
err,
"Failed to get source object's metadata from FederatedObject",
"object",
common.NewQualifiedName(fedObj),
)
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
}); err != nil {
return nil, err
}

if _, err := clusterFedObjectInformer.Informer().AddEventHandler(
eventhandlers.NewTriggerOnAllChanges(func(o runtime.Object) {
fedObj := o.(*fedcorev1a1.ClusterFederatedObject)
logger := c.logger.WithValues("cluster-federated-object", common.NewQualifiedName(fedObj))

srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(err, "Failed to get source object's metadata from ClusterFederatedObject")
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
eventhandlers.NewTriggerOnAllChanges(
func(fedObj *fedcorev1a1.ClusterFederatedObject) {
srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured()
if err != nil {
logger.Error(
err,
"Failed to get source object's metadata from ClusterFederatedObject",
"object",
common.NewQualifiedName(fedObj),
)
return
}

gvk := srcMeta.GroupVersionKind()

c.worker.Enqueue(workerKey{
name: srcMeta.GetName(),
namespace: srcMeta.GetNamespace(),
gvk: gvk,
})
}),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,8 +229,7 @@ func (c *FederateController) HasSynced() bool {

func (c *FederateController) reconcile(ctx context.Context, key workerKey) (status worker.Result) {
_ = c.metrics.Rate("federate.throughput", 1)
ctx, logger := logging.InjectLogger(ctx, c.logger)
ctx, logger = logging.InjectLoggerValues(ctx, "source-object", key.QualifiedName().String(), "gvk", key.gvk)
ctx, logger := logging.InjectLoggerValues(ctx, "source-object", key.QualifiedName().String(), "gvk", key.gvk)

startTime := time.Now()

Expand Down Expand Up @@ -396,8 +400,6 @@ func (c *FederateController) reconcile(ctx context.Context, key workerKey) (stat
"Federated object updated: %s",
fedObject.GetName(),
)
} else {
logger.V(3).Info("No updates required to the federated object")
}

return worker.StatusAllOK
Expand Down
Loading
Loading