diff --git a/cmd/exporter/main.go b/cmd/exporter/main.go index fd069c4f..3ed2001c 100644 --- a/cmd/exporter/main.go +++ b/cmd/exporter/main.go @@ -18,8 +18,11 @@ package main import ( "net/http" + "time" + "k8s.io/apimachinery/pkg/util/runtime" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/util/workqueue" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -39,15 +42,21 @@ func init() { } func main() { - clients := clients.ClientSets{} + stop := make(chan struct{}) + defer close(stop) + defer runtime.HandleCrash() + + wq := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + defer wq.ShutDown() //Getting kubeConfig and Generate ClientSets - if err := clients.GenerateClientSetFromKubeConfig(); err != nil { + clientset, err := clients.NewClientSet(stop, 5*time.Minute, wq) + if err != nil { log.Fatalf("Unable to Get the kubeconfig, err: %v", err) } // Trigger the chaos metrics collection - go controller.Exporter(clients) + go controller.Exporter(clientset, wq) //This section will start the HTTP server and expose metrics on the /metrics endpoint. http.Handle("/metrics", promhttp.Handler()) diff --git a/controller/collect-data.go b/controller/collect-data.go index 671017dd..8453de6e 100644 --- a/controller/collect-data.go +++ b/controller/collect-data.go @@ -1,7 +1,6 @@ package controller import ( - "context" "math" "strconv" "strings" @@ -12,7 +11,7 @@ import ( litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" clientTypes "k8s.io/apimachinery/pkg/types" ) @@ -20,7 +19,7 @@ import ( // ResultCollector interface for the both functions GetResultList and getExperimentMetricsFromResult type ResultCollector interface { - GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error) + GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) GetExperimentMetricsFromResult(chaosResult *litmuschaosv1alpha1.ChaosResult, clients clients.ClientSets) (bool, error) SetResultDetails() GetResultDetails() ChaosResultDetails @@ -30,20 +29,20 @@ type ResultDetails struct { } // GetResultList return the result list correspond to the monitoring enabled chaosengine -func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error) { +func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) { - chaosResultList, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosResults(chaosNamespace).List(context.Background(), metav1.ListOptions{}) + chaosResultList, err := clients.ResultInformer.ChaosResults(chaosNamespace).List(labels.Everything()) if err != nil { - return litmuschaosv1alpha1.ChaosResultList{}, err + return nil, err } // waiting until any chaosresult found - if len(chaosResultList.Items) == 0 { + if len(chaosResultList) == 0 { if monitoringEnabled.IsChaosResultsAvailable { monitoringEnabled.IsChaosResultsAvailable = false log.Warnf("No chaosresult found!") log.Info("[Wait]: Waiting for the chaosresult ... ") } - return litmuschaosv1alpha1.ChaosResultList{}, nil + return nil, nil } if !monitoringEnabled.IsChaosResultsAvailable { @@ -51,7 +50,7 @@ func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace monitoringEnabled.IsChaosResultsAvailable = true } - return *chaosResultList, nil + return chaosResultList, nil } // GetExperimentMetricsFromResult derive all the metrics data from the chaosresult and set into resultDetails struct @@ -61,7 +60,7 @@ func (r *ResultDetails) GetExperimentMetricsFromResult(chaosResult *litmuschaosv if err != nil { return false, err } - engine, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosEngines(chaosResult.Namespace).Get(context.Background(), chaosResult.Spec.EngineName, metav1.GetOptions{}) + engine, err := clients.EngineInformer.ChaosEngines(chaosResult.Namespace).Get(chaosResult.Spec.EngineName) if err != nil { // k8serrors.IsNotFound(err) checking k8s resource is found or not, // It will skip this result if k8s resource is not found. @@ -267,14 +266,14 @@ func getProbeSuccessPercentage(chaosResult *litmuschaosv1alpha1.ChaosResult) (fl // getEventsForSpecificInvolvedResource derive all the events correspond to the specific resource func getEventsForSpecificInvolvedResource(clients clients.ClientSets, resourceUID clientTypes.UID, chaosNamespace string) (corev1.EventList, error) { finalEventList := corev1.EventList{} - eventsList, err := clients.KubeClient.CoreV1().Events(chaosNamespace).List(context.Background(), metav1.ListOptions{}) + eventsList, err := clients.EventsInformer.Events(chaosNamespace).List(labels.Everything()) if err != nil { return corev1.EventList{}, err } - for _, event := range eventsList.Items { - if event.InvolvedObject.UID == resourceUID { - finalEventList.Items = append(finalEventList.Items, event) + for _, event := range eventsList { + if event != nil && event.InvolvedObject.UID == resourceUID { + finalEventList.Items = append(finalEventList.Items, *event) } } return finalEventList, nil diff --git a/controller/collect-data_test.go b/controller/collect-data_test.go index 94553475..f14f4f57 100644 --- a/controller/collect-data_test.go +++ b/controller/collect-data_test.go @@ -2,6 +2,8 @@ package controller_test import ( "context" + "testing" + "github.com/litmuschaos/chaos-exporter/controller" "github.com/litmuschaos/chaos-exporter/pkg/clients" "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" @@ -12,7 +14,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "testing" + "k8s.io/client-go/util/workqueue" ) func TestGetResultList(t *testing.T) { @@ -183,5 +185,8 @@ func CreateFakeClient(t *testing.T) clients.ClientSets { cs := clients.ClientSets{} cs.KubeClient = fake.NewSimpleClientset([]runtime.Object{}...) cs.LitmusClient = litmusFakeClientSet.NewSimpleClientset([]runtime.Object{}...) + stopCh := make(chan struct{}) + err := cs.SetupInformers(stopCh, cs.KubeClient, cs.LitmusClient, 0, workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())) + require.NoError(t, err) return cs } diff --git a/controller/controller.go b/controller/controller.go index 88666ad9..1bf610d7 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -17,16 +17,15 @@ limitations under the License. package controller import ( - "time" - "github.com/litmuschaos/chaos-exporter/pkg/clients" "github.com/litmuschaos/chaos-exporter/pkg/log" litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" "github.com/prometheus/client_golang/prometheus" + "k8s.io/client-go/util/workqueue" ) // Exporter continuously collects the chaos metrics for a given chaosengine -func Exporter(clients clients.ClientSets) { +func Exporter(clientSet clients.ClientSets, wq workqueue.RateLimitingInterface) { log.Info("Started creating Metrics") // Register the fixed (count) chaos metrics log.Info("Registering Fixed Metrics") @@ -35,7 +34,7 @@ func Exporter(clients clients.ClientSets) { ResultCollector: &ResultDetails{}, } //gaugeMetrics := GaugeMetrics{} - overallChaosResults := litmuschaosv1alpha1.ChaosResultList{} + overallChaosResults := []*litmuschaosv1alpha1.ChaosResult{} r.GaugeMetrics.InitializeGaugeMetrics(). RegisterFixedMetrics() @@ -45,11 +44,18 @@ func Exporter(clients clients.ClientSets) { IsChaosEnginesAvailable: true, } - for { - if err := r.GetLitmusChaosMetrics(clients, &overallChaosResults, &monitoringEnabled); err != nil { + // refresh metrics whenever there's a change in chaosengine or chaosresult + // or every informer resync duration, whichever is earlier + for _, done := wq.Get(); !done; _, done = wq.Get() { + needRequeue, err := r.GetLitmusChaosMetrics(clientSet, &overallChaosResults, &monitoringEnabled) + if err != nil { log.Errorf("err: %v", err) } - time.Sleep(1000 * time.Millisecond) + wq.Done(clients.ProcessKey) + // Add after + if needRequeue != nil { + wq.AddAfter(clients.ProcessKey, *needRequeue) + } } } diff --git a/controller/handle-result-deletion.go b/controller/handle-result-deletion.go index 2b7a1501..f92663c9 100644 --- a/controller/handle-result-deletion.go +++ b/controller/handle-result-deletion.go @@ -4,15 +4,16 @@ import ( "fmt" "os" "strconv" + "time" litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" ) // unsetDeletedChaosResults unset the metrics correspond to deleted chaosresults -func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults *litmuschaosv1alpha1.ChaosResultList) { - for _, oldResult := range oldChaosResults.Items { +func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults []*litmuschaosv1alpha1.ChaosResult) { + for _, oldResult := range oldChaosResults { found := false - for _, newResult := range newChaosResults.Items { + for _, newResult := range newChaosResults { if oldResult.UID == newResult.UID { found = true break @@ -22,7 +23,7 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC if !found { for _, value := range resultStore[string(oldResult.UID)] { - probeSuccesPercentage, _ := getProbeSuccessPercentage(&oldResult) + probeSuccesPercentage, _ := getProbeSuccessPercentage(oldResult) resultDetails := initialiseResult(). setName(oldResult.Name). setNamespace(oldResult.Namespace). @@ -46,10 +47,13 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC // unsetOutdatedMetrics unset the metrics when chaosresult verdict changes // if same chaosresult is continuously repeated more than scrape interval then it sets the metrics value to 0 -func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) float64 { +func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) (float64, *time.Duration) { scrapeTime, _ := strconv.Atoi(getEnv("TSDB_SCRAPE_INTERVAL", "10")) result, ok := matchVerdict[string(resultDetails.UID)] reset := false + var needRequeue *time.Duration + + scrapeDuration := time.Duration(scrapeTime) * time.Second switch ok { case true: @@ -58,20 +62,23 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult case result.Verdict != resultDetails.Verdict: gaugeMetrics.ResultVerdict.DeleteLabelValues(resultDetails.Namespace, resultDetails.Name, resultDetails.ChaosEngineName, resultDetails.ChaosEngineContext, result.Verdict, fmt.Sprintf("%f", result.ProbeSuccessPercentage), resultDetails.AppLabel, - resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, result.FaultName) - result.Count = 1 + resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, resultDetails.FaultName) + result.Timer = time.Now() + needRequeue = &scrapeDuration default: // if time passed scrape time then reset the value to 0 - if result.Count >= scrapeTime { + if time.Since(result.Timer) >= scrapeDuration { reset = true } else { - result.Count++ + scrapeDuration = scrapeDuration - time.Since(result.Timer) + needRequeue = &scrapeDuration } } default: result = initialiseResultData(). - setCount(1). + setTimer(time.Now()). setVerdictReset(false) + needRequeue = &scrapeDuration } // update the values inside matchVerdict @@ -80,9 +87,9 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult setVerdictReset(reset) if reset { - return float64(0) + return float64(0), needRequeue } - return float64(1) + return float64(1), needRequeue } // getEnv derived the ENVs and sets the default value if env contains empty value @@ -105,7 +112,7 @@ func (resultDetails *ChaosResultDetails) setResultData() { setAppLabel(resultDetails.AppLabel). setVerdict(resultDetails.Verdict). setFaultName(resultDetails.FaultName). - setCount(0). + setTimer(time.Now()). setVerdictReset(false). setProbeSuccesPercentage(resultDetails.ProbeSuccessPercentage) @@ -164,8 +171,8 @@ func (resultData *ResultData) setFaultName(fault string) *ResultData { } // setCount sets the count inside resultData struct -func (resultData *ResultData) setCount(count int) *ResultData { - resultData.Count = count +func (resultData *ResultData) setTimer(timer time.Time) *ResultData { + resultData.Timer = timer return resultData } diff --git a/controller/handle-result-deletion_test.go b/controller/handle-result-deletion_test.go index f5e05567..7145b578 100644 --- a/controller/handle-result-deletion_test.go +++ b/controller/handle-result-deletion_test.go @@ -2,10 +2,11 @@ package controller import ( "errors" + "testing" + "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" ) func Test_unsetDeletedChaosResults(t *testing.T) { @@ -15,8 +16,8 @@ func Test_unsetDeletedChaosResults(t *testing.T) { execFunc func(details *ChaosResultDetails) isErr bool resultDetails *ChaosResultDetails - oldChaosResult *v1alpha1.ChaosResultList - newChaosResult *v1alpha1.ChaosResultList + oldChaosResult []*v1alpha1.ChaosResult + newChaosResult []*v1alpha1.ChaosResult }{ { name: "success: deleted chaosResult", @@ -26,21 +27,17 @@ func Test_unsetDeletedChaosResults(t *testing.T) { resultDetails: &ChaosResultDetails{ UID: "FAKE-UID-OLD", }, - oldChaosResult: &v1alpha1.ChaosResultList{ - Items: []v1alpha1.ChaosResult{ - { - ObjectMeta: metav1.ObjectMeta{ - UID: "FAKE-UID-OLD", - }, + oldChaosResult: []*v1alpha1.ChaosResult{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "FAKE-UID-OLD", }, }, }, - newChaosResult: &v1alpha1.ChaosResultList{ - Items: []v1alpha1.ChaosResult{ - { - ObjectMeta: metav1.ObjectMeta{ - UID: "FAKE-UID-NEW", - }, + newChaosResult: []*v1alpha1.ChaosResult{ + { + ObjectMeta: metav1.ObjectMeta{ + UID: "FAKE-UID-NEW", }, }, }, diff --git a/controller/mocks/mock_collect-data.go b/controller/mocks/mock_collect-data.go index 1ebf59ef..d7c2bf51 100644 --- a/controller/mocks/mock_collect-data.go +++ b/controller/mocks/mock_collect-data.go @@ -66,10 +66,10 @@ func (mr *MockResultCollectorMockRecorder) GetResultDetails() *gomock.Call { } // GetResultList mocks base method. -func (m *MockResultCollector) GetResultList(arg0 clients.ClientSets, arg1 string, arg2 *controller.MonitoringEnabled) (v1alpha1.ChaosResultList, error) { +func (m *MockResultCollector) GetResultList(arg0 clients.ClientSets, arg1 string, arg2 *controller.MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetResultList", arg0, arg1, arg2) - ret0, _ := ret[0].(v1alpha1.ChaosResultList) + ret0, _ := ret[0].([]*v1alpha1.ChaosResult) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/controller/scrap_test.go b/controller/scrap_test.go index 1a44c5ab..9fa94955 100644 --- a/controller/scrap_test.go +++ b/controller/scrap_test.go @@ -1,14 +1,15 @@ package controller_test import ( + "testing" + "github.com/golang/mock/gomock" "github.com/litmuschaos/chaos-exporter/controller" "github.com/litmuschaos/chaos-exporter/controller/mocks" - v1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" + "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" "github.com/pkg/errors" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" ) func TestGetLitmusChaosMetrics(t *testing.T) { @@ -27,18 +28,16 @@ func TestGetLitmusChaosMetrics(t *testing.T) { execFunc func() isErr bool monitoring *controller.MonitoringEnabled - overallChaosResult *v1alpha1.ChaosResultList + overallChaosResult []*v1alpha1.ChaosResult }{ { name: "success", execFunc: func() { mockCollectData.EXPECT().GetResultList(gomock.Any(), gomock.Any(), gomock.Any()). - Return(v1alpha1.ChaosResultList{ - Items: []v1alpha1.ChaosResult{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "chaosresult-1", - }, + Return([]*v1alpha1.ChaosResult{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "chaosresult-1", }, }, }, nil).Times(1) @@ -48,12 +47,10 @@ func TestGetLitmusChaosMetrics(t *testing.T) { UID: "FAKE-UID", }).Times(1) }, - overallChaosResult: &v1alpha1.ChaosResultList{ - Items: []v1alpha1.ChaosResult{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "chaosresult-1", - }, + overallChaosResult: []*v1alpha1.ChaosResult{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "chaosresult-1", }, }, }, @@ -64,9 +61,9 @@ func TestGetLitmusChaosMetrics(t *testing.T) { name: "failure: no ChaosResultList found", execFunc: func() { mockCollectData.EXPECT().GetResultList(gomock.Any(), gomock.Any(), gomock.Any()). - Return(v1alpha1.ChaosResultList{}, errors.New("Fake Error")).Times(1) + Return([]*v1alpha1.ChaosResult{}, errors.New("Fake Error")).Times(1) }, - overallChaosResult: &v1alpha1.ChaosResultList{}, + overallChaosResult: []*v1alpha1.ChaosResult{}, monitoring: &controller.MonitoringEnabled{}, isErr: true, }, @@ -76,7 +73,7 @@ func TestGetLitmusChaosMetrics(t *testing.T) { tt.execFunc() client := CreateFakeClient(t) - err := r.GetLitmusChaosMetrics(client, tt.overallChaosResult, tt.monitoring) + _, err := r.GetLitmusChaosMetrics(client, &tt.overallChaosResult, tt.monitoring) if tt.isErr { require.Error(t, err) return diff --git a/controller/scrape.go b/controller/scrape.go index 502055a5..35645925 100644 --- a/controller/scrape.go +++ b/controller/scrape.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" @@ -34,7 +35,7 @@ import ( var err error // GetLitmusChaosMetrics derive and send the chaos metrics -func (m *MetricesCollecter) GetLitmusChaosMetrics(clients clients.ClientSets, overallChaosResults *litmuschaosv1alpha1.ChaosResultList, monitoringEnabled *MonitoringEnabled) error { +func (m *MetricesCollecter) GetLitmusChaosMetrics(clients clients.ClientSets, overallChaosResults *[]*litmuschaosv1alpha1.ChaosResult, monitoringEnabled *MonitoringEnabled) (*time.Duration, error) { engineCount := 0 // initialising the parameters for the namespaced scope metrics @@ -55,24 +56,26 @@ func (m *MetricesCollecter) GetLitmusChaosMetrics(clients clients.ClientSets, ov // Getting list of all the chaosresults for the monitoring resultList, err := m.ResultCollector.GetResultList(clients, watchNamespace, monitoringEnabled) if err != nil { - return err + return nil, err } // unset the metrics correspond to deleted chaosresults - m.GaugeMetrics.unsetDeletedChaosResults(overallChaosResults, &resultList) + m.GaugeMetrics.unsetDeletedChaosResults(*overallChaosResults, resultList) // updating the overall chaosresults items to latest - overallChaosResults.Items = resultList.Items + *overallChaosResults = resultList + + var needRequeue *time.Duration // iterating over all chaosresults and derive all the metrics data it generates metrics per chaosresult // and aggregate metrics of all results present inside chaos namespace, if chaos namespace is defined // otherwise it derive metrics for all chaosresults present inside cluster - for _, chaosresult := range resultList.Items { + for _, chaosresult := range resultList { m.ResultCollector.SetResultDetails() // deriving metrics data from the chaosresult - skip, err := m.ResultCollector.GetExperimentMetricsFromResult(&chaosresult, clients) + skip, err := m.ResultCollector.GetExperimentMetricsFromResult(chaosresult, clients) resultDetails := m.ResultCollector.GetResultDetails() if err != nil { - return err + return nil, err } // generating the aggeregate metrics from per chaosresult metric namespacedScopeMetrics.AwaitedExperiments += resultDetails.AwaitedExperiments @@ -105,7 +108,10 @@ func (m *MetricesCollecter) GetLitmusChaosMetrics(clients clients.ClientSets, ov }) // setting chaosresult metrics for the given chaosresult - verdictValue := m.GaugeMetrics.unsetOutdatedMetrics(resultDetails) + verdictValue, requeue := m.GaugeMetrics.unsetOutdatedMetrics(resultDetails) + if requeue != nil { + needRequeue = requeue + } m.GaugeMetrics.setResultChaosMetrics(resultDetails, verdictValue) // setting chaosresult aws metrics for the given chaosresult, which can be used for cloudwatch if awsConfig.Namespace != "" && awsConfig.ClusterName != "" && awsConfig.Service != "" { @@ -128,7 +134,7 @@ func (m *MetricesCollecter) GetLitmusChaosMetrics(clients clients.ClientSets, ov if awsConfig.Namespace != "" && awsConfig.ClusterName != "" && awsConfig.Service != "" { awsConfig.setAwsNamespacedChaosMetrics(namespacedScopeMetrics) } - return nil + return needRequeue, nil } // setNamespacedChaosMetrics sets metrics for the all chaosresults diff --git a/controller/types.go b/controller/types.go index 86a509ae..b592ea4d 100644 --- a/controller/types.go +++ b/controller/types.go @@ -17,6 +17,8 @@ limitations under the License. package controller import ( + "time" + "github.com/prometheus/client_golang/prometheus" clientTypes "k8s.io/apimachinery/pkg/types" ) @@ -38,7 +40,7 @@ type ResultData struct { AppNs string AppLabel string Verdict string - Count int + Timer time.Time VerdictReset bool ProbeSuccessPercentage float64 FaultName string diff --git a/go.mod b/go.mod index f54664f3..dfa675a3 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.9.3 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.9.0 k8s.io/api v0.26.0 k8s.io/apimachinery v0.26.0 k8s.io/client-go v12.0.0+incompatible @@ -39,6 +39,7 @@ require ( github.com/google/go-cmp v0.5.6 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect + github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index 74b8d9b3..e6adb637 100644 --- a/go.sum +++ b/go.sum @@ -177,6 +177,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ -242,6 +243,7 @@ github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLe github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -273,6 +275,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= +github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= @@ -451,8 +455,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/pkg/clients/clientset.go b/pkg/clients/clientset.go index d544f4a1..ea75b640 100644 --- a/pkg/clients/clientset.go +++ b/pkg/clients/clientset.go @@ -2,38 +2,120 @@ package clients import ( "flag" + "fmt" + "os" + "time" + clientv1alpha1 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned" + litmusInformer "github.com/litmuschaos/chaos-operator/pkg/client/informers/externalversions" + "github.com/litmuschaos/chaos-operator/pkg/client/listers/litmuschaos/v1alpha1" "github.com/pkg/errors" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" ) // ClientSets is a collection of clientSets and kubeConfig needed type ClientSets struct { - KubeClient kubernetes.Interface - LitmusClient clientv1alpha1.Interface - KubeConfig *rest.Config + KubeClient kubernetes.Interface + EventsInformer v1.EventLister + EngineInformer v1alpha1.ChaosEngineLister + ResultInformer v1alpha1.ChaosResultLister + LitmusClient clientv1alpha1.Interface + KubeConfig *rest.Config } -// GenerateClientSetFromKubeConfig will generation both ClientSets (k8s, and Litmus) as well as the KubeConfig -func (clientSets *ClientSets) GenerateClientSetFromKubeConfig() error { +const ( + ProcessKey = "process" +) + +// NewClientSet will generation both ClientSets (k8s, and Litmus) as well as the KubeConfig +func NewClientSet(stopCh <-chan struct{}, resyncDuration time.Duration, wq workqueue.RateLimitingInterface) (ClientSets, error) { config, err := getKubeConfig() if err != nil { - return err + return ClientSets{}, err } + k8sClientSet, err := GenerateK8sClientSet(config) if err != nil { - return err + return ClientSets{}, err } + litmusClientSet, err := GenerateLitmusClientSet(config) if err != nil { - return err + return ClientSets{}, err } + + clientSets := ClientSets{} clientSets.KubeClient = k8sClientSet clientSets.LitmusClient = litmusClientSet clientSets.KubeConfig = config + + if err := clientSets.SetupInformers(stopCh, k8sClientSet, litmusClientSet, resyncDuration, wq); err != nil { + return ClientSets{}, err + } + return clientSets, nil +} + +func (clientSets *ClientSets) SetupInformers(stopCh <-chan struct{}, k8sClientSet kubernetes.Interface, litmusClientSet clientv1alpha1.Interface, resyncDuration time.Duration, wq workqueue.RateLimitingInterface) error { + watchNamespace := os.Getenv("WATCH_NAMESPACE") + var ( + factory informers.SharedInformerFactory + litmusFactory litmusInformer.SharedInformerFactory + ) + if watchNamespace == "" { + factory = informers.NewSharedInformerFactory(k8sClientSet, resyncDuration) + litmusFactory = litmusInformer.NewSharedInformerFactory(litmusClientSet, resyncDuration) + } else { + factory = informers.NewSharedInformerFactoryWithOptions(k8sClientSet, resyncDuration, informers.WithNamespace(watchNamespace)) + litmusFactory = litmusInformer.NewSharedInformerFactoryWithOptions(litmusClientSet, resyncDuration, litmusInformer.WithNamespace(watchNamespace)) + } + + eventsInformer := factory.Core().V1().Events().Informer() + clientSets.EventsInformer = factory.Core().V1().Events().Lister() + + chaosEngineInformer := litmusFactory.Litmuschaos().V1alpha1().ChaosEngines().Informer() + chaosResultInformer := litmusFactory.Litmuschaos().V1alpha1().ChaosResults().Informer() + + // queue up for processing if there is any change in the resources + chaosEngineInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + wq.Add(ProcessKey) + }, + UpdateFunc: func(old, new interface{}) { + wq.Add(ProcessKey) + }, + DeleteFunc: func(obj interface{}) { + wq.Add(ProcessKey) + }, + }) + chaosResultInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + wq.Add(ProcessKey) + }, + UpdateFunc: func(old, new interface{}) { + wq.Add(ProcessKey) + }, + DeleteFunc: func(obj interface{}) { + wq.Add(ProcessKey) + }, + }) + + clientSets.EngineInformer = litmusFactory.Litmuschaos().V1alpha1().ChaosEngines().Lister() + clientSets.ResultInformer = litmusFactory.Litmuschaos().V1alpha1().ChaosResults().Lister() + + go eventsInformer.Run(stopCh) + go chaosEngineInformer.Run(stopCh) + go chaosResultInformer.Run(stopCh) + + if !cache.WaitForCacheSync(stopCh, eventsInformer.HasSynced, chaosEngineInformer.HasSynced, chaosResultInformer.HasSynced) { + return fmt.Errorf("timed out waiting for caches to sync") + } return nil } diff --git a/tests/bdd/bdd_test.go b/tests/bdd/bdd_test.go index f38a69bf..f8852413 100644 --- a/tests/bdd/bdd_test.go +++ b/tests/bdd/bdd_test.go @@ -21,14 +21,15 @@ import ( "flag" "fmt" "io/ioutil" - k8serrors "k8s.io/apimachinery/pkg/api/errors" "net/http" "os" "os/exec" "testing" "time" - v1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1" "github.com/litmuschaos/litmus-go/pkg/utils/retry" "github.com/pkg/errors"