diff --git a/pkg/controllers/scheduler/scheduler_test.go b/pkg/controllers/scheduler/scheduler_test.go index f7c0979a..4614743d 100644 --- a/pkg/controllers/scheduler/scheduler_test.go +++ b/pkg/controllers/scheduler/scheduler_test.go @@ -26,10 +26,10 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/utils/pointer" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" ) @@ -37,25 +37,19 @@ import ( func TestGetSchedulingUnit(t *testing.T) { g := gomega.NewWithT(t) - fedObj := fedtypesv1a1.GenericObjectWithPlacements{ - TypeMeta: metav1.TypeMeta{ - APIVersion: fedtypesv1a1.SchemeGroupVersion.String(), - Kind: "FederatedDeployment", - }, + fedObj := fedcorev1a1.FederatedObject{ ObjectMeta: metav1.ObjectMeta{ Name: "test", Namespace: "default", }, - Spec: fedtypesv1a1.GenericSpecWithPlacements{ - Placements: []fedtypesv1a1.PlacementWithController{ + Spec: fedcorev1a1.GenericFederatedObjectSpec{ + Placements: []fedcorev1a1.PlacementWithController{ { Controller: "test-controller", - Placement: fedtypesv1a1.Placement{ - Clusters: []fedtypesv1a1.GenericClusterReference{ - {Name: "cluster-1"}, - {Name: "cluster-2"}, - {Name: "cluster-3"}, - }, + Placement: []fedcorev1a1.ClusterReference{ + {Cluster: "cluster-1"}, + {Cluster: "cluster-2"}, + {Cluster: "cluster-3"}, }, }, }, @@ -114,16 +108,13 @@ func TestGetSchedulingUnit(t *testing.T) { }, } - fedObjUns, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&fedObj) - g.Expect(err).NotTo(gomega.HaveOccurred()) - templateUns, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&template) - g.Expect(err).NotTo(gomega.HaveOccurred()) - err = unstructured.SetNestedMap(fedObjUns, templateUns, common.TemplatePath...) + rawJSON, err := json.Marshal(&template) g.Expect(err).NotTo(gomega.HaveOccurred()) + fedObj.Spec.Template.Raw = rawJSON typeConfig := &fedcorev1a1.FederatedTypeConfig{ Spec: fedcorev1a1.FederatedTypeConfigSpec{ - TargetType: fedcorev1a1.APIResource{ + SourceType: fedcorev1a1.APIResource{ Group: "apps", Version: "v1", Kind: "Deployment", @@ -136,7 +127,7 @@ func TestGetSchedulingUnit(t *testing.T) { }, } - su, err := schedulingUnitForFedObject(typeConfig, &unstructured.Unstructured{Object: fedObjUns}, &policy) + su, err := schedulingUnitForFedObject(typeConfig, &fedObj, &policy) g.Expect(err).NotTo(gomega.HaveOccurred()) g.Expect(su).To(gomega.Equal(&framework.SchedulingUnit{ @@ -351,7 +342,7 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { "label": "value1", }, MaxClusters: pointer.Int64(5), - Placements: []fedcorev1a1.ClusterReference{ + Placements: []fedcorev1a1.DesiredPlacement{ { Cluster: "cluster1", }, @@ -407,10 +398,18 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { var err error - obj := &unstructured.Unstructured{Object: make(map[string]interface{})} + testObj := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + } + rawJSON, err := json.Marshal(testObj) + g.Expect(err).ToNot(gomega.HaveOccurred()) + + obj := &fedcorev1a1.FederatedObject{} obj.SetAnnotations(test.annotations) - err = unstructured.SetNestedMap(obj.Object, make(map[string]interface{}), common.TemplatePath...) - g.Expect(err).NotTo(gomega.HaveOccurred()) + obj.Spec.Template.Raw = rawJSON typeConfig := &fedcorev1a1.FederatedTypeConfig{ Spec: fedcorev1a1.FederatedTypeConfigSpec{ @@ -442,6 +441,10 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { func TestSchedulingMode(t *testing.T) { deploymentObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, ObjectMeta: metav1.ObjectMeta{ Name: "deployment", Namespace: "default", @@ -456,6 +459,10 @@ func TestSchedulingMode(t *testing.T) { deploymentUns := &unstructured.Unstructured{Object: deploymentObj} statefulSetObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, ObjectMeta: metav1.ObjectMeta{ Name: "statefulset", Namespace: "default", @@ -519,7 +526,7 @@ func TestSchedulingMode(t *testing.T) { Name: "", }, Spec: fedcorev1a1.FederatedTypeConfigSpec{ - TargetType: fedcorev1a1.APIResource{ + SourceType: fedcorev1a1.APIResource{ Group: test.gvk.Group, Version: test.gvk.Version, Kind: test.gvk.Kind, @@ -529,9 +536,11 @@ func TestSchedulingMode(t *testing.T) { }, }, } - obj := &unstructured.Unstructured{Object: make(map[string]interface{})} - err := unstructured.SetNestedMap(obj.Object, test.obj.Object, common.TemplatePath...) + obj := &fedcorev1a1.FederatedObject{} + rawJSON, err := json.Marshal(test.obj) g.Expect(err).NotTo(gomega.HaveOccurred()) + obj.Spec.Template.Raw = rawJSON + su, err := schedulingUnitForFedObject(typeConfig, obj, test.policy) g.Expect(err).NotTo(gomega.HaveOccurred()) g.Expect(su.SchedulingMode).To(gomega.Equal(test.expectedResult)) diff --git a/pkg/controllers/scheduler/webhook_test.go b/pkg/controllers/scheduler/webhook_test.go index 77090a64..3e3bd7c6 100644 --- a/pkg/controllers/scheduler/webhook_test.go +++ b/pkg/controllers/scheduler/webhook_test.go @@ -35,27 +35,29 @@ import ( "time" "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + jsonutil "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic/dynamicinformer" dynamicFake "k8s.io/client-go/dynamic/fake" kubeFake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2/ktesting" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" schedwebhookv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/schedulerwebhook/v1alpha1" - fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" fedFake "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned/fake" fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions" - "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers" - schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema" "github.com/kubewharf/kubeadmiral/pkg/stats" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/pendingcontrollers" ) // Generate a self-signed certificate and key pair. @@ -227,72 +229,84 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl }, } - typeConfig := &fedcorev1a1.FederatedTypeConfig{ + clusters := []runtime.Object{ + getCluster("accept"), + getCluster("reject"), + } + + ftc := fedcorev1a1.FederatedTypeConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deployment.apps", + }, Spec: fedcorev1a1.FederatedTypeConfigSpec{ - FederatedType: fedcorev1a1.APIResource{ - Group: fedtypesv1a1.SchemeGroupVersion.Group, - Version: fedcorev1a1.SchemeGroupVersion.Version, - Kind: "FederatedDeployment", - PluralName: "federateddeployments", - Scope: "Namespaced", - }, - TargetType: fedcorev1a1.APIResource{ + SourceType: fedcorev1a1.APIResource{ Group: "apps", Version: "v1", Kind: "Deployment", PluralName: "deployments", - Scope: "Namespaced", - }, - PathDefinition: fedcorev1a1.PathDefinition{ - ReplicasSpec: "spec.replicas", + Scope: v1beta1.NamespaceScoped, }, }, } - clusters := []runtime.Object{ - getCluster("accept"), - getCluster("reject"), - } - kubeClient := kubeFake.NewSimpleClientset() scheme := runtime.NewScheme() - scheme.AddKnownTypeWithName( - fedtypesv1a1.SchemeGroupVersion.WithKind(typeConfig.Spec.FederatedType.Kind+"List"), - &unstructured.UnstructuredList{}, - ) + err := corev1.AddToScheme(scheme) + g.Expect(err).ToNot(gomega.HaveOccurred()) + err = appsv1.AddToScheme(scheme) + g.Expect(err).ToNot(gomega.HaveOccurred()) - // Ensure watcher is started before creating objects to avoid missing events occurred after LIST and before WATCH - // ref: https://github.com/kubernetes/client-go/blob/master/examples/fake-client/main_test.go - watcherStarted := make(chan struct{}) dynamicClient := dynamicFake.NewSimpleDynamicClient(scheme) - dynamicClient.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { - gvr := action.GetResource() - ns := action.GetNamespace() - watch, err := dynamicClient.Tracker().Watch(gvr, ns) - if err != nil { - return false, nil, err - } - close(watcherStarted) - return true, watch, nil - }) dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) - fedClient := fedFake.NewSimpleClientset(append(clusters, &webhookConfig, &profile, &policy)...) + testObjs := []runtime.Object{} + testObjs = append(testObjs, &webhookConfig, &profile, &policy, &ftc) + testObjs = append(testObjs, clusters...) + + // Ensure watcher is started before creating objects to avoid missing events occurred after LIST and before WATCH + // ref: https://github.com/kubernetes/client-go/blob/master/examples/fake-client/main_test.go + watcherStarted := make(chan struct{}) + fedClient := fedFake.NewSimpleClientset(testObjs...) + fedClient.PrependWatchReactor( + "*", + func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := fedClient.Tracker().Watch(gvr, ns) + if err != nil { + return false, nil, err + } + select { + case <-watcherStarted: + default: + close(watcherStarted) + } + return true, watch, nil + }, + ) fedInformerFactory := fedinformers.NewSharedInformerFactory(fedClient, 0) - federatedType := typeConfig.GetFederatedType() - gvr := schemautil.APIResourceToGVR(&federatedType) + manager := informermanager.NewInformerManager( + dynamicClient, + fedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(), + nil, + ) + scheduler, err := NewScheduler( - ktesting.NewLogger(t, ktesting.NewConfig(ktesting.Verbosity(3))), - typeConfig, kubeClient, fedClient, dynamicClient, - dynInformerFactory.ForResource(gvr), + kubeClient, + fedClient, + dynamicClient, + fedInformerFactory.Core().V1alpha1().FederatedObjects(), + fedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), fedInformerFactory.Core().V1alpha1().PropagationPolicies(), fedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(), fedInformerFactory.Core().V1alpha1().FederatedClusters(), fedInformerFactory.Core().V1alpha1().SchedulingProfiles(), + manager, fedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(), stats.NewMock("test", "kube-admiral", false), + ktesting.NewLogger(t, ktesting.NewConfig(ktesting.Verbosity(3))), 1, ) g.Expect(err).NotTo(gomega.HaveOccurred()) @@ -300,12 +314,11 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl ctx := context.Background() dynInformerFactory.Start(ctx.Done()) fedInformerFactory.Start(ctx.Done()) + manager.Start(ctx) go scheduler.Run(ctx) - dynInformerFactory.WaitForCacheSync(ctx.Done()) - fedInformerFactory.WaitForCacheSync(ctx.Done()) - + cache.WaitForCacheSync(ctx.Done(), scheduler.HasSynced) <-watcherStarted // Wait for the plugin to be initialized @@ -315,11 +328,7 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl g.Expect(plugin.(framework.Plugin).Name()).To(gomega.Equal(webhookConfig.Name)) }).WithContext(ctx).WithTimeout(3 * time.Second).WithPolling(100 * time.Millisecond).Should(gomega.Succeed()) - fedObj := metav1.PartialObjectMetadata{ - TypeMeta: metav1.TypeMeta{ - APIVersion: fedtypesv1a1.SchemeGroupVersion.String(), - Kind: "FederatedDeployment", - }, + fedObj := &fedcorev1a1.FederatedObject{ ObjectMeta: metav1.ObjectMeta{ Name: "test", Namespace: "default", @@ -331,23 +340,31 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl }, }, } - - fedObjUns := &unstructured.Unstructured{} - fedObjUns.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&fedObj) - g.Expect(err).NotTo(gomega.HaveOccurred()) - err = unstructured.SetNestedMap(fedObjUns.Object, map[string]interface{}{}, common.TemplatePath...) + deployment := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + } + rawBytes, err := jsonutil.Marshal(deployment) g.Expect(err).NotTo(gomega.HaveOccurred()) + fedObj.Spec.Template.Raw = rawBytes - fedObjUns, err = dynamicClient.Resource(gvr).Namespace(fedObj.Namespace).Create(ctx, fedObjUns, metav1.CreateOptions{}) + fedObj, err = fedClient.CoreV1alpha1(). + FederatedObjects(fedObj.Namespace). + Create(ctx, fedObj, metav1.CreateOptions{}) g.Expect(err).NotTo(gomega.HaveOccurred()) g.Eventually(func(g gomega.Gomega) { - fedObjUns, err = dynamicClient.Resource(gvr).Namespace(fedObj.Namespace).Get(ctx, fedObj.Name, metav1.GetOptions{}) - g.Expect(err).NotTo(gomega.HaveOccurred()) - fedObj := fedtypesv1a1.GenericObjectWithPlacements{} - err = runtime.DefaultUnstructuredConverter.FromUnstructured(fedObjUns.Object, &fedObj) + res, err := fedClient.CoreV1alpha1(). + FederatedObjects(fedObj.Namespace). + Get(ctx, fedObj.Name, metav1.GetOptions{}) g.Expect(err).NotTo(gomega.HaveOccurred()) - g.Expect(fedObj.ClusterNameUnion()).To(gomega.Equal(map[string]struct{}{"accept": {}})) + g.Expect(res.GetSpec().GetPlacementUnion()).To(gomega.Equal(sets.New("accept"))) }).WithContext(ctx).WithTimeout(3 * time.Second).WithPolling(100 * time.Millisecond).Should(gomega.Succeed()) g.Expect(filterCalled.Load()).To(gomega.Equal(int32(len(clusters))))