Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Build all hub controller as a single controller #683

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/registration-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
func newNucleusCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "registration-operator",
Short: "Nucleus Operator",
Short: "Registration Operator",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
os.Exit(1)
Expand All @@ -51,6 +51,7 @@ func newNucleusCommand() *cobra.Command {
}

cmd.AddCommand(hub.NewHubOperatorCmd())
cmd.AddCommand(hub.NewHubManagerCmd())
cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
cmd.AddCommand(spoke.NewKlusterletAgentCmd())

Expand Down
27 changes: 27 additions & 0 deletions pkg/cmd/hub/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import (
"context"

"github.com/spf13/cobra"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

ocmfeature "open-cluster-management.io/api/feature"

commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
singletonhub "open-cluster-management.io/ocm/pkg/singleton/hub"
"open-cluster-management.io/ocm/pkg/version"
)

Expand All @@ -30,3 +35,25 @@ func NewHubOperatorCmd() *cobra.Command {
opts.AddFlags(flags)
return cmd
}

// NewHubManagerCmd is to start the singleton manager including registration/work/placement/addon
func NewHubManagerCmd() *cobra.Command {
opts := commonoptions.NewOptions()
hubOpts := singletonhub.NewOption()

cmdConfig := opts.
NewControllerCommandConfig("ocm-controller", version.Get(), hubOpts.RunControllerManager)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "manager"
cmd.Short = "Start the ocm manager"

flags := cmd.Flags()

opts.AddFlags(flags)

utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubWorkFeatureGates))
utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubRegistrationFeatureGates))
utilruntime.Must(features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubAddonManagerFeatureGates))
features.HubMutableFeatureGate.AddFlag(flags)
return cmd
}
10 changes: 5 additions & 5 deletions pkg/registration/hub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
workclient "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"

Expand Down Expand Up @@ -86,7 +86,7 @@ func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controller
return err
}

workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
workClient, err := workclient.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
Expand All @@ -98,7 +98,7 @@ func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controller

clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
workInformers := workinformers.NewSharedInformerFactory(workClient, 30*time.Minute)
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
// Note all kube resources managed by registration should have the cluster label, and should not have
Expand Down Expand Up @@ -137,7 +137,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
kubeInformers kubeinformers.SharedInformerFactory,
clusterInformers clusterv1informers.SharedInformerFactory,
clusterProfileInformers cpinformerv1alpha1.SharedInformerFactory,
workInformers workv1informers.SharedInformerFactory,
workInformers workinformers.SharedInformerFactory,
addOnInformers addoninformers.SharedInformerFactory,
) error {
csrApprover, err := csr.NewCSRApprover(kubeClient, kubeInformers, m.ClusterAutoApprovalUsers, controllerContext.EventRecorder)
Expand Down
215 changes: 215 additions & 0 deletions pkg/singleton/hub/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package hub

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"

addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"

"open-cluster-management.io/ocm/pkg/addon"
"open-cluster-management.io/ocm/pkg/features"
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
workhub "open-cluster-management.io/ocm/pkg/work/hub"
)

const sourceID = "ocm-controller"

type Option struct {
RegistrationOption *registrationhub.HubManagerOptions
WorkOption *workhub.WorkHubManagerOptions
}

func NewOption() *Option {
return &Option{
RegistrationOption: registrationhub.NewHubManagerOptions(),
WorkOption: workhub.NewWorkHubManagerOptions(),
}
}

func (o *Option) AddFlags(fs *pflag.FlagSet) {
o.RegistrationOption.AddFlags(fs)
o.WorkOption.AddFlags(fs)
}

func (o *Option) RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

metadataClient, err := metadata.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}

clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
func(listOptions *metav1.ListOptions) {
// Note all kube resources managed by registration should have the cluster label, and should not have
// the addon label.
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: clusterv1.ClusterNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
{
Key: addonv1alpha1.AddonLabelKey,
Operator: metav1.LabelSelectorOpDoesNotExist,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
}))
addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)

var workClient workclientset.Interface
var watcherStore *store.SourceInformerWatcherStore

if o.WorkOption.WorkDriver == "kube" {
config := controllerContext.KubeConfig
if o.WorkOption.WorkDriverConfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", o.WorkOption.WorkDriverConfig)
if err != nil {
return err
}
}

workClient, err = workclientset.NewForConfig(config)
if err != nil {
return err
}
} else {
// For cloudevents drivers, we build ManifestWork client that implements the
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.

watcherStore = store.NewSourceInformerWatcherStore(ctx)

_, config, err := generic.NewConfigLoader(o.WorkOption.WorkDriver, o.WorkOption.WorkDriverConfig).
LoadConfig()
if err != nil {
return err
}

clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(o.WorkOption.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {
return err
}

workClient = clientHolder.WorkInterface()
}

workInformers := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute)
informer := workInformers.Work().V1().ManifestWorks()

// For cloudevents work client, we use the informer store as the client store
if watcherStore != nil {
watcherStore.SetStore(informer.Informer().GetStore())
}

// start registration component
go func() {
err := o.RegistrationOption.RunControllerManagerWithInformers(
ctx, controllerContext,
kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers,
)
if err != nil {
klog.Fatal(err)
}
}()

// start placement component
go func() {
err := placementcontrollers.RunControllerManagerWithInformers(
ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
if err != nil {
klog.Fatal(err)
}
}()

// start work component
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
// build a hub work client for ManifestWorkReplicaSets
replicaSetsClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
go func() {
err := workhub.RunControllerManagerWithInformers(
ctx, controllerContext, replicaSetsClient, workClient, informer, clusterInformers)
if err != nil {
klog.Fatal(err)
}
}()
}

// start addon component
if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 10*time.Minute)
go func() {
err := addon.RunControllerManagerWithInformers(
ctx, controllerContext, kubeClient, addOnClient, workClient,
clusterInformers, addOnInformers, workInformers, dynamicInformers,
)
if err != nil {
klog.Fatal(err)
}
}()
}

<-ctx.Done()
return nil
}
Loading