From 842d8e8bc7a894600880b063415f0247be10245c Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Mon, 27 Mar 2023 12:32:29 -0700 Subject: [PATCH] Aggregated Throughput Anomaly Detection This PR does the following: - Implements the "agg-flow" and "p2p-label" arguments for aggregated flows. - Aggregated flows cover Pod-to-external, Pod-to-Pod (based on labels), and Pod-to-Service flows. - Adds a new retrieve table for aggregated TAD. - Modifies the retrieve table for TAD; the new agg_type and algo_type fields make the results easier to interpret. - The TAD delete command can now take multiple tad ids to delete. Partially solves: #168 Signed-off-by: Tushar Tathgur --- .../theia/crds/anomaly-detector-crd.yaml | 4 + .../provisioning/datasources/create_table.sh | 8 +- .../migrators/000005_0-5-0.down.sql | 13 + .../datasources/migrators/000005_0-5-0.up.sql | 13 + build/yamls/flow-visibility.yml | 40 ++- pkg/apis/crd/v1alpha1/types.go | 2 + pkg/apis/intelligence/v1alpha1/types.go | 29 +- .../throughputanomalydetector/rest.go | 93 +++++- .../throughputanomalydetector/rest_test.go | 165 +++++++++-- pkg/controller/anomalydetector/controller.go | 17 +- .../anomalydetector/controller_test.go | 31 +- .../commands/anomaly_detection_delete.go | 40 ++- .../commands/anomaly_detection_retrieve.go | 24 +- .../anomaly_detection_retrieve_test.go | 80 ++++- pkg/theia/commands/anomaly_detection_run.go | 30 +- .../commands/anomaly_detection_run_test.go | 2 + .../anomaly-detection/anomaly_detection.py | 278 ++++++++++++++---- .../anomaly_detection_test.py | 156 ++++---- test/e2e/throughputanomalydetection_test.go | 8 +- 19 files changed, 851 insertions(+), 182 deletions(-) create mode 100644 build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql create mode 100644 build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql diff --git a/build/charts/theia/crds/anomaly-detector-crd.yaml b/build/charts/theia/crds/anomaly-detector-crd.yaml index 1c895af6..686e69d0 100644 --- a/build/charts/theia/crds/anomaly-detector-crd.yaml +++ b/build/charts/theia/crds/anomaly-detector-crd.yaml @@ -33,6 +33,10 @@ spec: type: array items: type: string + aggflow: + type: string + pod2podlabel: + type: string executorInstances: type: integer driverCoreRequest: diff --git a/build/charts/theia/provisioning/datasources/create_table.sh b/build/charts/theia/provisioning/datasources/create_table.sh index a9400c6c..64cbbf06 100644 --- a/build/charts/theia/provisioning/datasources/create_table.sh +++ b/build/charts/theia/provisioning/datasources/create_table.sh @@ -293,15 +293,21 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, + sourcePodNamespace String, + sourcePodLabels String, + destinationPodNamespace String, + destinationPodLabels String, + destinationServicePortName String, flowEndSeconds DateTime, throughputStandardDeviation Float64, + aggType String, algoType String, algoCalc Float64, throughput Float64, anomaly String, id String ) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') - ORDER BY (flowStartSeconds); + ORDER BY (flowEndSeconds); --Create distributed tables for cluster CREATE TABLE IF NOT EXISTS flows AS flows_local diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql new file mode 100644 index 00000000..8a157afc --- /dev/null +++ 
b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql @@ -0,0 +1,13 @@ +--Drop aggregated TAD columns +ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String; +ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String; +ALTER TABLE tadetector DROP COLUMN sourcePodLabels String; +ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String; +ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String; +ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String; +ALTER TABLE tadetector DROP COLUMN destinationPodLabels String; +ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String; +ALTER TABLE tadetector DROP COLUMN destinationServicePortName String; +ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String; +ALTER TABLE tadetector DROP COLUMN aggType String; +ALTER TABLE tadetector_local DROP COLUMN aggType String; diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql new file mode 100644 index 00000000..6e4bca2f --- /dev/null +++ b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql @@ -0,0 +1,13 @@ +--Add aggregated TAD columns +ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String; +ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String; +ALTER TABLE tadetector ADD COLUMN sourcePodLabels String; +ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String; +ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String; +ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String; +ALTER TABLE tadetector ADD COLUMN destinationPodLabels String; +ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String; +ALTER TABLE tadetector ADD COLUMN destinationServicePortName String; +ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String; +ALTER TABLE tadetector ADD COLUMN aggType String; +ALTER TABLE tadetector_local ADD COLUMN aggType String; diff --git a/build/yamls/flow-visibility.yml b/build/yamls/flow-visibility.yml index bca83da9..f08e34c3 100644 --- a/build/yamls/flow-visibility.yml +++ b/build/yamls/flow-visibility.yml @@ -388,6 +388,34 @@ data: CREATE TABLE IF NOT EXISTS tadetector AS tadetector_local engine=Distributed('{cluster}', default, tadetector_local, rand()); + 000005_0-5-0.down.sql: | + --Drop aggregated TAD columns + ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String; + ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String; + ALTER TABLE tadetector DROP COLUMN sourcePodLabels String; + ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String; + ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String; + ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String; + ALTER TABLE tadetector DROP COLUMN destinationPodLabels String; + ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String; + ALTER TABLE tadetector DROP COLUMN destinationServicePortName String; + ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String; + ALTER TABLE tadetector DROP COLUMN aggType String; + ALTER TABLE tadetector_local DROP COLUMN aggType String; + 000005_0-5-0.up.sql: | + --Add aggregated TAD columns + ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String; + ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String; + ALTER TABLE tadetector ADD COLUMN sourcePodLabels String; + ALTER TABLE tadetector_local ADD COLUMN 
sourcePodLabels String; + ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String; + ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String; + ALTER TABLE tadetector ADD COLUMN destinationPodLabels String; + ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String; + ALTER TABLE tadetector ADD COLUMN destinationServicePortName String; + ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String; + ALTER TABLE tadetector ADD COLUMN aggType String; + ALTER TABLE tadetector_local ADD COLUMN aggType String; create_table.sh: | #!/usr/bin/env bash @@ -675,15 +703,21 @@ data: destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, + sourcePodNamespace String, + sourcePodLabels String, + destinationPodNamespace String, + destinationPodLabels String, + destinationServicePortName String, flowEndSeconds DateTime, throughputStandardDeviation Float64, + aggType String, algoType String, algoCalc Float64, throughput Float64, anomaly String, id String ) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}') - ORDER BY (flowStartSeconds); + ORDER BY (flowEndSeconds); --Create distributed tables for cluster CREATE TABLE IF NOT EXISTS flows AS flows_local @@ -6850,6 +6884,10 @@ spec: path: migrators/000004_0-4-0.down.sql - key: 000004_0-4-0.up.sql path: migrators/000004_0-4-0.up.sql + - key: 000005_0-5-0.down.sql + path: migrators/000005_0-5-0.down.sql + - key: 000005_0-5-0.up.sql + path: migrators/000005_0-5-0.up.sql name: clickhouse-mounted-configmap name: clickhouse-configmap-volume - emptyDir: diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index ff5ca0ce..1fcd7e70 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -98,6 +98,8 @@ type ThroughputAnomalyDetectorSpec struct { StartInterval metav1.Time `json:"startInterval,omitempty"` EndInterval metav1.Time `json:"endInterval,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` + AggregatedFlow string `json:"aggflow,omitempty"` + Pod2PodLabel string `json:"pod2podlabel,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` diff --git a/pkg/apis/intelligence/v1alpha1/types.go b/pkg/apis/intelligence/v1alpha1/types.go index c86045dc..d979bfdc 100644 --- a/pkg/apis/intelligence/v1alpha1/types.go +++ b/pkg/apis/intelligence/v1alpha1/types.go @@ -73,6 +73,8 @@ type ThroughputAnomalyDetector struct { EndInterval metav1.Time `json:"endInterval,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` + AggregatedFlow string `json:"aggflow,omitempty"` + Pod2PodLabel string `json:"pod2podlabel,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"` @@ -100,14 +102,21 @@ type ThroughputAnomalyDetectorList struct { } type ThroughputAnomalyDetectorStats struct { - Id string `json:"id,omitempty"` - SourceIP string `json:"sourceIP,omitempty"` - SourceTransportPort string `json:"sourceTransportPort,omitempty"` - DestinationIP string `json:"destinationIP,omitempty"` - DestinationTransportPort string `json:"destinationTransportPort,omitempty"` - FlowStartSeconds string `json:"FlowStartSeconds,omitempty"` - FlowEndSeconds string 
`json:"FlowEndSeconds,omitempty"` - Throughput string `json:"Throughput,omitempty"` - AlgoCalc string `json:"AlgoCalc,omitempty"` - Anomaly string `json:"anomaly,omitempty"` + Id string `json:"id,omitempty"` + SourceIP string `json:"sourceIP,omitempty"` + SourceTransportPort string `json:"sourceTransportPort,omitempty"` + DestinationIP string `json:"destinationIP,omitempty"` + DestinationTransportPort string `json:"destinationTransportPort,omitempty"` + FlowStartSeconds string `json:"FlowStartSeconds,omitempty"` + SourcePodNamespace string `json:"sourcePodNamespace,omitempty"` + SourcePodLabels string `json:"sourcePodLabels,omitempty"` + DestinationPodNamespace string `json:"destinationPodNamespace,omitempty"` + DestinationPodLabels string `json:"destinationPodLabels,omitempty"` + DestinationServicePortName string `json:"destinationServicePortName,omitempty"` + FlowEndSeconds string `json:"FlowEndSeconds,omitempty"` + Throughput string `json:"throughput,omitempty"` + AggType string `json:"aggType,omitempty"` + AlgoType string `json:"algoType,omitempty"` + AlgoCalc string `json:"AlgoCalc,omitempty"` + Anomaly string `json:"anomaly,omitempty"` } diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go index 96fbc3f2..2aff1a27 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go @@ -34,6 +34,9 @@ import ( const ( defaultNameSpace = "flow-visibility" tadQuery int = iota + aggtadpodQuery + aggtadpod2podQuery + aggtadpod2svcQuery ) // REST implements rest.Storage for anomalydetector. @@ -63,9 +66,50 @@ var queryMap = map[int]string{ flowStartSeconds, flowEndSeconds, throughput, + aggType, + algoType, algoCalc, anomaly FROM tadetector WHERE id = (?);`, + aggtadpodQuery: ` + SELECT + id, + sourcePodNamespace, + sourcePodLabels, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggtadpod2podQuery: ` + SELECT + id, + sourcePodNamespace, + sourcePodLabels, + destinationPodNamespace, + destinationPodLabels, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggtadpod2svcQuery: ` + SELECT + id, + sourcePodNamespace, + sourcePodLabels, + destinationServicePortName, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, } // NewREST returns a REST object that will work against API services. 
@@ -102,6 +146,8 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete tad.EndInterval = crd.Spec.EndInterval tad.ExecutorInstances = crd.Spec.ExecutorInstances tad.NSIgnoreList = crd.Spec.NSIgnoreList + tad.AggregatedFlow = crd.Spec.AggregatedFlow + tad.Pod2PodLabel = crd.Spec.Pod2PodLabel tad.DriverCoreRequest = crd.Spec.DriverCoreRequest tad.DriverMemory = crd.Spec.DriverMemory tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest @@ -170,6 +216,8 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation job.Spec.DriverMemory = newTAD.DriverMemory job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest job.Spec.ExecutorMemory = newTAD.ExecutorMemory + job.Spec.AggregatedFlow = newTAD.AggregatedFlow + job.Spec.Pod2PodLabel = newTAD.Pod2PodLabel _, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job) if err != nil { return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err)) @@ -179,24 +227,57 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDetector) error { var err error + query := tadQuery + switch tad.AggregatedFlow { + case "pod": + query = aggtadpodQuery + case "pod2pod": + query = aggtadpod2podQuery + case "pod2svc": + query = aggtadpod2svcQuery + } if r.clickhouseConnect == nil { r.clickhouseConnect, err = setupClickHouseConnection(nil) if err != nil { return err } } - rows, err := r.clickhouseConnect.Query(queryMap[tadQuery], id) + rows, err := r.clickhouseConnect.Query(queryMap[query], id) if err != nil { return fmt.Errorf("failed to get Throughput Anomaly Detector results with id %s: %v", id, err) } defer rows.Close() for rows.Next() { - res := v1alpha1.ThroughputAnomalyDetectorStats{} - err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AlgoCalc, &res.Anomaly) - if err != nil { - return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) + switch query { + case tadQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggtadpodQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggtadpod2podQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) + 
case aggtadpod2svcQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to svc Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) } - tad.Stats = append(tad.Stats, res) } return nil } diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go index e17c0fb4..72f9ff59 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "fmt" + "regexp" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -29,7 +30,7 @@ import ( "k8s.io/client-go/kubernetes" crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" - anomalydetector "antrea.io/theia/pkg/apis/intelligence/v1alpha1" + "antrea.io/theia/pkg/apis/intelligence/v1alpha1" ) type fakeQuerier struct{} @@ -40,24 +41,24 @@ func TestREST_Get(t *testing.T) { name string tadName string expectErr error - expectResult *anomalydetector.ThroughputAnomalyDetector + expectResult *v1alpha1.ThroughputAnomalyDetector }{ { name: "Not Found case", tadName: "non-existent-tad", - expectErr: errors.NewNotFound(anomalydetector.Resource("throughputanomalydetectors"), "non-existent-tad"), + expectErr: errors.NewNotFound(v1alpha1.Resource("throughputanomalydetectors"), "non-existent-tad"), expectResult: nil, }, { name: "Successful Get case", - tadName: "tad-2", + tadName: "tad-1", expectErr: nil, - expectResult: &anomalydetector.ThroughputAnomalyDetector{ + expectResult: &v1alpha1.ThroughputAnomalyDetector{ Type: "TAD", - Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + Status: v1alpha1.ThroughputAnomalyDetectorStatus{ State: crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, }, - Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ Id: "mock_Id", SourceIP: "mock_SourceIP", SourceTransportPort: "mock_SourceTransportPort", @@ -66,6 +67,8 @@ func TestREST_Get(t *testing.T) { FlowStartSeconds: "mock_FlowStartSeconds", FlowEndSeconds: "mock_FlowEndSeconds", Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", AlgoCalc: "mock_AlgoCalc", Anomaly: "mock_Anomaly", }}, @@ -80,10 +83,9 @@ func TestREST_Get(t *testing.T) { } defer db.Close() resultRows := sqlmock.NewRows([]string{ - "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AlgoCalc", "Anomaly"}). - AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AlgoCalc", "mock_Anomaly") - mock.ExpectQuery("SELECT id, sourceIP, sourceTransportPort, destinationIP, destinationTransportPort, flowStartSeconds, flowEndSeconds, throughput, algoCalc, anomaly FROM tadetector WHERE id = (?);").WillReturnRows(resultRows) - + "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly") + mock.ExpectQuery(queryMap[tadQuery]).WillReturnRows(resultRows) setupClickHouseConnection = func(client kubernetes.Interface) (connect *sql.DB, err error) { return db, nil } @@ -91,7 +93,7 @@ func TestREST_Get(t *testing.T) { tad, err := r.Get(context.TODO(), tt.tadName, &v1.GetOptions{}) assert.Equal(t, err, tt.expectErr) if tad != nil { - assert.Equal(t, tt.expectResult, tad.(*anomalydetector.ThroughputAnomalyDetector)) + assert.Equal(t, tt.expectResult, tad.(*v1alpha1.ThroughputAnomalyDetector)) } else { assert.Nil(t, tt.expectResult) } @@ -141,7 +143,7 @@ func TestREST_Create(t *testing.T) { }, { name: "Job already exists case", - obj: &anomalydetector.ThroughputAnomalyDetector{ + obj: &v1alpha1.ThroughputAnomalyDetector{ TypeMeta: v1.TypeMeta{}, ObjectMeta: v1.ObjectMeta{Name: "existent-tad"}, }, @@ -150,7 +152,7 @@ func TestREST_Create(t *testing.T) { }, { name: "Successful Create case", - obj: &anomalydetector.ThroughputAnomalyDetector{ + obj: &v1alpha1.ThroughputAnomalyDetector{ TypeMeta: v1.TypeMeta{}, ObjectMeta: v1.ObjectMeta{Name: "non-existent-tad"}, }, @@ -171,11 +173,11 @@ func TestREST_Create(t *testing.T) { func TestREST_List(t *testing.T) { tests := []struct { name string - expectResult []anomalydetector.ThroughputAnomalyDetector + expectResult []v1alpha1.ThroughputAnomalyDetector }{ { name: "Successful List case", - expectResult: []anomalydetector.ThroughputAnomalyDetector{ + expectResult: []v1alpha1.ThroughputAnomalyDetector{ {ObjectMeta: v1.ObjectMeta{Name: "tad-1"}}, {ObjectMeta: v1.ObjectMeta{Name: "tad-2"}}, }, @@ -186,13 +188,142 @@ func TestREST_List(t *testing.T) { r := NewREST(&fakeQuerier{}) itemList, err := r.List(context.TODO(), &internalversion.ListOptions{}) assert.NoError(t, err) - tadList, ok := itemList.(*anomalydetector.ThroughputAnomalyDetectorList) + tadList, ok := itemList.(*v1alpha1.ThroughputAnomalyDetectorList) assert.True(t, ok) assert.ElementsMatch(t, tt.expectResult, tadList.Items) }) } } +func Test_getTadetectorResult(t *testing.T) { + tests := []struct { + name string + id string + query int + returnedRow *sqlmock.Rows + expectedResult *v1alpha1.ThroughputAnomalyDetector + expecterr error + }{ + { + name: "Get tadquery result", + id: "tad-1", + query: tadQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + SourceIP: "mock_SourceIP", + SourceTransportPort: "mock_SourceTransportPort", + DestinationIP: "mock_DestinationIP", + DestinationTransportPort: "mock_DestinationTransportPort", + FlowStartSeconds: "mock_FlowStartSeconds", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery pod2external result", + id: "tad-2", + query: aggtadpodQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "SourcePodNamespace", "SourcePodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + SourcePodNamespace: "mock_SourcePodNamespace", + SourcePodLabels: "mock_SourcePodLabels", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery pod2pod result", + id: "tad-3", + query: aggtadpod2podQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationPodNamespace", "DestinationPodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationPodNamespace", "mock_DestinationPodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + SourcePodNamespace: "mock_SourcePodNamespace", + SourcePodLabels: "mock_SourcePodLabels", + DestinationPodNamespace: "mock_DestinationPodNamespace", + DestinationPodLabels: "mock_DestinationPodLabels", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery pod2svc result", + id: "tad-2", + query: aggtadpod2svcQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationServicePortName", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationServicePortName", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + SourcePodNamespace: "mock_SourcePodNamespace", + SourcePodLabels: "mock_SourcePodLabels", + DestinationServicePortName: "mock_DestinationServicePortName", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, mock, err := sqlmock.New() + assert.NoError(t, err) + mock.ExpectQuery(regexp.QuoteMeta(queryMap[tt.query])).WillReturnRows(tt.returnedRow) + setupClickHouseConnection = func(client kubernetes.Interface) (connect *sql.DB, err error) { + return db, nil + } + r := NewREST(&fakeQuerier{}) + var tad v1alpha1.ThroughputAnomalyDetector + switch tt.query { + case aggtadpod2podQuery: + tad.AggregatedFlow = "pod2pod" + case aggtadpodQuery: + tad.AggregatedFlow = "pod" + case aggtadpod2svcQuery: + tad.AggregatedFlow = "pod2svc" + } + err = r.getTADetectorResult(tt.id, &tad) + assert.Equal(t, tt.expecterr, err) + assert.Equal(t, tt.expectedResult.Stats, tad.Stats) + }) + } +} + func (c *fakeQuerier) GetThroughputAnomalyDetector(namespace, name string) (*crdv1alpha1.ThroughputAnomalyDetector, error) { if name == "non-existent-tad" { return nil, fmt.Errorf("not found") diff --git a/pkg/controller/anomalydetector/controller.go b/pkg/controller/anomalydetector/controller.go index aa3319bd..0e7c7de1 100644 --- a/pkg/controller/anomalydetector/controller.go +++ b/pkg/controller/anomalydetector/controller.go @@ -525,7 +525,7 @@ func (c *AnomalyDetectorController) startJob(newTAD *crdv1alpha1.ThroughputAnoma func (c *AnomalyDetectorController) startSparkApplication(newTAD *crdv1alpha1.ThroughputAnomalyDetector) error { var newTADJobArgs []string if newTAD.Spec.JobType != "EWMA" && newTAD.Spec.JobType != "ARIMA" && newTAD.Spec.JobType != "DBSCAN" { - return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly DetectorQuerier type should be 'EWMA' or 'ARIMA' or 'DBSCAN'")} + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector algorithm type should be 'EWMA' or 'ARIMA' or 'DBSCAN'")} } newTADJobArgs = append(newTADJobArgs, "--algo", newTAD.Spec.JobType) @@ -546,6 +546,21 @@ func (c *AnomalyDetectorController) startSparkApplication(newTAD *crdv1alpha1.Th newTADJobArgs = append(newTADJobArgs, "--ns_ignore_list", nsIgnoreListStr) } + if newTAD.Spec.AggregatedFlow != "" { + if newTAD.Spec.AggregatedFlow == "pod" || newTAD.Spec.AggregatedFlow == "pod2pod" || newTAD.Spec.AggregatedFlow == "pod2svc" { + newTADJobArgs = append(newTADJobArgs, "--agg_flow", newTAD.Spec.AggregatedFlow) + } else { + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'pod2pod' or 'pod2svc'")} + } + } + + if newTAD.Spec.Pod2PodLabel != "" { + if newTAD.Spec.AggregatedFlow != "pod2pod" { + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector Pod2PodLabel requires aggregated flow type to be 'pod2pod'")} + } + newTADJobArgs = append(newTADJobArgs, "--pod2pod_label", newTAD.Spec.Pod2PodLabel) + } + 
sparkResourceArgs := struct { executorInstances int32 driverCoreRequest string diff --git a/pkg/controller/anomalydetector/controller_test.go b/pkg/controller/anomalydetector/controller_test.go index 9019c0da..5cc7bd8d 100644 --- a/pkg/controller/anomalydetector/controller_test.go +++ b/pkg/controller/anomalydetector/controller_test.go @@ -72,7 +72,7 @@ func newFakeController(t *testing.T) (*fakeController, *sql.DB) { tadController := NewAnomalyDetectorController(crdClient, kubeClient, taDetectorInformer) mock.ExpectQuery("SELECT DISTINCT id FROM tadetector;").WillReturnRows(sqlmock.NewRows([]string{})) - mock.ExpectExec("ALTER TABLE tadetector_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[3:]).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("ALTER TABLE tadetector_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[4:]).WillReturnResult(sqlmock.NewResult(0, 1)) return &fakeController{ tadController, crdClient, @@ -240,6 +240,8 @@ func TestTADetection(t *testing.T) { StartInterval: metav1.NewTime(time.Now()), EndInterval: metav1.NewTime(time.Now().Add(time.Second * 100)), NSIgnoreList: []string{"kube-system", "flow-visibility"}, + AggregatedFlow: "pod2pod", + Pod2PodLabel: "app:label", }, Status: crdv1alpha1.ThroughputAnomalyDetectorStatus{}, } @@ -299,7 +301,7 @@ func TestTADetection(t *testing.T) { JobType: "nonexistent-job-type", }, }, - expectedErrorMsg: "invalid request: Throughput Anomaly DetectorQuerier type should be 'EWMA' or 'ARIMA' or 'DBSCAN'", + expectedErrorMsg: "invalid request: Throughput Anomaly Detector algorithm type should be 'EWMA' or 'ARIMA' or 'DBSCAN'", }, { name: "invalid EndInterval", @@ -384,6 +386,31 @@ func TestTADetection(t *testing.T) { }, expectedErrorMsg: "invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention", }, + { + name: "invalid Aggregatedflow", + tadName: "tad-invalid-agg-flow", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow", Namespace: testNamespace}, + Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + AggregatedFlow: "nonexistent-agg-flow", + }, + }, + expectedErrorMsg: "invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'pod2pod' or 'pod2svc'", + }, + { + name: "invalid Aggregatedflow pod2podlabel combo", + tadName: "tad-invalid-agg-flow-pod2podlabel-combo", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow-pod2podlabel-combo", Namespace: testNamespace}, + Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + AggregatedFlow: "pod", + Pod2PodLabel: "app:label", + }, + }, + expectedErrorMsg: "invalid request: Throughput Anomaly Detector Pod2PodLabel requires aggregated flow type to be 'pod2pod'", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/theia/commands/anomaly_detection_delete.go b/pkg/theia/commands/anomaly_detection_delete.go index 6186f489..aaabac8b 100644 --- a/pkg/theia/commands/anomaly_detection_delete.go +++ b/pkg/theia/commands/anomaly_detection_delete.go @@ -17,6 +17,7 @@ package commands import ( "context" "fmt" + "strings" "github.com/spf13/cobra" @@ -26,13 +27,15 @@ import ( // anomalyDetectionDeleteCmd represents the anomaly detection delete command var anomalyDetectionDeleteCmd = &cobra.Command{ Use: "delete", - Short: "Delete a anomaly detection job", - Long: `Delete a anomaly detection job by Name.`, + 
Short: "Delete anomaly detection job", + Long: `Delete anomaly detection job by Name.`, Aliases: []string{"del"}, Args: cobra.RangeArgs(0, 1), Example: ` Delete the anomaly detection job with Name tad-e998433e-accb-4888-9fc8-06563f073e86 $ theia throughput-anomaly-detection delete tad-e998433e-accb-4888-9fc8-06563f073e86 +Delete multiple anomaly detection jobs using "" with space separated tad ids +$ theia throughput-anomaly-detection delete "tad-e998433e-accb-4888-9fc8-06563f073e86 tad-1234abcd-1234-abcd-12ab-12345678abcd" `, RunE: anomalyDetectionDelete, } @@ -45,7 +48,28 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { if tadName == "" && len(args) == 1 { tadName = args[0] } - err = util.ParseADAlgorithmID(tadName) + tadNameList := strings.Fields(tadName) + for _, tadName := range tadNameList { + err = deleteTADid(cmd, tadName) + if err != nil { + return err + } + } + return nil +} + +func init() { + throughputanomalyDetectionCmd.AddCommand(anomalyDetectionDeleteCmd) + anomalyDetectionDeleteCmd.Flags().StringP( + "name", + "", + "", + "Name of the anomaly detection job.", + ) +} + +func deleteTADid(cmd *cobra.Command, tadName string) error { + err := util.ParseADAlgorithmID(tadName) if err != nil { return err } @@ -72,13 +96,3 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { fmt.Printf("Successfully deleted anomaly detection job with name: %s\n", tadName) return nil } - -func init() { - throughputanomalyDetectionCmd.AddCommand(anomalyDetectionDeleteCmd) - anomalyDetectionDeleteCmd.Flags().StringP( - "name", - "", - "", - "Name of the anomaly detection job.", - ) -} diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index b2573bc1..73ef328d 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -106,9 +106,27 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error return nil } else { w := tabwriter.NewWriter(os.Stdout, 15, 8, 1, '\t', tabwriter.AlignRight) - fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\talgoCalc\tanomaly\n") - for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AlgoCalc, p.Anomaly) + switch tad.Stats[0].AggType { + case "e2e": + fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + for _, p := range tad.Stats { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + } + case "pod_to_external": + fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + for _, p := range tad.Stats { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + } + case "pod_to_pod": + fmt.Fprintf(w, 
"id\tsourcePodNamespace\tsourcePodLabels\tdestinationPodNamespace\tdestinationPodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + for _, p := range tad.Stats { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationPodNamespace, p.DestinationPodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + } + case "pod_to_svc": + fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tdestinationServicePortName\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + for _, p := range tad.Stats { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + } } w.Flush() fmt.Printf("\n") diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 6d65f2d6..9b99c6d8 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -45,7 +45,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { filePath string }{ { - name: "Valid case", + name: "Valid case e2e", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -57,6 +57,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", Anomaly: "true", AlgoCalc: "1234567", + AggType: "e2e", }}, } w.Header().Set("Content-Type", "application/json") @@ -65,7 +66,82 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"1234567\t\ttrue"}, + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case pod_to_external", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "pod_to_external", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_external 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case pod_to_pod", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + 
State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "pod_to_pod", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationPodNamespace destinationPodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_pod 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case pod_to_svc", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "pod_to_svc", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_svc 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run.go b/pkg/theia/commands/anomaly_detection_run.go index 205f8ba5..7b72746e 100644 --- a/pkg/theia/commands/anomaly_detection_run.go +++ b/pkg/theia/commands/anomaly_detection_run.go @@ -151,6 +151,25 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f } throughputAnomalyDetection.ExecutorMemory = executorMemory + aggregatedFlow, err := cmd.Flags().GetString("agg-flow") + if err != nil { + return err + } + if aggregatedFlow != "" { + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + } + + pod2podlabel, err := cmd.Flags().GetString("p2p-label") + if err != nil { + return err + } + if pod2podlabel != "" { + if aggregatedFlow != "pod2pod" { + return fmt.Errorf("pop2podlabel can only be mentioned with aggregatedFlow as pod2pod, instead found %v", aggregatedFlow) + } + throughputAnomalyDetection.Pod2PodLabel = pod2podlabel + } + tadID := uuid.New().String() throughputAnomalyDetection.Name = "tad-" + tadID throughputAnomalyDetection.Namespace = config.FlowVisibilityNS @@ -240,5 +259,14 @@ Example values include 0.1, 500m, 1.5, 5, etc.`, `Specify the memory request for the executor Pod. Values conform to the Kubernetes resource quantity convention. 
Example values include 512M, 1G, 8G, etc.`, ) - + throughputAnomalyDetectionAlgoCmd.Flags().String( + "agg-flow", + "", + `Specifies which aggregated flow to perform anomaly detection on; options are pod/pod2pod/pod2svc`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "p2p-label", + "", + `When agg-flow is pod2pod, specifies the Pod labels used to match inbound/outbound throughput`, + ) } diff --git a/pkg/theia/commands/anomaly_detection_run_test.go b/pkg/theia/commands/anomaly_detection_run_test.go index ee206eed..e2c8d47c 100644 --- a/pkg/theia/commands/anomaly_detection_run_test.go +++ b/pkg/theia/commands/anomaly_detection_run_test.go @@ -91,6 +91,8 @@ func TestAnomalyDetectionRun(t *testing.T) { cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") cmd.Flags().String("end-time", "2006-01-03 15:04:05", "") cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().String("agg-flow", "pod2pod", "") + cmd.Flags().String("p2p-label", "app:label1", "") cmd.Flags().Int32("executor-instances", 1, "") cmd.Flags().String("driver-core-request", "1", "") cmd.Flags().String("driver-memory", "1m", "") diff --git a/plugins/anomaly-detection/anomaly_detection.py b/plugins/anomaly-detection/anomaly_detection.py index 56547299..9d074ce2 100644 --- a/plugins/anomaly-detection/anomaly_detection.py +++ b/plugins/anomaly-detection/anomaly_detection.py @@ -57,7 +57,18 @@ 'protocolIdentifier', 'flowStartSeconds', 'flowEndSeconds', - 'flowEndSeconds - flowStartSeconds as Diff_Secs', + 'max(throughput)' +] + +AGG_FLOW_TABLE_COLUMNS = [ + 'sourcePodNamespace', + 'sourcePodLabels', + 'destinationPodNamespace', + 'destinationPodLabels', + 'destinationServicePortName', + 'protocolIdentifier', + 'flowType', + 'flowEndSeconds', 'max(throughput)' ] @@ -71,15 +82,29 @@ 'flowStartSeconds' ] +DF_AGG_GRP_COLUMNS = [ + 'sourcePodNamespace', + 'sourcePodLabels', + 'destinationPodNamespace', + 'destinationPodLabels', + 'destinationServicePortName', + 'protocolIdentifier', + 'flowType', +] + +MEANINGLESS_LABELS = [ + "pod-template-hash", + "controller-revision-hash", + "pod-template-generation", +] + -def calculate_ewma(diff_secs_throughput): +def calculate_ewma(throughput_list): """ The function calculates Exponential Weighted Moving Average (EWMA) for a given list of throughput values of a connection Args: - diff_secs_throughput: Column of a dataframe containing difference in - seconds between connection start and current flow, along with its - throughput as a tuple + throughput_list: Column of a dataframe containing throughput Returns: A list of EWMA values calculated for the set of throughput values for that specific connection. @@ -88,8 +113,8 @@ def calculate_ewma(diff_secs_throughput): alpha = 0.5 # Can be changed and passed as UDF value later. prev_ewma_val = 0.0 ewma_row = [] - for ele in diff_secs_throughput: - ele_float = float(ele[1]) + for ele in throughput_list: + ele_float = float(ele[0]) curr_ewma_val = (1 - alpha) * prev_ewma_val + alpha * ele_float prev_ewma_val = curr_ewma_val ewma_row.append(float(curr_ewma_val)) @@ -113,8 +138,8 @@ def calculate_ewma_anomaly(dataframe): of that connection is anomalous or not. 
""" stddev = dataframe[7] - ewma_arr = dataframe[9] - throughput_arr = dataframe[10] + ewma_arr = dataframe[10] + throughput_arr = dataframe[11] anomaly_result = [] if ewma_arr is None: @@ -146,7 +171,7 @@ def calculate_ewma_anomaly(dataframe): return anomaly_result -def calculate_arima(diff_secs_throughput): +def calculate_arima(throughputs): """ The function calculates AutoRegressive Integrated Moving Average (ARIMA) for a given list of throughput values of a connection @@ -154,17 +179,15 @@ def calculate_arima(diff_secs_throughput): prediction, any connection with less than 3 flow records will not be taken into account for calculation. We return empty value in that case. Args: - diff_secs_throughput: Column of a dataframe containing difference - in seconds between connection start and current flow, - along with its throughput as a tuple + throughputs: Column of a dataframe containing throughput Returns: A list of ARIMA values calculated for the set of throughput values for that specific connection. """ throughput_list = [] - for ele in diff_secs_throughput: - throughput_list.append(float(ele[1])) + for ele in throughputs: + throughput_list.append(float(ele[0])) if len(throughput_list) <= 3: logger.error("Error: Too Few throughput values for ARIMA to work with") return None @@ -216,8 +239,8 @@ def calculate_arima_anomaly(dataframe): """ stddev = dataframe[7] - arima_arr = dataframe[9] - throughput_arr = dataframe[10] + arima_arr = dataframe[10] + throughput_arr = dataframe[11] anomaly_result = [] if arima_arr is None: @@ -247,7 +270,7 @@ def calculate_arima_anomaly(dataframe): return anomaly_result -def calculate_dbscan(diff_secs_throughput): +def calculate_dbscan(throughput_list): """ The function is a placeholder function as anomaly detection with DBSCAN only inputs the throughput values. 
However, in order to maintain @@ -255,7 +278,7 @@ def calculate_dbscan(diff_secs_throughput): """ # Currently just a placeholder function placeholder_throughput_list = [] - for i in range(len(diff_secs_throughput)): + for i in range(len(throughput_list)): placeholder_throughput_list.append(0.0) return placeholder_throughput_list @@ -275,7 +298,7 @@ def calculate_dbscan_anomaly(dataframe): of the connection is anomalous or not based on DBSCAN """ - throughput_list = dataframe[10] + throughput_list = dataframe[11] anomaly_result = [] np_throughput_list = np.array(throughput_list) np_throughput_list = np_throughput_list.reshape(-1, 1) @@ -289,22 +312,45 @@ def calculate_dbscan_anomaly(dataframe): return anomaly_result -def filter_df_with_true_anomalies(spark, plotDF, algo_type): - plotDF = plotDF.withColumn( - "new", f.arrays_zip( - "flowEndSeconds", "algoCalc", "throughputs", - "anomaly")).withColumn( - "new", f.explode("new")).select( - "sourceIP", "sourceTransportPort", "destinationIP", - "destinationTransportPort", - "protocolIdentifier", "flowStartSeconds", - f.col("new.flowEndSeconds").alias("flowEndSeconds"), - "throughputStandardDeviation", "algoType", - f.col("new.algoCalc").alias("algoCalc"), - f.col("new.throughputs").alias("throughput"), - f.col("new.anomaly").alias("anomaly")) +def filter_df_with_true_anomalies(spark, plotDF, algo_type, agg_flow=None): + if agg_flow: + plotDF = plotDF.withColumn( + "new", f.arrays_zip( + "flowEndSeconds", "algoCalc", "throughputs", + "anomaly")).withColumn( + "new", f.explode("new")).select( + "sourcePodNamespace", "sourcePodLabels", "destinationPodNamespace", + "destinationPodLabels", + "destinationServicePortName", "protocolIdentifier", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + else: + plotDF = plotDF.withColumn( + "new", f.arrays_zip( + "flowEndSeconds", "algoCalc", "throughputs", + "anomaly")).withColumn( + "new", f.explode("new")).select( + "sourceIP", "sourceTransportPort", "destinationIP", + "destinationTransportPort", + "protocolIdentifier", "flowStartSeconds", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) ret_plot = plotDF.where(~plotDF.anomaly.isin([False])) if ret_plot.count() == 0: + ret_plot = ret_plot.collect() + agg_type = "e2e" + if agg_flow == "pod": + agg_type = "pod_to_external" + elif agg_flow == "pod2pod": + agg_type = "pod_to_pod" + elif agg_flow == "pod2svc": + agg_type = "pod_to_svc" ret_plot.append({ "sourceIP": 'None', "sourceTransportPort": 0, @@ -312,8 +358,14 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type): "destinationTransportPort": 0, "protocolIdentifier": 0, "flowStartSeconds": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "sourcePodNamespace": 'None', + "sourcePodLabels": 'None', + "destinationPodNamespace": 'None', + "destinationPodLabels": 'None', + "destinationServicePortName": 'None', "flowEndSeconds": 0, "throughputStandardDeviation": 0, + "aggType": agg_type, "algoType": algo_type, "algoCalc": 0.0, "throughput": 0.0, @@ -323,10 +375,10 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type): def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, - tad_id_input): + 
tad_id_input, agg_flow=None): # Insert the Algo currently in use init_plot_df = init_plot_df.withColumn('algoType', f.lit(algo_type)) - # Common schema List + # Endpoint to Endpoint schema List common_schema_list = [ StructField('sourceIP', StringType(), True), StructField('sourceTransportPort', LongType(), True), @@ -338,34 +390,46 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, StructField('throughputStandardDeviation', DoubleType(), True) ] + # Aggregated Schema List + agg_schema_list = [ + StructField('sourcePodNamespace', StringType(), True), + StructField('sourcePodLabels', StringType(), True), + StructField('destinationPodNamespace', StringType(), True), + StructField('destinationPodLabels', StringType(), True), + StructField('destinationServicePortName', StringType(), True), + StructField('protocolIdentifier', LongType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + schema_list = agg_schema_list if agg_flow else common_schema_list + # Calculate anomaly Values on the DF algo_func_rdd = init_plot_df.rdd.map( lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], algo_func(x[8]))) + x[6], x[7], x[8], x[9], x[10], algo_func(x[8]))) # Schema for the Dataframe to be created from the RDD - algo_func_rdd_Schema = StructType(common_schema_list + [ - StructField('Diff_Secs, Throughput', ArrayType(StructType([ - StructField("Diff_Secs", LongType(), True), - StructField("max(throughput)", DecimalType(38, 18), True) - ]))), + algo_func_rdd_Schema = StructType(schema_list + [StructField( + 'Throughput', ArrayType(StructType( + [StructField("max(throughput)", DecimalType(38, 18), True)]))), + StructField('aggType', StringType(), True), StructField('algoType', StringType(), True), - StructField('algoCalc', ArrayType(DoubleType(), True)) - ]) + StructField('algoCalc', ArrayType(DoubleType(), True))]) algo_DF = spark.createDataFrame(algo_func_rdd, algo_func_rdd_Schema) algo_DF = algo_DF.withColumn("throughputs", f.col( - "Diff_Secs, Throughput.max(throughput)").cast( - ArrayType(DecimalType(38, 18)))).drop("Diff_Secs, Throughput") + "Throughput.max(throughput)").cast( + ArrayType(DecimalType(38, 18)))).drop("Throughput") # DF to RDD to calculate anomaly using EWMA, ARIMA and DBSCAN anomaly_func_rdd = algo_DF.rdd.map( lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], x[10], + x[6], x[7], x[8], x[9], x[10], x[11], anomaly_func(x))) # Schema for the Dataframe to be created from the RDD - anomaly_func_rdd_Schema = StructType(common_schema_list + [ + anomaly_func_rdd_Schema = StructType(schema_list + [ + StructField('aggType', StringType(), True), StructField('algoType', StringType(), True), StructField('algoCalc', ArrayType(DoubleType(), True)), StructField('throughputs', ArrayType(DecimalType(38, 18), True)), @@ -373,7 +437,8 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, ]) anomalyDF = spark.createDataFrame(anomaly_func_rdd, anomaly_func_rdd_Schema) - ret_plotDF = filter_df_with_true_anomalies(spark, anomalyDF, algo_type) + ret_plotDF = filter_df_with_true_anomalies(spark, anomalyDF, algo_type, + agg_flow) # Write anomalous records to DB/CSV - Module WIP # Module to write to CSV. Optional. 
ret_plotDF = ret_plotDF.withColumn( @@ -383,9 +448,11 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, return ret_plotDF -def generate_tad_sql_query(start_time, end_time, ns_ignore_list): - sql_query = ("SELECT {} FROM {} " - .format(", ".join(FLOW_TABLE_COLUMNS), table_name)) +def generate_tad_sql_query(start_time, end_time, ns_ignore_list, + agg_flow=None, pod2pod_label=None): + sql_query = ("SELECT {} FROM {} ".format( + ", ".join(AGG_FLOW_TABLE_COLUMNS if agg_flow else FLOW_TABLE_COLUMNS), + table_name)) sql_query_extension = [] if ns_ignore_list: sql_query_extension.append( @@ -397,17 +464,67 @@ def generate_tad_sql_query(start_time, end_time, ns_ignore_list): "flowStartSeconds >= '{}'".format(start_time)) if end_time: sql_query_extension.append("flowEndSeconds < '{}'".format(end_time)) + if agg_flow: + if agg_flow == "pod": + sql_query_extension.append("flowType = 3") + elif agg_flow == "pod2pod": + if pod2pod_label: + sql_query_extension.append( + "ilike(destinationPodLabels, '%{0}%') AND ilike(" + "sourcePodLabels, '%{0}%')".format(pod2pod_label)) + else: + sql_query_extension.append("destinationPodLabels <> '' AND " + "sourcePodLabels <> ''") + elif agg_flow == "pod2svc": + sql_query_extension.append("destinationServicePortName <> ''") + if sql_query_extension: sql_query += "WHERE " + " AND ".join(sql_query_extension) + " " sql_query += "GROUP BY {} ".format( - ", ".join(DF_GROUP_COLUMNS + ["flowEndSeconds"])) + ", ".join(DF_AGG_GRP_COLUMNS + [ + "flowEndSeconds"] if agg_flow else DF_GROUP_COLUMNS + [ + "flowEndSeconds"])) return sql_query +def assign_flow_type(prepared_DF, agg_flow=None): + if agg_flow == "pod": + prepared_DF = prepared_DF.withColumn('aggType', f.lit( + "pod_to_external")) + prepared_DF = prepared_DF.drop('flowType') + elif agg_flow == "pod2svc": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod_to_svc")) + prepared_DF = prepared_DF.drop('flowType') + elif agg_flow == "pod2pod": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod_to_pod")) + prepared_DF = prepared_DF.drop('flowType') + else: + prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e")) + return prepared_DF + + +def remove_meaningless_labels(podLabels): + try: + labels_dict = json.loads(podLabels) + except Exception as e: + logger.error( + "Error {}: labels {} are not in json format".format(e, podLabels) + ) + return "" + labels_dict = { + key: value + for key, value in labels_dict.items() + if key not in MEANINGLESS_LABELS + } + return json.dumps(labels_dict, sort_keys=True) + + def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, - tad_id_input, ns_ignore_list): + tad_id_input, ns_ignore_list, agg_flow=None, + pod2pod_label=None): spark = SparkSession.builder.getOrCreate() - sql_query = generate_tad_sql_query(start_time, end_time, ns_ignore_list) + sql_query = generate_tad_sql_query(start_time, end_time, ns_ignore_list, + agg_flow, pod2pod_label) initDF = ( spark.read.format("jdbc").option( 'driver', "ru.yandex.clickhouse.ClickHouseDriver").option( @@ -416,23 +533,43 @@ def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, "password", os.getenv("CH_PASSWORD")).option( "query", sql_query).load() ) + group_columns = DF_AGG_GRP_COLUMNS if agg_flow else DF_GROUP_COLUMNS - prepared_DF = initDF.groupby(DF_GROUP_COLUMNS).agg( + prepared_DF = initDF.groupby(group_columns).agg( f.collect_list("flowEndSeconds").alias("flowEndSeconds"), f.stddev_samp("max(throughput)").alias("throughputStandardDeviation"), - 
@@ -416,23 +533,43 @@ def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time,
             "password", os.getenv("CH_PASSWORD")).option(
             "query", sql_query).load()
     )
+    group_columns = DF_AGG_GRP_COLUMNS if agg_flow else DF_GROUP_COLUMNS

-    prepared_DF = initDF.groupby(DF_GROUP_COLUMNS).agg(
+    prepared_DF = initDF.groupby(group_columns).agg(
         f.collect_list("flowEndSeconds").alias("flowEndSeconds"),
         f.stddev_samp("max(throughput)").alias("throughputStandardDeviation"),
-        f.collect_list(f.struct(["Diff_Secs", "max(throughput)"])).alias(
-            "Diff_Secs, Throughput"))
+        f.collect_list(f.struct(["max(throughput)"])).alias("Throughput"))
+
+    prepared_DF = assign_flow_type(prepared_DF, agg_flow)
+    if agg_flow:
+        prepared_DF = (
+            prepared_DF.withColumn(
+                "sourcePodLabels",
+                f.udf(remove_meaningless_labels, StringType())(
+                    "sourcePodLabels"
+                ),
+            )
+            .withColumn(
+                "destinationPodLabels",
+                f.udf(remove_meaningless_labels, StringType())(
+                    "destinationPodLabels"
+                ),
+            )
+            .dropDuplicates(["sourcePodLabels", "destinationPodLabels"])
+        )

     if algo_type == "EWMA":
         ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_ewma,
-                                calculate_ewma_anomaly, tad_id_input)
+                                calculate_ewma_anomaly, tad_id_input, agg_flow)
     elif algo_type == "ARIMA":
         ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_arima,
-                                calculate_arima_anomaly, tad_id_input)
+                                calculate_arima_anomaly, tad_id_input,
+                                agg_flow)
     elif algo_type == "DBSCAN":
         ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_dbscan,
-                                calculate_dbscan_anomaly, tad_id_input)
+                                calculate_dbscan_anomaly, tad_id_input,
+                                agg_flow)
     return spark, ret_plot
@@ -461,6 +598,8 @@ def main():
     end_time = ""
     tad_id_input = None
     ns_ignore_list = []
+    agg_flow = ""
+    pod2pod_label = ""
     help_message = """
     Start the Throughput Anomaly Detection spark job.
     Options:
@@ -484,13 +623,16 @@ def main():
         If not specified, it will be generated automatically.
     -n, --ns_ignore_list=[]: List of namespaces to ignore in anomaly
         calculation.
+    -f, --agg_flow=None: Aggregated flow type for TAD: pod, pod2pod or
+        pod2svc.
+    -l, --pod2pod_label=None: Pod label filter for pod2pod aggregated TAD.
     """
     # TODO: change to use argparse instead of getopt for options
     try:
         opts, _ = getopt.getopt(
             sys.argv[1:],
-            "ht:d:s:e:i:n:",
+            "ht:d:s:e:i:n:f:l:",
             [
                 "help",
                 "algo=",
@@ -498,7 +640,9 @@ def main():
                 "start_time=",
                 "end_time=",
                 "id=",
-                "ns_ignore_list="
+                "ns_ignore_list=",
+                "agg_flow=",
+                "pod2pod_label=",
             ],
         )
     except getopt.GetoptError as e:
@@ -559,6 +703,10 @@ def main():
             ns_ignore_list = arg_list
         elif opt in ("-i", "--id"):
             tad_id_input = arg
+        elif opt in ("-f", "--agg_flow"):
+            agg_flow = arg
+        elif opt in ("-l", "--pod2pod_label"):
+            pod2pod_label = arg

     func_start_time = time.time()
     logger.info("Script started at {}".format(
@@ -569,7 +717,9 @@ def main():
         start_time,
         end_time,
         tad_id_input,
-        ns_ignore_list
+        ns_ignore_list,
+        agg_flow,
+        pod2pod_label,
     )
     func_end_time = time.time()
     tad_id = write_anomaly_detection_result(
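Usage sketch (assumes the script is launched directly; the database and Spark submission options are omitted and the label value is only an example):

    python3 anomaly_detection.py --algo ARIMA --agg_flow pod2pod \
        --pod2pod_label app:my-app

--agg_flow accepts pod, pod2pod or pod2svc, matching the branches in generate_tad_sql_query; --pod2pod_label is only meaningful together with --agg_flow pod2pod.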
"SELECT {} FROM {} WHERE " - "flowStartSeconds >= '2022-01-01 00:00:00' " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("2022-01-01 00:00:00", "", [], "", ""), + "SELECT {} FROM {} WHERE " + "flowStartSeconds >= '2022-01-01 00:00:00' " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("", "2022-01-01 23:59:59", []), - "SELECT {} FROM {} WHERE " - "flowEndSeconds < '2022-01-01 23:59:59' GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("", "2022-01-01 23:59:59", [], "", ""), + "SELECT {} FROM {} WHERE " + "flowEndSeconds < '2022-01-01 23:59:59' GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("2022-01-01 00:00:00", "2022-01-01 23:59:59", []), - "SELECT {} FROM {} WHERE " - "flowStartSeconds >= '2022-01-01 00:00:00' AND " - "flowEndSeconds < '2022-01-01 23:59:59' " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("2022-01-01 00:00:00", "2022-01-01 23:59:59", [], "", ""), + "SELECT {} FROM {} WHERE " + "flowStartSeconds >= '2022-01-01 00:00:00' AND " + "flowEndSeconds < '2022-01-01 23:59:59' " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("", "", ["mock_ns", "mock_ns2"]), - "SELECT {} FROM {} WHERE " - "sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND " - "destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2') " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("", "", ["mock_ns", "mock_ns2"], "", ""), + "SELECT {} FROM {} WHERE " + "sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND " + "destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2') " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), + ), + ( + ("", "", [], "pod", ""), + "SELECT {} FROM {} WHERE " + "flowType = 3 " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "pod2pod", ""), + "SELECT {} FROM {} WHERE " + "destinationPodLabels <> '' AND sourcePodLabels <> '' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "pod2pod", "app:label"), + "SELECT {} FROM {} WHERE " + "ilike(destinationPodLabels, '%app:label%') " + "AND ilike(sourcePodLabels, '%app:label%') " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "pod2svc", ""), + "SELECT {} FROM {} WHERE " + "destinationServicePortName <> '' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), + ) ), ], ) def test_generate_sql_query(test_input, expected_sql_query): - start_time, end_time, ns_ignore_list = test_input - sql_query = ad.generate_tad_sql_query(start_time, end_time, ns_ignore_list) + start_time, end_time, ns_ignore_list, agg_flow, 
diff --git a/test/e2e/throughputanomalydetection_test.go b/test/e2e/throughputanomalydetection_test.go
index a7c1006c..67e124cd 100644
--- a/test/e2e/throughputanomalydetection_test.go
+++ b/test/e2e/throughputanomalydetection_test.go
@@ -211,14 +211,14 @@ func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB)
 		if resultArray[i] != "" {
 			resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ")
 			tadoutputArray := strings.Fields(resultArray[i])
-			anomaly_output := tadoutputArray[9]
-			assert.Equal(10, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray)
+			anomaly_output := tadoutputArray[11]
+			assert.Equal(12, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray)
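+			// The retrieve output now carries 12 columns per row: the anomaly
+			// flag is the last field and the throughput value used as the
+			// lookup key below sits just before it.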
 			switch algo {
 			case "ARIMA":
-				algo_throughput := tadoutputArray[8][:5]
+				algo_throughput := tadoutputArray[10][:5]
 				assert.Equal(result_map["ARIMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray)
 			case "EWMA":
-				algo_throughput := tadoutputArray[8][:5]
+				algo_throughput := tadoutputArray[10][:5]
 				assert.Equal(result_map["EWMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray)
 			case "DBSCAN":
 				throughput := tadoutputArray[7][:5]