Skip to content

Commit

Permalink
Merge pull request #189 from limhawjia/fix-scheduler-ut
Browse files Browse the repository at this point in the history
fix: scheduler unit tests
  • Loading branch information
limhawjia authored Aug 4, 2023
2 parents c00dbec + e311011 commit 0800f8a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 93 deletions.
65 changes: 37 additions & 28 deletions pkg/controllers/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,30 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/utils/pointer"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
)

func TestGetSchedulingUnit(t *testing.T) {
g := gomega.NewWithT(t)

fedObj := fedtypesv1a1.GenericObjectWithPlacements{
TypeMeta: metav1.TypeMeta{
APIVersion: fedtypesv1a1.SchemeGroupVersion.String(),
Kind: "FederatedDeployment",
},
fedObj := fedcorev1a1.FederatedObject{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: fedtypesv1a1.GenericSpecWithPlacements{
Placements: []fedtypesv1a1.PlacementWithController{
Spec: fedcorev1a1.GenericFederatedObjectSpec{
Placements: []fedcorev1a1.PlacementWithController{
{
Controller: "test-controller",
Placement: fedtypesv1a1.Placement{
Clusters: []fedtypesv1a1.GenericClusterReference{
{Name: "cluster-1"},
{Name: "cluster-2"},
{Name: "cluster-3"},
},
Placement: []fedcorev1a1.ClusterReference{
{Cluster: "cluster-1"},
{Cluster: "cluster-2"},
{Cluster: "cluster-3"},
},
},
},
Expand Down Expand Up @@ -114,16 +108,13 @@ func TestGetSchedulingUnit(t *testing.T) {
},
}

fedObjUns, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&fedObj)
g.Expect(err).NotTo(gomega.HaveOccurred())
templateUns, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&template)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = unstructured.SetNestedMap(fedObjUns, templateUns, common.TemplatePath...)
rawJSON, err := json.Marshal(&template)
g.Expect(err).NotTo(gomega.HaveOccurred())
fedObj.Spec.Template.Raw = rawJSON

typeConfig := &fedcorev1a1.FederatedTypeConfig{
Spec: fedcorev1a1.FederatedTypeConfigSpec{
TargetType: fedcorev1a1.APIResource{
SourceType: fedcorev1a1.APIResource{
Group: "apps",
Version: "v1",
Kind: "Deployment",
Expand All @@ -136,7 +127,7 @@ func TestGetSchedulingUnit(t *testing.T) {
},
}

su, err := schedulingUnitForFedObject(typeConfig, &unstructured.Unstructured{Object: fedObjUns}, &policy)
su, err := schedulingUnitForFedObject(typeConfig, &fedObj, &policy)
g.Expect(err).NotTo(gomega.HaveOccurred())

g.Expect(su).To(gomega.Equal(&framework.SchedulingUnit{
Expand Down Expand Up @@ -351,7 +342,7 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
"label": "value1",
},
MaxClusters: pointer.Int64(5),
Placements: []fedcorev1a1.ClusterReference{
Placements: []fedcorev1a1.DesiredPlacement{
{
Cluster: "cluster1",
},
Expand Down Expand Up @@ -407,10 +398,18 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {

var err error

obj := &unstructured.Unstructured{Object: make(map[string]interface{})}
testObj := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
}
rawJSON, err := json.Marshal(testObj)
g.Expect(err).ToNot(gomega.HaveOccurred())

obj := &fedcorev1a1.FederatedObject{}
obj.SetAnnotations(test.annotations)
err = unstructured.SetNestedMap(obj.Object, make(map[string]interface{}), common.TemplatePath...)
g.Expect(err).NotTo(gomega.HaveOccurred())
obj.Spec.Template.Raw = rawJSON

typeConfig := &fedcorev1a1.FederatedTypeConfig{
Spec: fedcorev1a1.FederatedTypeConfigSpec{
Expand Down Expand Up @@ -442,6 +441,10 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {

func TestSchedulingMode(t *testing.T) {
deploymentObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "deployment",
Namespace: "default",
Expand All @@ -456,6 +459,10 @@ func TestSchedulingMode(t *testing.T) {
deploymentUns := &unstructured.Unstructured{Object: deploymentObj}

statefulSetObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "statefulset",
Namespace: "default",
Expand Down Expand Up @@ -519,7 +526,7 @@ func TestSchedulingMode(t *testing.T) {
Name: "<ftc-name>",
},
Spec: fedcorev1a1.FederatedTypeConfigSpec{
TargetType: fedcorev1a1.APIResource{
SourceType: fedcorev1a1.APIResource{
Group: test.gvk.Group,
Version: test.gvk.Version,
Kind: test.gvk.Kind,
Expand All @@ -529,9 +536,11 @@ func TestSchedulingMode(t *testing.T) {
},
},
}
obj := &unstructured.Unstructured{Object: make(map[string]interface{})}
err := unstructured.SetNestedMap(obj.Object, test.obj.Object, common.TemplatePath...)
obj := &fedcorev1a1.FederatedObject{}
rawJSON, err := json.Marshal(test.obj)
g.Expect(err).NotTo(gomega.HaveOccurred())
obj.Spec.Template.Raw = rawJSON

su, err := schedulingUnitForFedObject(typeConfig, obj, test.policy)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(su.SchedulingMode).To(gomega.Equal(test.expectedResult))
Expand Down
147 changes: 82 additions & 65 deletions pkg/controllers/scheduler/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,29 @@ import (
"time"

"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
jsonutil "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic/dynamicinformer"
dynamicFake "k8s.io/client-go/dynamic/fake"
kubeFake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
schedwebhookv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/schedulerwebhook/v1alpha1"
fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1"
fedFake "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned/fake"
fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers"
schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
"github.com/kubewharf/kubeadmiral/pkg/util/pendingcontrollers"
)

// Generate a self-signed certificate and key pair.
Expand Down Expand Up @@ -227,85 +229,96 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl
},
}

typeConfig := &fedcorev1a1.FederatedTypeConfig{
clusters := []runtime.Object{
getCluster("accept"),
getCluster("reject"),
}

ftc := fedcorev1a1.FederatedTypeConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "deployment.apps",
},
Spec: fedcorev1a1.FederatedTypeConfigSpec{
FederatedType: fedcorev1a1.APIResource{
Group: fedtypesv1a1.SchemeGroupVersion.Group,
Version: fedcorev1a1.SchemeGroupVersion.Version,
Kind: "FederatedDeployment",
PluralName: "federateddeployments",
Scope: "Namespaced",
},
TargetType: fedcorev1a1.APIResource{
SourceType: fedcorev1a1.APIResource{
Group: "apps",
Version: "v1",
Kind: "Deployment",
PluralName: "deployments",
Scope: "Namespaced",
},
PathDefinition: fedcorev1a1.PathDefinition{
ReplicasSpec: "spec.replicas",
Scope: v1beta1.NamespaceScoped,
},
},
}

clusters := []runtime.Object{
getCluster("accept"),
getCluster("reject"),
}

kubeClient := kubeFake.NewSimpleClientset()

scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(
fedtypesv1a1.SchemeGroupVersion.WithKind(typeConfig.Spec.FederatedType.Kind+"List"),
&unstructured.UnstructuredList{},
)
err := corev1.AddToScheme(scheme)
g.Expect(err).ToNot(gomega.HaveOccurred())
err = appsv1.AddToScheme(scheme)
g.Expect(err).ToNot(gomega.HaveOccurred())

// Ensure watcher is started before creating objects to avoid missing events occurred after LIST and before WATCH
// ref: https://github.com/kubernetes/client-go/blob/master/examples/fake-client/main_test.go
watcherStarted := make(chan struct{})
dynamicClient := dynamicFake.NewSimpleDynamicClient(scheme)
dynamicClient.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := dynamicClient.Tracker().Watch(gvr, ns)
if err != nil {
return false, nil, err
}
close(watcherStarted)
return true, watch, nil
})
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)

fedClient := fedFake.NewSimpleClientset(append(clusters, &webhookConfig, &profile, &policy)...)
testObjs := []runtime.Object{}
testObjs = append(testObjs, &webhookConfig, &profile, &policy, &ftc)
testObjs = append(testObjs, clusters...)

// Ensure watcher is started before creating objects to avoid missing events occurred after LIST and before WATCH
// ref: https://github.com/kubernetes/client-go/blob/master/examples/fake-client/main_test.go
watcherStarted := make(chan struct{})
fedClient := fedFake.NewSimpleClientset(testObjs...)
fedClient.PrependWatchReactor(
"*",
func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := fedClient.Tracker().Watch(gvr, ns)
if err != nil {
return false, nil, err
}
select {
case <-watcherStarted:
default:
close(watcherStarted)
}
return true, watch, nil
},
)
fedInformerFactory := fedinformers.NewSharedInformerFactory(fedClient, 0)

federatedType := typeConfig.GetFederatedType()
gvr := schemautil.APIResourceToGVR(&federatedType)
manager := informermanager.NewInformerManager(
dynamicClient,
fedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(),
nil,
)

scheduler, err := NewScheduler(
ktesting.NewLogger(t, ktesting.NewConfig(ktesting.Verbosity(3))),
typeConfig, kubeClient, fedClient, dynamicClient,
dynInformerFactory.ForResource(gvr),
kubeClient,
fedClient,
dynamicClient,
fedInformerFactory.Core().V1alpha1().FederatedObjects(),
fedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
fedInformerFactory.Core().V1alpha1().PropagationPolicies(),
fedInformerFactory.Core().V1alpha1().ClusterPropagationPolicies(),
fedInformerFactory.Core().V1alpha1().FederatedClusters(),
fedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
manager,
fedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
stats.NewMock("test", "kube-admiral", false),
ktesting.NewLogger(t, ktesting.NewConfig(ktesting.Verbosity(3))),
1,
)
g.Expect(err).NotTo(gomega.HaveOccurred())

ctx := context.Background()
dynInformerFactory.Start(ctx.Done())
fedInformerFactory.Start(ctx.Done())
manager.Start(ctx)

go scheduler.Run(ctx)

dynInformerFactory.WaitForCacheSync(ctx.Done())
fedInformerFactory.WaitForCacheSync(ctx.Done())

cache.WaitForCacheSync(ctx.Done(), scheduler.HasSynced)
<-watcherStarted

// Wait for the plugin to be initialized
Expand All @@ -315,11 +328,7 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl
g.Expect(plugin.(framework.Plugin).Name()).To(gomega.Equal(webhookConfig.Name))
}).WithContext(ctx).WithTimeout(3 * time.Second).WithPolling(100 * time.Millisecond).Should(gomega.Succeed())

fedObj := metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
APIVersion: fedtypesv1a1.SchemeGroupVersion.String(),
Kind: "FederatedDeployment",
},
fedObj := &fedcorev1a1.FederatedObject{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Expand All @@ -331,23 +340,31 @@ func doTest(t *testing.T, clientTLS *fedcorev1a1.WebhookTLSConfig, serverTLS *tl
},
},
}

fedObjUns := &unstructured.Unstructured{}
fedObjUns.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&fedObj)
g.Expect(err).NotTo(gomega.HaveOccurred())
err = unstructured.SetNestedMap(fedObjUns.Object, map[string]interface{}{}, common.TemplatePath...)
deployment := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
}
rawBytes, err := jsonutil.Marshal(deployment)
g.Expect(err).NotTo(gomega.HaveOccurred())
fedObj.Spec.Template.Raw = rawBytes

fedObjUns, err = dynamicClient.Resource(gvr).Namespace(fedObj.Namespace).Create(ctx, fedObjUns, metav1.CreateOptions{})
fedObj, err = fedClient.CoreV1alpha1().
FederatedObjects(fedObj.Namespace).
Create(ctx, fedObj, metav1.CreateOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())

g.Eventually(func(g gomega.Gomega) {
fedObjUns, err = dynamicClient.Resource(gvr).Namespace(fedObj.Namespace).Get(ctx, fedObj.Name, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
fedObj := fedtypesv1a1.GenericObjectWithPlacements{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(fedObjUns.Object, &fedObj)
res, err := fedClient.CoreV1alpha1().
FederatedObjects(fedObj.Namespace).
Get(ctx, fedObj.Name, metav1.GetOptions{})
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(fedObj.ClusterNameUnion()).To(gomega.Equal(map[string]struct{}{"accept": {}}))
g.Expect(res.GetSpec().GetPlacementUnion()).To(gomega.Equal(sets.New("accept")))
}).WithContext(ctx).WithTimeout(3 * time.Second).WithPolling(100 * time.Millisecond).Should(gomega.Succeed())

g.Expect(filterCalled.Load()).To(gomega.Equal(int32(len(clusters))))
Expand Down

0 comments on commit 0800f8a

Please sign in to comment.