diff --git a/build/charts/theia/crds/anomaly-detector-crd.yaml b/build/charts/theia/crds/anomaly-detector-crd.yaml index 686e69d0..f041d944 100644 --- a/build/charts/theia/crds/anomaly-detector-crd.yaml +++ b/build/charts/theia/crds/anomaly-detector-crd.yaml @@ -33,9 +33,9 @@ spec: type: array items: type: string - aggflow: + aggFlow: type: string - pod2podlabel: + pod2PodLabel: type: string executorInstances: type: integer diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 1fcd7e70..d66fdeb2 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -98,8 +98,8 @@ type ThroughputAnomalyDetectorSpec struct { StartInterval metav1.Time `json:"startInterval,omitempty"` EndInterval metav1.Time `json:"endInterval,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` - AggregatedFlow string `json:"aggflow,omitempty"` - Pod2PodLabel string `json:"pod2podlabel,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + Pod2PodLabel string `json:"pod2PodLabel,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` diff --git a/pkg/apis/intelligence/v1alpha1/types.go b/pkg/apis/intelligence/v1alpha1/types.go index d979bfdc..fd1433e3 100644 --- a/pkg/apis/intelligence/v1alpha1/types.go +++ b/pkg/apis/intelligence/v1alpha1/types.go @@ -73,8 +73,8 @@ type ThroughputAnomalyDetector struct { EndInterval metav1.Time `json:"endInterval,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` - AggregatedFlow string `json:"aggflow,omitempty"` - Pod2PodLabel string `json:"pod2podlabel,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + Pod2PodLabel string `json:"pod2PodLabel,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"` diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go index 2aff1a27..fe511e13 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go @@ -34,9 +34,9 @@ import ( const ( defaultNameSpace = "flow-visibility" tadQuery int = iota - aggtadpodQuery - aggtadpod2podQuery - aggtadpod2svcQuery + aggTadPodQuery + aggTadPod2PodQuery + aggTadPod2SvcQuery ) // REST implements rest.Storage for anomalydetector. @@ -57,7 +57,7 @@ var ( var queryMap = map[int]string{ tadQuery: ` - SELECT + SELECT id, sourceIP, sourceTransportPort, @@ -71,44 +71,44 @@ var queryMap = map[int]string{ algoCalc, anomaly FROM tadetector WHERE id = (?);`, - aggtadpodQuery: ` + aggTadPodQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - flowEndSeconds, - throughput, + id, + sourcePodNamespace, + sourcePodLabels, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, - aggtadpod2podQuery: ` + aggTadPod2PodQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - destinationPodNamespace, - destinationPodLabels, - flowEndSeconds, - throughput, + id, + sourcePodNamespace, + sourcePodLabels, + destinationPodNamespace, + destinationPodLabels, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, - aggtadpod2svcQuery: ` + aggTadPod2SvcQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - destinationServicePortName, - flowEndSeconds, - throughput, + id, + sourcePodNamespace, + sourcePodLabels, + destinationServicePortName, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, } @@ -230,11 +230,11 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet query := tadQuery switch tad.AggregatedFlow { case "pod": - query = aggtadpodQuery + query = aggTadPodQuery case "pod2pod": - query = aggtadpod2podQuery + query = aggTadPod2PodQuery case "pod2svc": - query = aggtadpod2svcQuery + query = aggTadPod2SvcQuery } if r.clickhouseConnect == nil { r.clickhouseConnect, err = setupClickHouseConnection(nil) @@ -256,21 +256,21 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpodQuery: + case aggTadPodQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpod2podQuery: + case aggTadPod2PodQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpod2svcQuery: + case aggTadPod2SvcQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go index 72f9ff59..9202fe9f 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go @@ -232,7 +232,7 @@ func Test_getTadetectorResult(t *testing.T) { { name: "Get aggtadquery pod2external result", id: "tad-2", - query: aggtadpodQuery, + query: aggTadPodQuery, returnedRow: sqlmock.NewRows([]string{ "Id", "SourcePodNamespace", "SourcePodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), @@ -254,7 +254,7 @@ func Test_getTadetectorResult(t *testing.T) { { name: "Get aggtadquery pod2pod result", id: "tad-3", - query: aggtadpod2podQuery, + query: aggTadPod2PodQuery, returnedRow: sqlmock.NewRows([]string{ "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationPodNamespace", "DestinationPodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationPodNamespace", "mock_DestinationPodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), @@ -278,7 +278,7 @@ func Test_getTadetectorResult(t *testing.T) { { name: "Get aggtadquery pod2svc result", id: "tad-2", - query: aggtadpod2svcQuery, + query: aggTadPod2SvcQuery, returnedRow: sqlmock.NewRows([]string{ "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationServicePortName", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationServicePortName", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), @@ -310,11 +310,11 @@ func Test_getTadetectorResult(t *testing.T) { r := NewREST(&fakeQuerier{}) var tad v1alpha1.ThroughputAnomalyDetector switch tt.query { - case aggtadpod2podQuery: + case aggTadPod2PodQuery: tad.AggregatedFlow = "pod2pod" - case aggtadpodQuery: + case aggTadPodQuery: tad.AggregatedFlow = "pod" - case aggtadpod2svcQuery: + case aggTadPod2SvcQuery: tad.AggregatedFlow = "pod2svc" } err = r.getTADetectorResult(tt.id, &tad) diff --git a/pkg/controller/anomalydetector/controller_test.go b/pkg/controller/anomalydetector/controller_test.go index 5cc7bd8d..e089322a 100644 --- a/pkg/controller/anomalydetector/controller_test.go +++ b/pkg/controller/anomalydetector/controller_test.go @@ -399,10 +399,10 @@ func TestTADetection(t *testing.T) { expectedErrorMsg: "invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'pod2pod' or 'pod2svc'", }, { - name: "invalid Aggregatedflow pod2podlabel combo", - tadName: "tad-invalid-agg-flow-pod2podlabel-combo", + name: "invalid Aggregatedflow pod2PodLabel combo", + tadName: "tad-invalid-agg-flow-pod2PodLabel-combo", tad: &crdv1alpha1.ThroughputAnomalyDetector{ - ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow-pod2podlabel-combo", Namespace: testNamespace}, + ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow-pod2PodLabel-combo", Namespace: testNamespace}, Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ JobType: "ARIMA", AggregatedFlow: "pod", diff --git a/pkg/theia/commands/anomaly_detection_delete.go b/pkg/theia/commands/anomaly_detection_delete.go index aaabac8b..6024eab3 100644 --- a/pkg/theia/commands/anomaly_detection_delete.go +++ b/pkg/theia/commands/anomaly_detection_delete.go @@ -50,7 +50,7 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { } tadNameList := strings.Fields(tadName) for _, tadName := range tadNameList { - err = deleteTADid(cmd, tadName) + err = deleteTADId(cmd, tadName) if err != nil { return err } @@ -68,7 +68,7 @@ func init() { ) } -func deleteTADid(cmd *cobra.Command, tadName string) error { +func deleteTADId(cmd *cobra.Command, tadName string) error { err := util.ParseADAlgorithmID(tadName) if err != nil { return err diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index 73ef328d..2e350f74 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "os" - "text/tabwriter" "github.com/spf13/cobra" @@ -105,31 +104,30 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error } return nil } else { - w := tabwriter.NewWriter(os.Stdout, 15, 8, 1, '\t', tabwriter.AlignRight) + var result [][]string switch tad.Stats[0].AggType { case "e2e": - fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } case "pod_to_external": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + result = append(result, []string{"id", "sourcePodNamespace", "sourcePodLabels", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } case "pod_to_pod": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tdestinationPodNamespace\tdestinationPodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + result = append(result, []string{"id", "sourcePodNamespace", "sourcePodLabels", "destinationPodNamespace", "destinationPodLabels", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationPodNamespace, p.DestinationPodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationPodNamespace, p.DestinationPodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } case "pod_to_svc": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tdestinationServicePortName\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + result = append(result, []string{"id", "sourcePodNamespace", "sourcePodLabels", "destinationServicePortName", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } } - w.Flush() - fmt.Printf("\n") + TableOutput(result) } return nil } diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 9b99c6d8..2d4d625e 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -66,7 +66,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, expectedErrorMsg: "", }, { @@ -91,7 +91,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_external 1234567 true"}, + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_external 1234567 true"}, expectedErrorMsg: "", }, { @@ -116,7 +116,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationPodNamespace destinationPodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_pod 1234567 true"}, + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationPodNamespace destinationPodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_pod 1234567 true"}, expectedErrorMsg: "", }, { @@ -141,7 +141,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_svc 1234567 true"}, + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_svc 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run.go b/pkg/theia/commands/anomaly_detection_run.go index 7b72746e..5cffa4d3 100644 --- a/pkg/theia/commands/anomaly_detection_run.go +++ b/pkg/theia/commands/anomaly_detection_run.go @@ -159,15 +159,15 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f throughputAnomalyDetection.AggregatedFlow = aggregatedFlow } - pod2podlabel, err := cmd.Flags().GetString("p2p-label") + pod2PodLabel, err := cmd.Flags().GetString("p2p-label") if err != nil { return err } - if pod2podlabel != "" { + if pod2PodLabel != "" { if aggregatedFlow != "pod2pod" { - return fmt.Errorf("pop2podlabel can only be mentioned with aggregatedFlow as pod2pod, instead found %v", aggregatedFlow) + return fmt.Errorf("pod2PodLabel can only be mentioned with aggregatedFlow as pod2pod, instead found %v", aggregatedFlow) } - throughputAnomalyDetection.Pod2PodLabel = pod2podlabel + throughputAnomalyDetection.Pod2PodLabel = pod2PodLabel } tadID := uuid.New().String()