Skip to content

Commit

Permalink
[FIX] use discovery api instead of reading CRD (#147)
Browse files Browse the repository at this point in the history
CHANGES:
- use discovery API to check existence of `ServiceMonitor` capability
rather than fetching the `CustomResourceDefinition`
- remove unused `apiextensions` typed client
- adjust tests - `common_test` enhanced to supply discoverable resources
- refactor `ServiceMonitor` reconciliation to reduce complexity (sonar) 

Controller image for testing:
`ghcr.io/skrishnan-sap/cap-operator/controller:sha-444fa21`
  • Loading branch information
skrishnan-sap authored Oct 25, 2024
1 parent 4604b77 commit 49e0e2e
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 146 deletions.
8 changes: 1 addition & 7 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

promop "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
)

const (
Expand All @@ -55,11 +54,6 @@ func main() {
klog.Fatal("could not create client for custom resources: ", err.Error())
}

apiExtClient, err := apiext.NewForConfig(config)
if err != nil {
klog.Fatal("could not create client for api-extensions: ", err.Error())
}

promClient, err := promop.NewForConfig(config)
if err != nil {
klog.Fatal("could not create client for prometheus-operator resources: ", err.Error())
Expand Down Expand Up @@ -120,7 +114,7 @@ func main() {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.InfoS("Started leading: ", LeaseLockName, leaseLockId)
c := controller.NewController(coreClient, crdClient, istioClient, certClient, certManagerClient, dnsClient, apiExtClient, promClient)
c := controller.NewController(coreClient, crdClient, istioClient, certClient, certManagerClient, dnsClient, promClient)
go c.Start(ctx)
},
OnStoppedLeading: func() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
istio.io/api v1.23.2
istio.io/client-go v1.23.2
k8s.io/api v0.31.1
k8s.io/apiextensions-apiserver v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
k8s.io/code-generator v0.31.1
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.31.1 // indirect
k8s.io/gengo/v2 v2.0.0-20240826214909-a7b603a56eb7 // indirect
k8s.io/kube-openapi v0.0.0-20241009091222-67ed5848f094 // indirect
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 // indirect
Expand Down
43 changes: 29 additions & 14 deletions internal/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
apiExtScheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -65,6 +62,7 @@ var gvrKindMap map[string]string = map[string]string{
"captenants.sme.sap.com/v1alpha1": "CAPTenant",
"capapplications.sme.sap.com/v1alpha1": "CAPApplication",
"capapplicationversions.sme.sap.com/v1alpha1": "CAPApplicationVersion",
"servicemonitors.monitoring.coreos.com/v1": "ServiceMonitor",
}

// adds fixed suffix "gen" to newly created objects with generateName
Expand Down Expand Up @@ -209,23 +207,44 @@ func getDeleteCollectionHandler[T FakeClientSetConstraint](t *testing.T, client
}
}

func initializeControllerForReconciliationTests(t *testing.T, items []ResourceAction) *Controller {
func addForDiscovery(c *k8stesting.Fake, resources []schema.GroupVersionResource) {
m := map[string][]schema.GroupVersionResource{}
for i := range resources {
r := resources[i]
gv := fmt.Sprintf("%s/%s", r.Group, r.Version)
if v, ok := m[gv]; ok {
v = append(v, r)
} else {
m[gv] = []schema.GroupVersionResource{r}
}
}
for k, v := range m {
apiResources := []metav1.APIResource{}
for i := range v {
apiResources = append(apiResources, metav1.APIResource{Name: v[i].Resource, Kind: getKindFromGVR(v[i])})
}
c.Resources = append(c.Resources, &metav1.APIResourceList{GroupVersion: k, APIResources: apiResources})
}
}

func initializeControllerForReconciliationTests(t *testing.T, items []ResourceAction, discoverResources []schema.GroupVersionResource) *Controller {
// add schemes for various client sets
smeScheme.AddToScheme(scheme.Scheme)
gardenercertscheme.AddToScheme(scheme.Scheme)
gardenerdnsscheme.AddToScheme(scheme.Scheme)
istioscheme.AddToScheme(scheme.Scheme)
certManagerScheme.AddToScheme(scheme.Scheme)
apiExtScheme.AddToScheme(scheme.Scheme)
promopScheme.AddToScheme(scheme.Scheme)

coreClient := k8sfake.NewSimpleClientset()
if len(discoverResources) > 0 {
addForDiscovery(&coreClient.Fake, discoverResources)
}
copClient := copfake.NewSimpleClientset()
istioClient := istiofake.NewSimpleClientset()
gardenerCertClient := gardenercertfake.NewSimpleClientset()
gardenerDNSClient := gardenerdnsfake.NewSimpleClientset()
certManagerClient := certManagerFake.NewSimpleClientset()
apiExtClient := apiextFake.NewSimpleClientset()
promopClient := promopFake.NewSimpleClientset()

copClient.PrependReactor("create", "*", generateNameCreateHandler)
Expand All @@ -249,7 +268,7 @@ func initializeControllerForReconciliationTests(t *testing.T, items []ResourceAc
gardenerCertClient.PrependReactor("*", "*", getErrorReactorWithResources(t, items))
gardenerCertClient.PrependReactor("delete-collection", "*", getDeleteCollectionHandler(t, gardenerDNSClient))

c := NewController(coreClient, copClient, istioClient, gardenerCertClient, certManagerClient, gardenerDNSClient, apiExtClient, promopClient)
c := NewController(coreClient, copClient, istioClient, gardenerCertClient, certManagerClient, gardenerDNSClient, promopClient)
c.eventRecorder = events.NewFakeRecorder(10)
return c
}
Expand All @@ -271,6 +290,8 @@ type TestData struct {
attempts int
// mock errors during the following resource modifications
mockErrorForResources []ResourceAction
// add resources for discovery API
discoverResources []schema.GroupVersionResource
// relevant backlog items (link for traceability)
backlogItems []string
}
Expand Down Expand Up @@ -303,7 +324,7 @@ func eventDrain(ctx context.Context, c *Controller, t *testing.T) {
func reconcileTestItem(ctx context.Context, t *testing.T, item QueueItem, data TestData) (err error) {
// run inside a test sub-context to maintain test case name with reference to backlog items
t.Run(strings.Join(append([]string{data.description}, data.backlogItems...), " "), func(t *testing.T) {
c := initializeControllerForReconciliationTests(t, data.mockErrorForResources)
c := initializeControllerForReconciliationTests(t, data.mockErrorForResources, data.discoverResources)
go eventDrain(ctx, c, t)

// load initial data
Expand Down Expand Up @@ -555,12 +576,6 @@ func addInitialObjectToStore(resource []byte, c *Controller) error {
case *v1alpha1.CAPTenantOperation:
err = c.crdInformerFactory.Sme().V1alpha1().CAPTenantOperations().Informer().GetIndexer().Add(obj)
}
case *apiextv1.CustomResourceDefinition:
fakeClient, ok := c.apiExtClient.(*apiextFake.Clientset)
if !ok {
return fmt.Errorf("controller is not using a fake clientset")
}
fakeClient.Tracker().Add(obj)
case *monv1.ServiceMonitor:
fakeClient, ok := c.promClient.(*promopFake.Clientset)
if !ok {
Expand Down
5 changes: 1 addition & 4 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/klog/v2"

promop "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
)

type Controller struct {
Expand All @@ -45,7 +44,6 @@ type Controller struct {
gardenerCertificateClient gardenerCert.Interface
certManagerCertificateClient certManager.Interface
gardenerDNSClient gardenerDNS.Interface
apiExtClient apiext.Interface
promClient promop.Interface
kubeInformerFactory informers.SharedInformerFactory
crdInformerFactory crdInformers.SharedInformerFactory
Expand All @@ -58,7 +56,7 @@ type Controller struct {
eventRecorder events.EventRecorder
}

func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, apiExtClient apiext.Interface, promClient promop.Interface) *Controller {
func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, promClient promop.Interface) *Controller {

queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}),
Expand Down Expand Up @@ -107,7 +105,6 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
gardenerCertificateClient: gardenerCertificateClient,
certManagerCertificateClient: certManagerCertificateClient,
gardenerDNSClient: gardenerDNSClient,
apiExtClient: apiExtClient,
promClient: promClient,
kubeInformerFactory: kubeInformerFactory,
crdInformerFactory: crdInformerFactory,
Expand Down
1 change: 0 additions & 1 deletion internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestController_processQueue(t *testing.T) {
istioClient: c.istioClient,
gardenerCertificateClient: c.gardenerCertificateClient,
gardenerDNSClient: c.gardenerDNSClient,
apiExtClient: c.apiExtClient,
promClient: c.promClient,
kubeInformerFactory: dummyKubeInformerFactory,
crdInformerFactory: c.crdInformerFactory,
Expand Down
86 changes: 43 additions & 43 deletions internal/controller/reconcile-capapplicationversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apiextensions-apiserver/pkg/apihelpers"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -447,19 +445,20 @@ func newService(ca *v1alpha1.CAPApplication, cav *v1alpha1.CAPApplicationVersion

// #region ServiceMonitor
func (c *Controller) checkServiceMonitorCapability(ctx context.Context) error {
crdName := "servicemonitors.monitoring.coreos.com"
crd, err := c.apiExtClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{})
const (
monitoringGroupVersion = "monitoring.coreos.com/v1"
resourceKind = "ServiceMonitor"
)
list, err := c.kubeClient.Discovery().ServerResourcesForGroupVersion(monitoringGroupVersion)
if err != nil {
return fmt.Errorf("could not get custom resource definition %s: %v", crdName, err)
}
requiredVersion := "v1"
if !apihelpers.HasVersionServed(crd, requiredVersion) {
return fmt.Errorf("version %s of custom resource %s is not served", requiredVersion, crdName)
return fmt.Errorf("error discovering resources for API version %s: %v", monitoringGroupVersion, err)
}
if !apihelpers.IsCRDConditionTrue(crd, apiextv1.Established) {
return fmt.Errorf("custom resource %s condition %s not true", crdName, apiextv1.Established)
for i := range list.APIResources {
if list.APIResources[i].Kind == resourceKind {
return nil // found service monitor capability
}
}
return nil
return fmt.Errorf("resource %s is not served by API version %s", resourceKind, monitoringGroupVersion)
}

func (c *Controller) updateServiceMonitors(ctx context.Context, ca *v1alpha1.CAPApplication, cav *v1alpha1.CAPApplicationVersion, workloadServicePortInfos []servicePortInfo) error {
Expand All @@ -468,46 +467,47 @@ func (c *Controller) updateServiceMonitors(ctx context.Context, ca *v1alpha1.CAP
return nil
}

isWorkloadPort := func(wlPorts []corev1.ServicePort, scrapePort string) bool {
for j := range wlPorts {
if wlPorts[j].Name == scrapePort {
return true
}
}
return false
}

for i := range cav.Spec.Workloads {
wl := cav.Spec.Workloads[i]
if wl.DeploymentDefinition == nil || wl.DeploymentDefinition.Monitoring == nil || wl.DeploymentDefinition.Monitoring.ScrapeConfig == nil {
continue // do not reconcile service monitors
if err := c.reconcileWorkloadServiceMonitor(ctx, &wl, cav, workloadServicePortInfos, ca); err != nil {
return err
}
}

wlPortInfos := getServicePortInfoByWorkloadName(workloadServicePortInfos, cav.Name, wl.Name)
if wlPortInfos == nil {
return fmt.Errorf("could not identify workload port information for workload %s in version %s", wl.Name, cav.Name)
}
return nil
}

if portVerified := isWorkloadPort(wlPortInfos.Ports, wl.DeploymentDefinition.Monitoring.ScrapeConfig.WorkloadPort); !portVerified {
return fmt.Errorf("invalid port reference in workload %s monitoring config of version %s", wl.Name, cav.Name)
}
func (c *Controller) reconcileWorkloadServiceMonitor(ctx context.Context, wl *v1alpha1.WorkloadDetails, cav *v1alpha1.CAPApplicationVersion, workloadServicePortInfos []servicePortInfo, ca *v1alpha1.CAPApplication) error {
if wl.DeploymentDefinition == nil || wl.DeploymentDefinition.Monitoring == nil || wl.DeploymentDefinition.Monitoring.ScrapeConfig == nil {
return nil // do not reconcile service monitors
}

sm, err := c.promClient.MonitoringV1().ServiceMonitors(cav.Namespace).Get(ctx, wlPortInfos.WorkloadName+ServiceSuffix, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
sm, err = c.promClient.MonitoringV1().ServiceMonitors(cav.Namespace).Create(ctx, newServiceMonitor(ca, cav, &wl, wlPortInfos), metav1.CreateOptions{})
if err == nil {
util.LogInfo("Service monitor created successfully", string(Processing), cav, sm, "version", cav.Spec.Version)
}
}
}
err = doChecks(err, sm, cav, wlPortInfos.WorkloadName+ServiceSuffix)
if err != nil {
return err
wlPortInfos := getServicePortInfoByWorkloadName(workloadServicePortInfos, cav.Name, wl.Name)
if wlPortInfos == nil {
return fmt.Errorf("could not identify workload port information for workload %s in version %s", wl.Name, cav.Name)
}

if portVerified := isWorkloadPort(wlPortInfos.Ports, wl.DeploymentDefinition.Monitoring.ScrapeConfig.WorkloadPort); !portVerified {
return fmt.Errorf("invalid port reference in workload %s monitoring config of version %s", wl.Name, cav.Name)
}

sm, err := c.promClient.MonitoringV1().ServiceMonitors(cav.Namespace).Get(ctx, wlPortInfos.WorkloadName+ServiceSuffix, metav1.GetOptions{})
if err != nil && k8sErrors.IsNotFound(err) {
sm, err = c.promClient.MonitoringV1().ServiceMonitors(cav.Namespace).Create(ctx, newServiceMonitor(ca, cav, wl, wlPortInfos), metav1.CreateOptions{})
if err == nil {
util.LogInfo("Service monitor created successfully", string(Processing), cav, sm, "version", cav.Spec.Version)
}
}
return doChecks(err, sm, cav, wlPortInfos.WorkloadName+ServiceSuffix)
}

return nil
func isWorkloadPort(wlPorts []corev1.ServicePort, scrapePort string) bool {
for j := range wlPorts {
if wlPorts[j].Name == scrapePort {
return true
}
}
return false
}

func newServiceMonitor(ca *v1alpha1.CAPApplication, cav *v1alpha1.CAPApplicationVersion, wl *v1alpha1.WorkloadDetails, wlPortInfos *servicePortInfo) *monv1.ServiceMonitor {
Expand Down
11 changes: 6 additions & 5 deletions internal/controller/reconcile-capapplicationversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"fmt"
"testing"

"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestCAV_WithoutCAPApplicationAndVersion(t *testing.T) {
Expand Down Expand Up @@ -811,15 +813,14 @@ func TestCAV_ServiceMonitorCreation(t *testing.T) {
TestData{
description: "capapplication version - service monitor creation",
initialResources: []string{
"testdata/common/crd-servicemonitors.yaml",
"testdata/common/capapplication.yaml",
"testdata/common/credential-secrets.yaml",
"testdata/version-monitoring/cav-v1-deletion-rules-processing.yaml",
"testdata/capapplicationversion/deployments-ready.yaml",
"testdata/capapplicationversion/content-job-completed.yaml",
},
discoverResources: []schema.GroupVersionResource{{Resource: "servicemonitors", Group: "monitoring.coreos.com", Version: "v1"}},
expectedResources: "testdata/version-monitoring/servicemonitors-cav-v1.yaml",
backlogItems: []string{},
},
)
}
Expand All @@ -831,15 +832,15 @@ func TestCAV_InvalidMonitoringConfig(t *testing.T) {
TestData{
description: "capapplication version - service monitor creation",
initialResources: []string{
"testdata/common/crd-servicemonitors.yaml",
"testdata/common/capapplication.yaml",
"testdata/common/credential-secrets.yaml",
"testdata/version-monitoring/cav-v1-monitoring-port-missing.yaml",
"testdata/capapplicationversion/deployments-ready.yaml",
"testdata/capapplicationversion/content-job-completed.yaml",
},
expectError: true,
backlogItems: []string{},
discoverResources: []schema.GroupVersionResource{{Resource: "servicemonitors", Group: "monitoring.coreos.com", Version: "v1"}},
expectError: true,
backlogItems: []string{},
},
)
if err == nil || err.Error() != fmt.Sprintf("invalid port reference in workload %s monitoring config of version %s", "app-router", "test-cap-01-cav-v1") {
Expand Down
5 changes: 1 addition & 4 deletions internal/controller/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/sap/cap-operator/pkg/client/clientset/versioned/fake"
istionwv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
istiofake "istio.io/client-go/pkg/clientset/versioned/fake"
apiextFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
)

const (
Expand Down Expand Up @@ -333,8 +332,6 @@ func getTestController(resources testResources) *Controller {

crdClient := fake.NewSimpleClientset(crdObjects...)

apiExtClient := apiextFake.NewSimpleClientset()

promopClient := promopFake.NewSimpleClientset()

istioClient := istiofake.NewSimpleClientset(istioObjects...)
Expand All @@ -345,7 +342,7 @@ func getTestController(resources testResources) *Controller {

dnsClient := dnsfake.NewSimpleClientset(dnsObjects...)

c := NewController(coreClient, crdClient, istioClient, certClient, certManagerCertClient, dnsClient, apiExtClient, promopClient)
c := NewController(coreClient, crdClient, istioClient, certClient, certManagerCertClient, dnsClient, promopClient)

for _, ca := range resources.cas {
if ca != nil {
Expand Down
Loading

0 comments on commit 49e0e2e

Please sign in to comment.