diff --git a/build/charts/theia/crds/anomaly-detector-crd.yaml b/build/charts/theia/crds/anomaly-detector-crd.yaml index 1c895af6..b0c3babe 100644 --- a/build/charts/theia/crds/anomaly-detector-crd.yaml +++ b/build/charts/theia/crds/anomaly-detector-crd.yaml @@ -33,6 +33,18 @@ spec: type: array items: type: string + aggFlow: + type: string + podLabel: + type: string + externalIp: + type: string + podName: + type: string + podNameSpace: + type: string + servicePortName: + type: string executorInstances: type: integer driverCoreRequest: diff --git a/build/charts/theia/provisioning/datasources/create_table.sh b/build/charts/theia/provisioning/datasources/create_table.sh index 5d905c65..f8efb08f 100644 --- a/build/charts/theia/provisioning/datasources/create_table.sh +++ b/build/charts/theia/provisioning/datasources/create_table.sh @@ -367,8 +367,14 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, + podNamespace String, + podLabels String, + podName String, + destinationServicePortName String, + direction String, flowEndSeconds DateTime, throughputStandardDeviation Float64, + aggType String, algoType String, algoCalc Float64, throughput Float64, diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.down.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.down.sql index bdaa7022..a378c976 100644 --- a/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.down.sql +++ b/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.down.sql @@ -198,3 +198,17 @@ ALTER TABLE flows ALTER TABLE flows_local DROP COLUMN egressName, DROP COLUMN egressIP; +ALTER TABLE tadetector + DROP COLUMN podNamespace, + DROP COLUMN podLabels, + DROP COLUMN destinationServicePortName, + DROP COLUMN aggType, + DROP COLUMN direction, + DROP COLUMN podName; +ALTER TABLE tadetector_local + DROP COLUMN podNamespace, + DROP COLUMN podLabels, + DROP COLUMN destinationServicePortName, + DROP COLUMN aggType, + DROP COLUMN direction, + DROP COLUMN podName; diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.up.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.up.sql index 28295388..b5ac3bcf 100644 --- a/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.up.sql +++ b/build/charts/theia/provisioning/datasources/migrators/000005_0-6-0.up.sql @@ -137,3 +137,17 @@ ALTER TABLE flows ALTER TABLE flows_local ADD COLUMN egressName String, ADD COLUMN egressIP String; +ALTER TABLE tadetector + ADD COLUMN podNamespace String, + ADD COLUMN podLabels String, + ADD COLUMN destinationServicePortName String, + ADD COLUMN aggType String, + ADD COLUMN direction String, + ADD COLUMN podName String; +ALTER TABLE tadetector_local + ADD COLUMN podNamespace String, + ADD COLUMN podLabels String, + ADD COLUMN destinationServicePortName String, + ADD COLUMN aggType String, + ADD COLUMN direction String, + ADD COLUMN podName String; diff --git a/build/yamls/flow-visibility.yml b/build/yamls/flow-visibility.yml index 6b2a28d2..61b0fcea 100644 --- a/build/yamls/flow-visibility.yml +++ b/build/yamls/flow-visibility.yml @@ -589,6 +589,20 @@ data: ALTER TABLE flows_local DROP COLUMN egressName, DROP COLUMN egressIP; + ALTER TABLE tadetector + DROP COLUMN podNamespace, + DROP COLUMN podLabels, + DROP COLUMN destinationServicePortName, + DROP COLUMN aggType, + DROP COLUMN direction, + DROP COLUMN podName; + ALTER TABLE tadetector_local + DROP 
COLUMN podNamespace, + DROP COLUMN podLabels, + DROP COLUMN destinationServicePortName, + DROP COLUMN aggType, + DROP COLUMN direction, + DROP COLUMN podName; 000005_0-6-0.up.sql: | -- Create underlying tables for Materialized Views to attach data CREATE TABLE IF NOT EXISTS pod_view_table_local ( @@ -729,6 +743,20 @@ data: ALTER TABLE flows_local ADD COLUMN egressName String, ADD COLUMN egressIP String; + ALTER TABLE tadetector + ADD COLUMN podNamespace String, + ADD COLUMN podLabels String, + ADD COLUMN destinationServicePortName String, + ADD COLUMN aggType String, + ADD COLUMN direction String, + ADD COLUMN podName String; + ALTER TABLE tadetector_local + ADD COLUMN podNamespace String, + ADD COLUMN podLabels String, + ADD COLUMN destinationServicePortName String, + ADD COLUMN aggType String, + ADD COLUMN direction String, + ADD COLUMN podName String; create_table.sh: | #!/usr/bin/env bash @@ -1090,8 +1118,14 @@ data: destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, + podNamespace String, + podLabels String, + podName String, + destinationServicePortName String, + direction String, flowEndSeconds DateTime, throughputStandardDeviation Float64, + aggType String, algoType String, algoCalc Float64, throughput Float64, diff --git a/ci/jenkins/test-vmc.sh b/ci/jenkins/test-vmc.sh index 51871073..07ef7a6a 100644 --- a/ci/jenkins/test-vmc.sh +++ b/ci/jenkins/test-vmc.sh @@ -313,7 +313,7 @@ function deliver_antrea { antrea_yml="antrea.yml" # Enable verbose log for troubleshooting. sed -i "s/--v=0/--v=4/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml - perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $TMP_DIR/antrea.yml + perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $GIT_CHECKOUT_DIR/build/yamls/antrea.yml sed -i -e "s/flowPollInterval: \"5s\"/flowPollInterval: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml sed -i -e "s/activeFlowExportTimeout: \"5s\"/activeFlowExportTimeout: \"2s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml sed -i -e "s/idleFlowExportTimeout: \"15s\"/idleFlowExportTimeout: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml diff --git a/ci/kind/test-upgrade-theia.sh b/ci/kind/test-upgrade-theia.sh index 8e48e94d..65f7fd2a 100755 --- a/ci/kind/test-upgrade-theia.sh +++ b/ci/kind/test-upgrade-theia.sh @@ -255,7 +255,7 @@ popd rm -rf $TMP_THEIA_DIR rc=0 -go test -v -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$? +go test -v -timeout=15m -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$? 
$THIS_DIR/kind-setup.sh destroy kind diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index ff5ca0ce..b88a2c50 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -98,6 +98,12 @@ type ThroughputAnomalyDetectorSpec struct { StartInterval metav1.Time `json:"startInterval,omitempty"` EndInterval metav1.Time `json:"endInterval,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + PodLabel string `json:"podLabel,omitempty"` + PodName string `json:"podName,omitempty"` + PodNameSpace string `json:"podNameSpace,omitempty"` + ExternalIP string `json:"externalIp,omitempty"` + ServicePortName string `json:"servicePortName,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` diff --git a/pkg/apis/intelligence/v1alpha1/types.go b/pkg/apis/intelligence/v1alpha1/types.go index c86045dc..a147b4bf 100644 --- a/pkg/apis/intelligence/v1alpha1/types.go +++ b/pkg/apis/intelligence/v1alpha1/types.go @@ -73,6 +73,12 @@ type ThroughputAnomalyDetector struct { EndInterval metav1.Time `json:"endInterval,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + PodLabel string `json:"podLabel,omitempty"` + PodName string `json:"podName,omitempty"` + PodNameSpace string `json:"podNameSpace,omitempty"` + ExternalIP string `json:"externalIp,omitempty"` + ServicePortName string `json:"servicePortName,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"` @@ -100,14 +106,21 @@ type ThroughputAnomalyDetectorList struct { } type ThroughputAnomalyDetectorStats struct { - Id string `json:"id,omitempty"` - SourceIP string `json:"sourceIP,omitempty"` - SourceTransportPort string `json:"sourceTransportPort,omitempty"` - DestinationIP string `json:"destinationIP,omitempty"` - DestinationTransportPort string `json:"destinationTransportPort,omitempty"` - FlowStartSeconds string `json:"FlowStartSeconds,omitempty"` - FlowEndSeconds string `json:"FlowEndSeconds,omitempty"` - Throughput string `json:"Throughput,omitempty"` - AlgoCalc string `json:"AlgoCalc,omitempty"` - Anomaly string `json:"anomaly,omitempty"` + Id string `json:"id,omitempty"` + SourceIP string `json:"sourceIP,omitempty"` + SourceTransportPort string `json:"sourceTransportPort,omitempty"` + DestinationIP string `json:"destinationIP,omitempty"` + DestinationTransportPort string `json:"destinationTransportPort,omitempty"` + FlowStartSeconds string `json:"FlowStartSeconds,omitempty"` + PodNamespace string `json:"podNamespace,omitempty"` + PodLabels string `json:"podLabels,omitempty"` + PodName string `json:"podName,omitempty"` + Direction string `json:"direction,omitempty"` + DestinationServicePortName string `json:"destinationServicePortName,omitempty"` + FlowEndSeconds string `json:"FlowEndSeconds,omitempty"` + Throughput string `json:"throughput,omitempty"` + AggType string `json:"aggType,omitempty"` + AlgoType string `json:"algoType,omitempty"` + AlgoCalc string `json:"AlgoCalc,omitempty"` + Anomaly string `json:"anomaly,omitempty"` } diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go 
b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go index 96fbc3f2..345e76d8 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go @@ -34,6 +34,10 @@ import ( const ( defaultNameSpace = "flow-visibility" tadQuery int = iota + aggTadExternalQuery + aggTadPodLabelQuery + aggTadPodNameQuery + aggTadSvcQuery ) // REST implements rest.Storage for anomalydetector. @@ -54,7 +58,7 @@ var ( var queryMap = map[int]string{ tadQuery: ` - SELECT + SELECT id, sourceIP, sourceTransportPort, @@ -63,6 +67,56 @@ var queryMap = map[int]string{ flowStartSeconds, flowEndSeconds, throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggTadExternalQuery: ` + SELECT + id, + destinationIP, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggTadPodLabelQuery: ` + SELECT + id, + podNamespace, + podLabels, + direction, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggTadPodNameQuery: ` + SELECT + id, + podNamespace, + podName, + direction, + flowEndSeconds, + throughput, + aggType, + algoType, + algoCalc, + anomaly + FROM tadetector WHERE id = (?);`, + aggTadSvcQuery: ` + SELECT + id, + destinationServicePortName, + flowEndSeconds, + throughput, + aggType, + algoType, algoCalc, anomaly FROM tadetector WHERE id = (?);`, @@ -102,6 +156,12 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete tad.EndInterval = crd.Spec.EndInterval tad.ExecutorInstances = crd.Spec.ExecutorInstances tad.NSIgnoreList = crd.Spec.NSIgnoreList + tad.AggregatedFlow = crd.Spec.AggregatedFlow + tad.PodLabel = crd.Spec.PodLabel + tad.PodName = crd.Spec.PodName + tad.PodNameSpace = crd.Spec.PodNameSpace + tad.ExternalIP = crd.Spec.ExternalIP + tad.ServicePortName = crd.Spec.ServicePortName tad.DriverCoreRequest = crd.Spec.DriverCoreRequest tad.DriverMemory = crd.Spec.DriverMemory tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest @@ -170,6 +230,12 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation job.Spec.DriverMemory = newTAD.DriverMemory job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest job.Spec.ExecutorMemory = newTAD.ExecutorMemory + job.Spec.AggregatedFlow = newTAD.AggregatedFlow + job.Spec.PodLabel = newTAD.PodLabel + job.Spec.PodName = newTAD.PodName + job.Spec.PodNameSpace = newTAD.PodNameSpace + job.Spec.ExternalIP = newTAD.ExternalIP + job.Spec.ServicePortName = newTAD.ServicePortName _, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job) if err != nil { return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err)) @@ -179,24 +245,68 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDetector) error { var err error + query := tadQuery + switch tad.AggregatedFlow { + case "external": + query = aggTadExternalQuery + case "pod": + if tad.PodName != "" { + query = aggTadPodNameQuery + } else { + query = aggTadPodLabelQuery + } + case "svc": + query = aggTadSvcQuery + } if r.clickhouseConnect == nil { r.clickhouseConnect, err = setupClickHouseConnection(nil) if err != nil { return err } } - rows, err := r.clickhouseConnect.Query(queryMap[tadQuery], id) + rows, 
err := r.clickhouseConnect.Query(queryMap[query], id) if err != nil { return fmt.Errorf("failed to get Throughput Anomaly Detector results with id %s: %v", id, err) } defer rows.Close() for rows.Next() { - res := v1alpha1.ThroughputAnomalyDetectorStats{} - err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AlgoCalc, &res.Anomaly) - if err != nil { - return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) + switch query { + case tadQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggTadExternalQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggTadPodLabelQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggTadPodNameQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodName, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) + case aggTadSvcQuery: + res := v1alpha1.ThroughputAnomalyDetectorStats{} + err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + if err != nil { + return fmt.Errorf("failed to scan Throughput Anomaly Detector Service Aggregate results: %v", err) + } + tad.Stats = append(tad.Stats, res) } - tad.Stats = append(tad.Stats, res) } return nil } diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go index 69f72c19..c434185b 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "fmt" + "regexp" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -29,7 +30,7 @@ import ( "k8s.io/client-go/kubernetes" crdv1alpha1 "antrea.io/theia/pkg/apis/crd/v1alpha1" - anomalydetector "antrea.io/theia/pkg/apis/intelligence/v1alpha1" + "antrea.io/theia/pkg/apis/intelligence/v1alpha1" ) type fakeQuerier struct{} @@ -40,24 +41,24 @@ func TestREST_Get(t *testing.T) { name string tadName string expectErr error - expectResult 
*anomalydetector.ThroughputAnomalyDetector + expectResult *v1alpha1.ThroughputAnomalyDetector }{ { name: "Not Found case", tadName: "non-existent-tad", - expectErr: errors.NewNotFound(anomalydetector.Resource("throughputanomalydetectors"), "non-existent-tad"), + expectErr: errors.NewNotFound(v1alpha1.Resource("throughputanomalydetectors"), "non-existent-tad"), expectResult: nil, }, { name: "Successful Get case", - tadName: "tad-2", + tadName: "tad-1", expectErr: nil, - expectResult: &anomalydetector.ThroughputAnomalyDetector{ + expectResult: &v1alpha1.ThroughputAnomalyDetector{ Type: "TAD", - Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + Status: v1alpha1.ThroughputAnomalyDetectorStatus{ State: crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, }, - Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ Id: "mock_Id", SourceIP: "mock_SourceIP", SourceTransportPort: "mock_SourceTransportPort", @@ -66,6 +67,8 @@ func TestREST_Get(t *testing.T) { FlowStartSeconds: "mock_FlowStartSeconds", FlowEndSeconds: "mock_FlowEndSeconds", Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", AlgoCalc: "mock_AlgoCalc", Anomaly: "mock_Anomaly", }}, @@ -75,9 +78,9 @@ func TestREST_Get(t *testing.T) { name: "Unsuccessful Get case query error", tadName: "tad-2", expectErr: nil, - expectResult: &anomalydetector.ThroughputAnomalyDetector{ + expectResult: &v1alpha1.ThroughputAnomalyDetector{ Type: "TAD", - Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + Status: v1alpha1.ThroughputAnomalyDetectorStatus{ State: crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, ErrorMsg: "Failed to get the result for completed Throughput Anomaly Detector with id , error: failed to get Throughput Anomaly Detector results with id : error in database, please retry", }, @@ -87,11 +90,11 @@ func TestREST_Get(t *testing.T) { name: "Unsuccessful Get case rows error", tadName: "tad-2", expectErr: nil, - expectResult: &anomalydetector.ThroughputAnomalyDetector{ + expectResult: &v1alpha1.ThroughputAnomalyDetector{ Type: "TAD", - Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + Status: v1alpha1.ThroughputAnomalyDetectorStatus{ State: crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, - ErrorMsg: "Failed to get the result for completed Throughput Anomaly Detector with id , error: failed to scan Throughput Anomaly Detector results: sql: expected 1 destination arguments in Scan, not 10", + ErrorMsg: "Failed to get the result for completed Throughput Anomaly Detector with id , error: failed to scan Throughput Anomaly Detector results: sql: expected 1 destination arguments in Scan, not 12", }, }, }, @@ -104,8 +107,8 @@ func TestREST_Get(t *testing.T) { } defer db.Close() resultRows := sqlmock.NewRows([]string{ - "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AlgoCalc", "Anomaly"}). - AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AlgoCalc", "mock_Anomaly") + "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly") if tt.name == "Unsuccessful Get case query error" { mock.ExpectQuery(queryMap[tadQuery]).WillReturnError(fmt.Errorf("error in database, please retry")) } else if tt.name == "Unsuccessful Get case rows error" { @@ -121,7 +124,7 @@ func TestREST_Get(t *testing.T) { tad, err := r.Get(context.TODO(), tt.tadName, &v1.GetOptions{}) assert.Equal(t, err, tt.expectErr) if tad != nil { - assert.Equal(t, tt.expectResult, tad.(*anomalydetector.ThroughputAnomalyDetector)) + assert.Equal(t, tt.expectResult, tad.(*v1alpha1.ThroughputAnomalyDetector)) } else { assert.Nil(t, tt.expectResult) } @@ -171,7 +174,7 @@ func TestREST_Create(t *testing.T) { }, { name: "Job already exists case", - obj: &anomalydetector.ThroughputAnomalyDetector{ + obj: &v1alpha1.ThroughputAnomalyDetector{ TypeMeta: v1.TypeMeta{}, ObjectMeta: v1.ObjectMeta{Name: "existent-tad"}, }, @@ -180,7 +183,7 @@ func TestREST_Create(t *testing.T) { }, { name: "Successful Create case", - obj: &anomalydetector.ThroughputAnomalyDetector{ + obj: &v1alpha1.ThroughputAnomalyDetector{ TypeMeta: v1.TypeMeta{}, ObjectMeta: v1.ObjectMeta{Name: "non-existent-tad"}, }, @@ -201,11 +204,11 @@ func TestREST_Create(t *testing.T) { func TestREST_List(t *testing.T) { tests := []struct { name string - expectResult []anomalydetector.ThroughputAnomalyDetector + expectResult []v1alpha1.ThroughputAnomalyDetector }{ { name: "Successful List case", - expectResult: []anomalydetector.ThroughputAnomalyDetector{ + expectResult: []v1alpha1.ThroughputAnomalyDetector{ {ObjectMeta: v1.ObjectMeta{Name: "tad-1"}}, {ObjectMeta: v1.ObjectMeta{Name: "tad-2"}}, }, @@ -216,13 +219,165 @@ func TestREST_List(t *testing.T) { r := NewREST(&fakeQuerier{}) itemList, err := r.List(context.TODO(), &internalversion.ListOptions{}) assert.NoError(t, err) - tadList, ok := itemList.(*anomalydetector.ThroughputAnomalyDetectorList) + tadList, ok := itemList.(*v1alpha1.ThroughputAnomalyDetectorList) assert.True(t, ok) assert.ElementsMatch(t, tt.expectResult, tadList.Items) }) } } +func Test_getTadetectorResult(t *testing.T) { + tests := []struct { + name string + id string + query int + returnedRow *sqlmock.Rows + expectedResult *v1alpha1.ThroughputAnomalyDetector + expecterr error + }{ + { + name: "Get tadquery result", + id: "tad-1", + query: tadQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "SourceIP", "SourceTransportPort", "DestinationIP", "DestinationTransportPort", "FlowStartSeconds", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_SourceIP", "mock_SourceTransportPort", "mock_DestinationIP", "mock_DestinationTransportPort", "mock_FlowStartSeconds", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + SourceIP: "mock_SourceIP", + SourceTransportPort: "mock_SourceTransportPort", + DestinationIP: "mock_DestinationIP", + DestinationTransportPort: "mock_DestinationTransportPort", + FlowStartSeconds: "mock_FlowStartSeconds", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery external result", + id: "tad-2", + query: aggTadExternalQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "destinationIP", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_destinationIP", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + DestinationIP: "mock_destinationIP", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery pod podLabel result", + id: "tad-3", + query: aggTadPodLabelQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "PodNamespace", "PodLabels", "Direction", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_PodNamespace", "mock_PodLabels", "mock_Direction", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + PodNamespace: "mock_PodNamespace", + PodLabels: "mock_PodLabels", + Direction: "mock_Direction", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery pod podName result", + id: "tad-4", + query: aggTadPodNameQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "PodNamespace", "PodName", "Direction", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_PodNamespace", "mock_PodName", "mock_Direction", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + PodNamespace: "mock_PodNamespace", + PodName: "mock_PodName", + Direction: "mock_Direction", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + { + name: "Get aggtadquery svc result", + id: "tad-5", + query: aggTadSvcQuery, + returnedRow: sqlmock.NewRows([]string{ + "Id", "DestinationServicePortName", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_DestinationServicePortName", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + expectedResult: &v1alpha1.ThroughputAnomalyDetector{ + Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ + Id: "mock_Id", + DestinationServicePortName: "mock_DestinationServicePortName", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", + }}, + }, + expecterr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, mock, err := sqlmock.New() + assert.NoError(t, err) + mock.ExpectQuery(regexp.QuoteMeta(queryMap[tt.query])).WillReturnRows(tt.returnedRow) + setupClickHouseConnection = func(client kubernetes.Interface) (connect *sql.DB, err error) { + return db, nil + } + r := NewREST(&fakeQuerier{}) + var tad v1alpha1.ThroughputAnomalyDetector + switch tt.query { + case aggTadExternalQuery: + tad.AggregatedFlow = "external" + case aggTadPodLabelQuery: + tad.AggregatedFlow = "pod" + tad.PodLabel = "mock_PodLabels" + case aggTadPodNameQuery: + tad.AggregatedFlow = "pod" + tad.PodName = "mock_PodName" + case aggTadSvcQuery: + tad.AggregatedFlow = "svc" + } + err = r.getTADetectorResult(tt.id, &tad) + assert.Equal(t, tt.expecterr, err) + assert.Equal(t, tt.expectedResult.Stats, tad.Stats) + }) + } +} + func (c *fakeQuerier) GetThroughputAnomalyDetector(namespace, name string) (*crdv1alpha1.ThroughputAnomalyDetector, error) { if name == "non-existent-tad" { return nil, fmt.Errorf("not found") diff --git a/pkg/controller/anomalydetector/controller.go b/pkg/controller/anomalydetector/controller.go index aa3319bd..193518bf 100644 --- a/pkg/controller/anomalydetector/controller.go +++ b/pkg/controller/anomalydetector/controller.go @@ -525,7 +525,7 @@ func (c *AnomalyDetectorController) startJob(newTAD *crdv1alpha1.ThroughputAnoma func (c *AnomalyDetectorController) startSparkApplication(newTAD *crdv1alpha1.ThroughputAnomalyDetector) error { var newTADJobArgs []string if newTAD.Spec.JobType != "EWMA" && newTAD.Spec.JobType != "ARIMA" && newTAD.Spec.JobType != "DBSCAN" { - return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly DetectorQuerier type should be 'EWMA' or 'ARIMA' or 'DBSCAN'")} + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector algorithm type should be 'EWMA' or 'ARIMA' or 'DBSCAN'")} } newTADJobArgs = append(newTADJobArgs, "--algo", newTAD.Spec.JobType) @@ -543,7 +543,39 @@ func (c *AnomalyDetectorController) startSparkApplication(newTAD *crdv1alpha1.Th 
if len(newTAD.Spec.NSIgnoreList) > 0 { nsIgnoreListStr := strings.Join(newTAD.Spec.NSIgnoreList, "\",\"") nsIgnoreListStr = "[\"" + nsIgnoreListStr + "\"]" - newTADJobArgs = append(newTADJobArgs, "--ns_ignore_list", nsIgnoreListStr) + newTADJobArgs = append(newTADJobArgs, "--ns-ignore-list", nsIgnoreListStr) + } + + if newTAD.Spec.AggregatedFlow != "" { + switch newTAD.Spec.AggregatedFlow { + case "pod": + newTADJobArgs = append(newTADJobArgs, "--agg-flow", newTAD.Spec.AggregatedFlow) + if newTAD.Spec.PodLabel != "" { + newTADJobArgs = append(newTADJobArgs, "--pod-label", newTAD.Spec.PodLabel) + } + if newTAD.Spec.PodName != "" { + newTADJobArgs = append(newTADJobArgs, "--pod-name", newTAD.Spec.PodName) + } + if newTAD.Spec.PodNameSpace != "" { + if newTAD.Spec.PodName == "" && newTAD.Spec.PodLabel == "" { + return illeagelArguementError{fmt.Errorf("invalid request: 'pod-namespace' argument can not be used alone, should be specified along pod-label or pod-name")} + } else { + newTADJobArgs = append(newTADJobArgs, "--pod-namespace", newTAD.Spec.PodNameSpace) + } + } + case "external": + newTADJobArgs = append(newTADJobArgs, "--agg-flow", newTAD.Spec.AggregatedFlow) + if newTAD.Spec.ExternalIP != "" { + newTADJobArgs = append(newTADJobArgs, "--external-ip", newTAD.Spec.ExternalIP) + } + case "svc": + newTADJobArgs = append(newTADJobArgs, "--agg-flow", newTAD.Spec.AggregatedFlow) + if newTAD.Spec.ServicePortName != "" { + newTADJobArgs = append(newTADJobArgs, "--svc-port-name", newTAD.Spec.ServicePortName) + } + default: + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'external' or 'svc'")} + } } sparkResourceArgs := struct { diff --git a/pkg/controller/anomalydetector/controller_test.go b/pkg/controller/anomalydetector/controller_test.go index 9019c0da..df7d72c5 100644 --- a/pkg/controller/anomalydetector/controller_test.go +++ b/pkg/controller/anomalydetector/controller_test.go @@ -72,7 +72,7 @@ func newFakeController(t *testing.T) (*fakeController, *sql.DB) { tadController := NewAnomalyDetectorController(crdClient, kubeClient, taDetectorInformer) mock.ExpectQuery("SELECT DISTINCT id FROM tadetector;").WillReturnRows(sqlmock.NewRows([]string{})) - mock.ExpectExec("ALTER TABLE tadetector_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[3:]).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("ALTER TABLE tadetector_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[4:]).WillReturnResult(sqlmock.NewResult(0, 1)) return &fakeController{ tadController, crdClient, @@ -227,68 +227,100 @@ func TestTADetection(t *testing.T) { go tadController.Run(stopCh) - t.Run("NormalAnomalyDetector", func(t *testing.T) { - tad := &crdv1alpha1.ThroughputAnomalyDetector{ - ObjectMeta: metav1.ObjectMeta{Name: tadName, Namespace: testNamespace}, - Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ - JobType: "ARIMA", - ExecutorInstances: 1, - DriverCoreRequest: "200m", - DriverMemory: "512M", - ExecutorCoreRequest: "200m", - ExecutorMemory: "512M", - StartInterval: metav1.NewTime(time.Now()), - EndInterval: metav1.NewTime(time.Now().Add(time.Second * 100)), - NSIgnoreList: []string{"kube-system", "flow-visibility"}, + tadtestCases := []struct { + name string + tad *crdv1alpha1.ThroughputAnomalyDetector + }{ + { + name: "NormalAnomalyDetector agg_type external", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: tadName, Namespace: testNamespace}, + Spec: 
crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "200m", + ExecutorMemory: "512M", + StartInterval: metav1.NewTime(time.Now()), + EndInterval: metav1.NewTime(time.Now().Add(time.Second * 100)), + NSIgnoreList: []string{"kube-system", "flow-visibility"}, + AggregatedFlow: "external", + ExternalIP: "10.0.0.1", + }, + Status: crdv1alpha1.ThroughputAnomalyDetectorStatus{}, }, - Status: crdv1alpha1.ThroughputAnomalyDetectorStatus{}, - } - - tad, err := tadController.CreateThroughputAnomalyDetector(testNamespace, tad) - assert.NoError(t, err) + }, + { + name: "NormalAnomalyDetector agg_type svc", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: tadName, Namespace: testNamespace}, + Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + ExecutorInstances: 1, + DriverCoreRequest: "200m", + DriverMemory: "512M", + ExecutorCoreRequest: "200m", + ExecutorMemory: "512M", + StartInterval: metav1.NewTime(time.Now()), + EndInterval: metav1.NewTime(time.Now().Add(time.Second * 100)), + NSIgnoreList: []string{"kube-system", "flow-visibility"}, + AggregatedFlow: "svc", + ServicePortName: "TestServicePortName", + }, + Status: crdv1alpha1.ThroughputAnomalyDetectorStatus{}, + }, + }, + } + for _, tt := range tadtestCases { + t.Run(tt.name, func(t *testing.T) { + tad, err := tadController.CreateThroughputAnomalyDetector(testNamespace, tt.tad) + assert.NoError(t, err) - serviceCreated := false - // The step interval should be larger than resync period to ensure the progress is updated - stepInterval := 1 * time.Second - timeout := 30 * time.Second + serviceCreated := false + // The step interval should be larger than resync period to ensure the progress is updated + stepInterval := 1 * time.Second + timeout := 30 * time.Second - wait.PollImmediate(stepInterval, timeout, func() (done bool, err error) { - tad, err = tadController.GetThroughputAnomalyDetector(testNamespace, tadName) - if err != nil { - return false, nil - } - // Mocking Spark Monitor service requires the SparkApplication id. - if !serviceCreated { - // Create Spark Monitor service - err = createFakeSparkApplicationService(tadController.kubeClient, tad.Status.SparkApplication) - assert.NoError(t, err) - serviceCreated = true - } - if tad != nil { - fakeSAClient.step("tad-"+tad.Status.SparkApplication, testNamespace) - } - return tad.Status.State == crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, nil - }) + wait.PollImmediate(stepInterval, timeout, func() (done bool, err error) { + tad, err = tadController.GetThroughputAnomalyDetector(testNamespace, tadName) + if err != nil { + return false, nil + } + // Mocking Spark Monitor service requires the SparkApplication id. 
+ if !serviceCreated { + // Create Spark Monitor service + err = createFakeSparkApplicationService(tadController.kubeClient, tad.Status.SparkApplication) + assert.NoError(t, err) + serviceCreated = true + } + if tad != nil { + fakeSAClient.step("tad-"+tad.Status.SparkApplication, testNamespace) + } + return tad.Status.State == crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, nil + }) - assert.Equal(t, crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, tad.Status.State) - assert.Equal(t, 3, tad.Status.CompletedStages) - assert.Equal(t, 5, tad.Status.TotalStages) - assert.True(t, tad.Status.StartTime.Before(&tad.Status.EndTime)) + assert.Equal(t, crdv1alpha1.ThroughputAnomalyDetectorStateCompleted, tad.Status.State) + assert.Equal(t, 3, tad.Status.CompletedStages) + assert.Equal(t, 5, tad.Status.TotalStages) + assert.True(t, tad.Status.StartTime.Before(&tad.Status.EndTime)) - tadList, err := tadController.ListThroughputAnomalyDetector(testNamespace) - assert.NoError(t, err) - assert.Equal(t, 1, len(tadList), "Expected exactly one ThroughputAnomalyDetector, got %d", len(tadList)) - assert.Equal(t, tad, tadList[0]) + tadList, err := tadController.ListThroughputAnomalyDetector(testNamespace) + assert.NoError(t, err) + assert.Equal(t, 1, len(tadList), "Expected exactly one ThroughputAnomalyDetector, got %d", len(tadList)) + assert.Equal(t, tad, tadList[0]) - err = tadController.DeleteThroughputAnomalyDetector(testNamespace, tadName) - assert.NoError(t, err) - }) + err = tadController.DeleteThroughputAnomalyDetector(testNamespace, tadName) + assert.NoError(t, err) + }) + } testCases := []struct { name string tadName string tad *crdv1alpha1.ThroughputAnomalyDetector expectedErrorMsg string + expectedstdout string }{ { name: "invalid JobType", @@ -299,7 +331,7 @@ func TestTADetection(t *testing.T) { JobType: "nonexistent-job-type", }, }, - expectedErrorMsg: "invalid request: Throughput Anomaly DetectorQuerier type should be 'EWMA' or 'ARIMA' or 'DBSCAN'", + expectedErrorMsg: "invalid request: Throughput Anomaly Detector algorithm type should be 'EWMA' or 'ARIMA' or 'DBSCAN'", }, { name: "invalid EndInterval", @@ -384,6 +416,33 @@ func TestTADetection(t *testing.T) { }, expectedErrorMsg: "invalid request: ExecutorMemory should conform to the Kubernetes resource quantity convention", }, + { + name: "invalid Aggregatedflow pod-podNamespace combo", + tadName: "tad-invalid-agg-flow-pod-podNamespace-combo", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow-pod-podNamespace-combo", Namespace: testNamespace}, + Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + AggregatedFlow: "pod", + PodNameSpace: "podNameSpace", + PodName: "", + PodLabel: "", + }, + }, + expectedErrorMsg: "invalid request: 'pod-namespace' argument can not be used alone", + }, + { + name: "invalid Aggregatedflow", + tadName: "tad-invalid-agg-flow", + tad: &crdv1alpha1.ThroughputAnomalyDetector{ + ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow", Namespace: testNamespace}, + Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ + JobType: "ARIMA", + AggregatedFlow: "nonexistent-agg-flow", + }, + }, + expectedErrorMsg: "invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'external' or 'svc'", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/theia/commands/anomaly_detection_delete.go b/pkg/theia/commands/anomaly_detection_delete.go index 6186f489..6024eab3 100644 --- 
a/pkg/theia/commands/anomaly_detection_delete.go +++ b/pkg/theia/commands/anomaly_detection_delete.go @@ -17,6 +17,7 @@ package commands import ( "context" "fmt" + "strings" "github.com/spf13/cobra" @@ -26,13 +27,15 @@ import ( // anomalyDetectionDeleteCmd represents the anomaly detection delete command var anomalyDetectionDeleteCmd = &cobra.Command{ Use: "delete", - Short: "Delete a anomaly detection job", - Long: `Delete a anomaly detection job by Name.`, + Short: "Delete anomaly detection job", + Long: `Delete anomaly detection job by Name.`, Aliases: []string{"del"}, Args: cobra.RangeArgs(0, 1), Example: ` Delete the anomaly detection job with Name tad-e998433e-accb-4888-9fc8-06563f073e86 $ theia throughput-anomaly-detection delete tad-e998433e-accb-4888-9fc8-06563f073e86 +Delete multiple anomaly detection jobs using "" with space separated tad ids +$ theia throughput-anomaly-detection delete "tad-e998433e-accb-4888-9fc8-06563f073e86 tad-1234abcd-1234-abcd-12ab-12345678abcd" `, RunE: anomalyDetectionDelete, } @@ -45,7 +48,28 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { if tadName == "" && len(args) == 1 { tadName = args[0] } - err = util.ParseADAlgorithmID(tadName) + tadNameList := strings.Fields(tadName) + for _, tadName := range tadNameList { + err = deleteTADId(cmd, tadName) + if err != nil { + return err + } + } + return nil +} + +func init() { + throughputanomalyDetectionCmd.AddCommand(anomalyDetectionDeleteCmd) + anomalyDetectionDeleteCmd.Flags().StringP( + "name", + "", + "", + "Name of the anomaly detection job.", + ) +} + +func deleteTADId(cmd *cobra.Command, tadName string) error { + err := util.ParseADAlgorithmID(tadName) if err != nil { return err } @@ -72,13 +96,3 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { fmt.Printf("Successfully deleted anomaly detection job with name: %s\n", tadName) return nil } - -func init() { - throughputanomalyDetectionCmd.AddCommand(anomalyDetectionDeleteCmd) - anomalyDetectionDeleteCmd.Flags().StringP( - "name", - "", - "", - "Name of the anomaly detection job.", - ) -} diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index b2573bc1..34eb9781 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "os" - "text/tabwriter" "github.com/spf13/cobra" @@ -105,13 +104,37 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error } return nil } else { - w := tabwriter.NewWriter(os.Stdout, 15, 8, 1, '\t', tabwriter.AlignRight) - fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\talgoCalc\tanomaly\n") - for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AlgoCalc, p.Anomaly) + var result [][]string + switch tad.Stats[0].AggType { + case "e2e": + result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) + for _, p := range tad.Stats { + result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, 
p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) + } + case "pod": + if tad.Stats[0].PodName != "" { + result = append(result, []string{"id", "podNamespace", "podName", "direction", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) + for _, p := range tad.Stats { + result = append(result, []string{p.Id, p.PodNamespace, p.PodName, p.Direction, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) + } + } else { + result = append(result, []string{"id", "podNamespace", "podLabels", "direction", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) + for _, p := range tad.Stats { + result = append(result, []string{p.Id, p.PodNamespace, p.PodLabels, p.Direction, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) + } + } + case "external": + result = append(result, []string{"id", "destinationIP", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) + for _, p := range tad.Stats { + result = append(result, []string{p.Id, p.DestinationIP, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) + } + case "svc": + result = append(result, []string{"id", "destinationServicePortName", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) + for _, p := range tad.Stats { + result = append(result, []string{p.Id, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) + } } - w.Flush() - fmt.Printf("\n") + TableOutput(result) } return nil } diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 27038883..55c776da 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -45,7 +45,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { filePath string }{ { - name: "Valid case", + name: "Valid case e2e", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -57,6 +57,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: tadName, Anomaly: "true", AlgoCalc: "1234567", + AggType: "e2e", }}, } w.Header().Set("Content-Type", "application/json") @@ -64,8 +65,110 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { json.NewEncoder(w).Encode(tad) } })), - tadName: tadName, - expectedMsg: []string{"1234567\t\ttrue"}, + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case agg_type: external", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "external", + }}, + } + w.Header().Set("Content-Type", 
"application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id destinationIP flowEndSeconds throughput aggType algoType algoCalc anomaly", "external 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case agg_type: pod podlabels", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "pod", + PodLabels: "testlabels", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id podNamespace podLabels direction flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case agg_type: pod podname", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "pod", + PodName: "testpodname", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id podNamespace podName direction flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod 1234567 true"}, + expectedErrorMsg: "", + }, + { + name: "Valid case agg_type: svc", + testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch strings.TrimSpace(r.URL.Path) { + case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): + tad := &anomalydetector.ThroughputAnomalyDetector{ + Status: anomalydetector.ThroughputAnomalyDetectorStatus{ + State: "COMPLETED", + }, + Stats: []anomalydetector.ThroughputAnomalyDetectorStats{{ + Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + Anomaly: "true", + AlgoCalc: "1234567", + AggType: "svc", + }}, + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(tad) + } + })), + tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", + expectedMsg: []string{"id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "svc 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run.go b/pkg/theia/commands/anomaly_detection_run.go index 205f8ba5..527e8d5f 100644 --- a/pkg/theia/commands/anomaly_detection_run.go +++ b/pkg/theia/commands/anomaly_detection_run.go @@ -151,6 +151,62 @@ be a list of namespace string, for example: 
'["kube-system","flow-aggregator","f } throughputAnomalyDetection.ExecutorMemory = executorMemory + aggregatedFlow, err := cmd.Flags().GetString("agg-flow") + if err != nil { + return err + } + if aggregatedFlow != "" { + switch aggregatedFlow { + case "pod": + podLabel, err := cmd.Flags().GetString("pod-label") + if err != nil { + return err + } + if podLabel != "" { + throughputAnomalyDetection.PodLabel = podLabel + } + podName, err := cmd.Flags().GetString("pod-name") + if err != nil { + return err + } + if podName != "" { + throughputAnomalyDetection.PodName = podName + } + podNameSpace, err := cmd.Flags().GetString("pod-namespace") + if err != nil { + return err + } + if podNameSpace != "" { + if podName == "" && podLabel == "" { + return fmt.Errorf("'pod-namespace' argument can not be used alone, should be specified along pod-label or pod-name") + } else { + throughputAnomalyDetection.PodNameSpace = podNameSpace + } + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + case "external": + externalIp, err := cmd.Flags().GetString("external-ip") + if err != nil { + return err + } + if externalIp != "" { + throughputAnomalyDetection.ExternalIP = externalIp + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + case "svc": + servicePortName, err := cmd.Flags().GetString("svc-port-name") + if err != nil { + return err + } + if servicePortName != "" { + throughputAnomalyDetection.ServicePortName = servicePortName + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + default: + return fmt.Errorf("throughput anomaly detector aggregated flow type should be 'pod' or 'external' or 'svc'") + } + } + tadID := uuid.New().String() throughputAnomalyDetection.Name = "tad-" + tadID throughputAnomalyDetection.Namespace = config.FlowVisibilityNS @@ -240,5 +296,34 @@ Example values include 0.1, 500m, 1.5, 5, etc.`, `Specify the memory request for the executor Pod. Values conform to the Kubernetes resource quantity convention. 
Example values include 512M, 1G, 8G, etc.`, ) - + throughputAnomalyDetectionAlgoCmd.Flags().String( + "agg-flow", + "", + `Specifies which aggregated flow to perform anomaly detection on, options are pod/svc/external`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "pod-label", + "", + `On choosing agg-flow as pod, user has option to specify labels for inbound/outbound throughput, default would be all labels`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "pod-name", + "", + `On choosing agg-flow as pod, user has option to specify podname for inbound/outbound throughput, if pod-labels specified, that will take priority`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "pod-namespace", + "", + `On choosing agg-flow as pod, user has option to specify podnamespace for inbound/outbound throughput, podnamespace argument should be combined with podlabels or podname`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "external-ip", + "", + `On choosing agg-flow as external, user has option to specify external-ip for inbound throughput, default would be all IPs`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "svc-port-name", + "", + `On choosing agg-flow as svc, user has option to specify svc-port-name for inbound throughput, default would be all service port names`, + ) } diff --git a/pkg/theia/commands/anomaly_detection_run_test.go b/pkg/theia/commands/anomaly_detection_run_test.go index 528f465e..814b7ff4 100644 --- a/pkg/theia/commands/anomaly_detection_run_test.go +++ b/pkg/theia/commands/anomaly_detection_run_test.go @@ -129,6 +129,12 @@ func TestAnomalyDetectionRun(t *testing.T) { cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "app:label1", "") + cmd.Flags().String("pod-name", "testpodname", "") + cmd.Flags().String("pod-namespace", "testpodnamespace", "") + cmd.Flags().String("external-ip", "10.0.0.1", "") + cmd.Flags().String("svc-port-name", "testportname", "") cmd.Flags().Int32("executor-instances", 1, "") cmd.Flags().String("driver-core-request", "1", "") cmd.Flags().String("driver-memory", "1m", "") diff --git a/plugins/anomaly-detection/anomaly_detection.py b/plugins/anomaly-detection/anomaly_detection.py index 56547299..dc5756ea 100644 --- a/plugins/anomaly-detection/anomaly_detection.py +++ b/plugins/anomaly-detection/anomaly_detection.py @@ -57,10 +57,54 @@ 'protocolIdentifier', 'flowStartSeconds', 'flowEndSeconds', - 'flowEndSeconds - flowStartSeconds as Diff_Secs', 'max(throughput)' ] +AGG_FLOW_TABLE_COLUMNS_EXTERNAL = [ + 'destinationIP', + 'flowType', + 'flowEndSeconds', + 'sum(throughput)' +] + +AGG_FLOW_TABLE_COLUMNS_POD_INBOUND = [ + "destinationPodNamespace AS podNamespace", + "destinationPodLabels AS podLabels", + "'inbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND = [ + "sourcePodNamespace AS podNamespace", + "sourcePodLabels AS podLabels", + "'outbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_PODNAME_INBOUND = [ + "destinationPodNamespace AS podNamespace", + "destinationPodName AS podName", + "'inbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_PODNAME_OUTBOUND = [ + "sourcePodNamespace AS podNamespace", + "sourcePodName AS 
podName", + "'outbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_SVC = [ + 'destinationServicePortName', + 'flowEndSeconds', + 'sum(throughput)' +] + # Column names to be used to group and identify a connection uniquely DF_GROUP_COLUMNS = [ 'sourceIP', @@ -71,15 +115,40 @@ 'flowStartSeconds' ] +DF_AGG_GRP_COLUMNS_POD = [ + 'podNamespace', + 'podLabels', + 'direction', +] + +DF_AGG_GRP_COLUMNS_PODNAME = [ + 'podNamespace', + 'podName', + 'direction', +] + +DF_AGG_GRP_COLUMNS_EXTERNAL = [ + 'destinationIP', + 'flowType', +] + +DF_AGG_GRP_COLUMNS_SVC = [ + 'destinationServicePortName', +] + +MEANINGLESS_LABELS = [ + "pod-template-hash", + "controller-revision-hash", + "pod-template-generation", +] -def calculate_ewma(diff_secs_throughput): + +def calculate_ewma(throughput_list): """ The function calculates Exponential Weighted Moving Average (EWMA) for a given list of throughput values of a connection Args: - diff_secs_throughput: Column of a dataframe containing difference in - seconds between connection start and current flow, along with its - throughput as a tuple + throughput_list: Column of a dataframe containing throughput Returns: A list of EWMA values calculated for the set of throughput values for that specific connection. @@ -88,16 +157,15 @@ def calculate_ewma(diff_secs_throughput): alpha = 0.5 # Can be changed and passed as UDF value later. prev_ewma_val = 0.0 ewma_row = [] - for ele in diff_secs_throughput: - ele_float = float(ele[1]) + for ele in throughput_list: + ele_float = float(ele) curr_ewma_val = (1 - alpha) * prev_ewma_val + alpha * ele_float prev_ewma_val = curr_ewma_val ewma_row.append(float(curr_ewma_val)) - return ewma_row -def calculate_ewma_anomaly(dataframe): +def calculate_ewma_anomaly(throughput_row, stddev): """ The function categorizes whether a network flow is Anomalous or not based on the calculated EWMA value. @@ -106,22 +174,20 @@ def calculate_ewma_anomaly(dataframe): True - Anomalous Traffic, False - Not Anomalous Args: - dataframe : The row of a dataframe containing all data related to - this network connection. + throughput_row : The row of a dataframe containing all throughput data. + stddev : The row of a dataframe containing standard Deviation Returns: A list of boolean values which signifies if a network flow record of that connection is anomalous or not. 
""" - stddev = dataframe[7] - ewma_arr = dataframe[9] - throughput_arr = dataframe[10] + ewma_arr = calculate_ewma(throughput_row) anomaly_result = [] if ewma_arr is None: logger.error("Error: EWMA values not calculated for this flow record") result = False anomaly_result.append(result) - elif throughput_arr is None: + elif throughput_row is None: logger.error("Error: Throughput values not in ideal format for this " "flow record") result = False @@ -133,20 +199,20 @@ def calculate_ewma_anomaly(dataframe): logger.error("Error: Too Few Throughput Values for Standard " "Deviation to be calculated.") result = False - elif throughput_arr[i] is None: + elif throughput_row[i] is None: logger.error("Error: Throughput values not in ideal format " "for this flow record") result = False else: float(stddev) - result = True if (abs(float(throughput_arr[i]) - float( + result = True if (abs(float(throughput_row[i]) - float( ewma_arr[i])) > float(stddev)) else False anomaly_result.append(result) return anomaly_result -def calculate_arima(diff_secs_throughput): +def calculate_arima(throughputs): """ The function calculates AutoRegressive Integrated Moving Average (ARIMA) for a given list of throughput values of a connection @@ -154,17 +220,15 @@ def calculate_arima(diff_secs_throughput): prediction, any connection with less than 3 flow records will not be taken into account for calculation. We return empty value in that case. Args: - diff_secs_throughput: Column of a dataframe containing difference - in seconds between connection start and current flow, - along with its throughput as a tuple + throughputs: Column of a dataframe containing throughput Returns: A list of ARIMA values calculated for the set of throughput values for that specific connection. """ throughput_list = [] - for ele in diff_secs_throughput: - throughput_list.append(float(ele[1])) + for ele in throughputs: + throughput_list.append(float(ele)) if len(throughput_list) <= 3: logger.error("Error: Too Few throughput values for ARIMA to work with") return None @@ -200,7 +264,7 @@ def calculate_arima(diff_secs_throughput): return None -def calculate_arima_anomaly(dataframe): +def calculate_arima_anomaly(throughput_row, stddev): """ The function categorizes whether a network flow is Anomalous or not based on the calculated ARIMA value. A traffic is anomalous if abs(throughput @@ -208,23 +272,21 @@ def calculate_arima_anomaly(dataframe): Anomalous Args: - dataframe : The row of a dataframe containing all data related to - this network connection. + Throughput_list : The row of a dataframe containing all throughput + data. + stddev : The row of a dataframe containing standard Deviation Returns: A list of boolean values which signifies if a network flow record of that connection is anomalous or not. 
""" - - stddev = dataframe[7] - arima_arr = dataframe[9] - throughput_arr = dataframe[10] + arima_arr = calculate_arima(throughput_row) anomaly_result = [] if arima_arr is None: logger.error("Error: ARIMA values not calculated for this flow record") result = False anomaly_result.append(result) - elif throughput_arr is None: + elif throughput_row is None: logger.error("Error: Throughput values not in ideal format for this " "flow record") result = False @@ -236,18 +298,18 @@ def calculate_arima_anomaly(dataframe): logger.error("Error: Too Few Throughput Values for Standard " "Deviation to be calculated.") result = False - elif throughput_arr[i] is None: + elif throughput_row[i] is None: logger.error("Error: Throughput values not in ideal format " "for this flow record") result = False else: - result = True if (abs(float(throughput_arr[i]) - float( + result = True if (abs(float(throughput_row[i]) - float( arima_arr[i])) > float(stddev)) else False anomaly_result.append(result) return anomaly_result -def calculate_dbscan(diff_secs_throughput): +def calculate_dbscan(throughput_list): """ The function is a placeholder function as anomaly detection with DBSCAN only inputs the throughput values. However, in order to maintain @@ -255,18 +317,18 @@ def calculate_dbscan(diff_secs_throughput): """ # Currently just a placeholder function placeholder_throughput_list = [] - for i in range(len(diff_secs_throughput)): + for i in range(len(throughput_list)): placeholder_throughput_list.append(0.0) return placeholder_throughput_list -def calculate_dbscan_anomaly(dataframe): +def calculate_dbscan_anomaly(throughput_row, stddev=None): """ The function calculates Density-based spatial clustering of applications with Noise (DBSCAN) for a given list of throughput values of a connection Args: - dataframe: The row of a dataframe containing all data related to this - network connection. + throughput_row : The row of a dataframe containing all throughput data. + stddev : The row of a dataframe containing standard Deviation Assumption: Since DBSCAN needs only numeric value to train and start prediction, any connection with null values will not be taken into account for calculation. We return empty value in that case. 
@@ -274,10 +336,8 @@ def calculate_dbscan_anomaly(dataframe): A list of boolean values which signifies if a network flow records of the connection is anomalous or not based on DBSCAN """ - - throughput_list = dataframe[10] anomaly_result = [] - np_throughput_list = np.array(throughput_list) + np_throughput_list = np.array(throughput_row) np_throughput_list = np_throughput_list.reshape(-1, 1) outlier_detection = DBSCAN(min_samples=4, eps=250000000) clusters = outlier_detection.fit_predict(np_throughput_list) @@ -289,22 +349,55 @@ def calculate_dbscan_anomaly(dataframe): return anomaly_result -def filter_df_with_true_anomalies(spark, plotDF, algo_type): - plotDF = plotDF.withColumn( +def filter_df_with_true_anomalies( + spark, plotDF, algo_type, agg_flow=None, pod_label=None): + newDF = plotDF.withColumn( "new", f.arrays_zip( "flowEndSeconds", "algoCalc", "throughputs", "anomaly")).withColumn( - "new", f.explode("new")).select( - "sourceIP", "sourceTransportPort", "destinationIP", - "destinationTransportPort", - "protocolIdentifier", "flowStartSeconds", - f.col("new.flowEndSeconds").alias("flowEndSeconds"), - "throughputStandardDeviation", "algoType", - f.col("new.algoCalc").alias("algoCalc"), - f.col("new.throughputs").alias("throughput"), - f.col("new.anomaly").alias("anomaly")) + "new", f.explode("new")) + if agg_flow == "pod": + plotDF = newDF.select( + "podNamespace", "podLabels" if pod_label else "podName", + "direction", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + elif agg_flow == "external": + plotDF = newDF.select( + "destinationIP", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + elif agg_flow == "svc": + plotDF = newDF.select( + "destinationServicePortName", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + else: + plotDF = newDF.select( + "sourceIP", "sourceTransportPort", "destinationIP", + "destinationTransportPort", + "protocolIdentifier", "flowStartSeconds", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) ret_plot = plotDF.where(~plotDF.anomaly.isin([False])) if ret_plot.count() == 0: + ret_plot = ret_plot.collect() + if agg_flow == "": + agg_type = "e2e" + else: + agg_type = agg_flow ret_plot.append({ "sourceIP": 'None', "sourceTransportPort": 0, @@ -312,8 +405,14 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type): "destinationTransportPort": 0, "protocolIdentifier": 0, "flowStartSeconds": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "podNamespace": 'None', + "podLabels": 'None', + "podName": 'None', + "destinationServicePortName": 'None', + "direction": 'None', "flowEndSeconds": 0, "throughputStandardDeviation": 0, + "aggType": agg_type, "algoType": algo_type, "algoCalc": 0.0, "throughput": 0.0, @@ -323,11 +422,11 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type): def 
plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, - tad_id_input): + tad_id_input, agg_flow=None, pod_label=None): # Insert the Algo currently in use init_plot_df = init_plot_df.withColumn('algoType', f.lit(algo_type)) - # Common schema List - common_schema_list = [ + # Schema List + schema_list = [ StructField('sourceIP', StringType(), True), StructField('sourceTransportPort', LongType(), True), StructField('destinationIP', StringType(), True), @@ -337,43 +436,65 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, StructField('flowEndSeconds', ArrayType(TimestampType(), True)), StructField('throughputStandardDeviation', DoubleType(), True) ] - # Calculate anomaly Values on the DF algo_func_rdd = init_plot_df.rdd.map( lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], algo_func(x[8]))) - - # Schema for the Dataframe to be created from the RDD - algo_func_rdd_Schema = StructType(common_schema_list + [ - StructField('Diff_Secs, Throughput', ArrayType(StructType([ - StructField("Diff_Secs", LongType(), True), - StructField("max(throughput)", DecimalType(38, 18), True) - ]))), - StructField('algoType', StringType(), True), - StructField('algoCalc', ArrayType(DoubleType(), True)) - ]) - - algo_DF = spark.createDataFrame(algo_func_rdd, algo_func_rdd_Schema) - algo_DF = algo_DF.withColumn("throughputs", f.col( - "Diff_Secs, Throughput.max(throughput)").cast( - ArrayType(DecimalType(38, 18)))).drop("Diff_Secs, Throughput") - - # DF to RDD to calculate anomaly using EWMA, ARIMA and DBSCAN - anomaly_func_rdd = algo_DF.rdd.map( - lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], x[10], - anomaly_func(x))) + x[6], x[7], x[8], x[9], x[10], algo_func(x[8]), + anomaly_func(x[8], x[7]))) + if agg_flow == "pod": + if pod_label: + schema_list = [ + StructField('podNamespace', StringType(), True), + StructField('podLabels', StringType(), True), + StructField('direction', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), + True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + else: + schema_list = [ + StructField('podNamespace', StringType(), True), + StructField('podName', StringType(), True), + StructField('direction', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), + True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], + x[6], x[7], algo_func(x[5]), + anomaly_func(x[5], x[4]))) + elif agg_flow == "svc": + schema_list = [ + StructField('destinationServicePortName', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], algo_func(x[3]), + anomaly_func(x[3], x[2]))) + elif agg_flow == "external": + schema_list = [ + StructField('destinationIP', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], algo_func(x[3]), + anomaly_func(x[3], x[2]))) # Schema for the Dataframe to be created from the RDD - anomaly_func_rdd_Schema = StructType(common_schema_list + [ + algo_func_rdd_Schema = StructType(schema_list + [ + StructField('throughputs', 
ArrayType(DecimalType(38, 18), True)), + StructField('aggType', StringType(), True), StructField('algoType', StringType(), True), StructField('algoCalc', ArrayType(DoubleType(), True)), - StructField('throughputs', ArrayType(DecimalType(38, 18), True)), StructField('anomaly', ArrayType(BooleanType(), True)) ]) - anomalyDF = spark.createDataFrame(anomaly_func_rdd, - anomaly_func_rdd_Schema) - ret_plotDF = filter_df_with_true_anomalies(spark, anomalyDF, algo_type) + + anomalyDF = spark.createDataFrame(algo_func_rdd, algo_func_rdd_Schema) + ret_plotDF = filter_df_with_true_anomalies(spark, anomalyDF, algo_type, + agg_flow, pod_label) # Write anomalous records to DB/CSV - Module WIP # Module to write to CSV. Optional. ret_plotDF = ret_plotDF.withColumn( @@ -383,31 +504,154 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, return ret_plotDF -def generate_tad_sql_query(start_time, end_time, ns_ignore_list): - sql_query = ("SELECT {} FROM {} " - .format(", ".join(FLOW_TABLE_COLUMNS), table_name)) - sql_query_extension = [] - if ns_ignore_list: - sql_query_extension.append( - "sourcePodNamespace NOT IN ({0}) AND " - "destinationPodNamespace NOT IN ({0})".format( - ", ".join("'{}'".format(x) for x in ns_ignore_list))) - if start_time: - sql_query_extension.append( - "flowStartSeconds >= '{}'".format(start_time)) - if end_time: - sql_query_extension.append("flowEndSeconds < '{}'".format(end_time)) - if sql_query_extension: - sql_query += "WHERE " + " AND ".join(sql_query_extension) + " " - sql_query += "GROUP BY {} ".format( - ", ".join(DF_GROUP_COLUMNS + ["flowEndSeconds"])) +def generate_tad_sql_query(start_time, end_time, ns_ignore_list, + agg_flow=None, pod_label=None, external_ip=None, + svc_port_name=None, pod_name=None, + pod_namespace=None): + if agg_flow == "pod": + agg_flow_table_columns_pod_inbound = ( + AGG_FLOW_TABLE_COLUMNS_POD_INBOUND) + agg_flow_table_columns_pod_outbound = ( + AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND) + df_agg_grp_columns_pod = DF_AGG_GRP_COLUMNS_POD + if pod_label: + inbound_condition = ( + "ilike(destinationPodLabels, '%{}%') ".format(pod_label)) + outbound_condition = ( + "ilike(sourcePodLabels, '%{}%')".format(pod_label)) + if pod_namespace: + inbound_condition += ( + " AND destinationPodNamespace = '{}'".format( + pod_namespace)) + outbound_condition += ( + " AND sourcePodNamespace = '{}'".format(pod_namespace)) + elif pod_name: + inbound_condition = ( + "destinationPodName = '{}'".format(pod_name)) + outbound_condition = ( + "sourcePodName = '{}'".format(pod_name)) + if pod_namespace: + inbound_condition += ( + " AND destinationPodNamespace = '{}'".format( + pod_namespace)) + outbound_condition += ( + " AND sourcePodNamespace = '{}'".format(pod_namespace)) + agg_flow_table_columns_pod_inbound = ( + AGG_FLOW_TABLE_COLUMNS_PODNAME_INBOUND) + agg_flow_table_columns_pod_outbound = ( + AGG_FLOW_TABLE_COLUMNS_PODNAME_OUTBOUND) + df_agg_grp_columns_pod = DF_AGG_GRP_COLUMNS_PODNAME + else: + inbound_condition = ( + "destinationPodLabels <> '' ") + outbound_condition = ( + "sourcePodLabels <> ''") + if ns_ignore_list: + sql_query_extension = ( + "AND sourcePodNamespace NOT IN ({0}) AND " + "destinationPodNamespace NOT IN ({0})".format( + ", ".join("'{}'".format(x) for x in ns_ignore_list))) + else: + sql_query_extension = "" + sql_query = ( + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} {6} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} {6} GROUP BY {3}) ".format( + ", ".join(agg_flow_table_columns_pod_inbound), + 
table_name, inbound_condition, ", ".join( + df_agg_grp_columns_pod + ['flowEndSeconds']), + ", ".join(agg_flow_table_columns_pod_outbound), + outbound_condition, sql_query_extension)) + else: + common_flow_table_columns = FLOW_TABLE_COLUMNS + if agg_flow == "external": + common_flow_table_columns = AGG_FLOW_TABLE_COLUMNS_EXTERNAL + elif agg_flow == "svc": + common_flow_table_columns = AGG_FLOW_TABLE_COLUMNS_SVC + + sql_query = ("SELECT {} FROM {} ".format( + ", ".join(common_flow_table_columns), table_name)) + sql_query_extension = [] + if ns_ignore_list: + sql_query_extension.append( + "sourcePodNamespace NOT IN ({0}) AND " + "destinationPodNamespace NOT IN ({0})".format( + ", ".join("'{}'".format(x) for x in ns_ignore_list))) + if start_time: + sql_query_extension.append( + "flowStartSeconds >= '{}'".format(start_time)) + if end_time: + sql_query_extension.append( + "flowEndSeconds < '{}'".format(end_time)) + if agg_flow: + if agg_flow == "external": + # TODO agg=destination IP, change the name to external + sql_query_extension.append("flowType = 3") + if external_ip: + sql_query_extension.append("destinationIP = '{}'".format( + external_ip)) + elif agg_flow == "svc": + if svc_port_name: + sql_query_extension.append( + "destinationServicePortName = '{}'".format( + svc_port_name)) + else: + sql_query_extension.append( + "destinationServicePortName <> ''") + + if sql_query_extension: + sql_query += "WHERE " + " AND ".join(sql_query_extension) + " " + df_group_columns = DF_GROUP_COLUMNS + if agg_flow == "external": + df_group_columns = DF_AGG_GRP_COLUMNS_EXTERNAL + elif agg_flow == "svc": + df_group_columns = DF_AGG_GRP_COLUMNS_SVC + + sql_query += "GROUP BY {} ".format( + ", ".join(df_group_columns + [ + "flowEndSeconds"])) return sql_query +def assign_flow_type(prepared_DF, agg_flow=None, direction=None): + if agg_flow == "external": + prepared_DF = prepared_DF.withColumn('aggType', f.lit( + "external")) + prepared_DF = prepared_DF.drop('flowType') + elif agg_flow == "svc": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("svc")) + elif agg_flow == "pod": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod")) + else: + prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e")) + return prepared_DF + + +def remove_meaningless_labels(podLabels): + try: + labels_dict = json.loads(podLabels) + except Exception as e: + logger.error( + "Error {}: labels {} are not in json format".format(e, podLabels) + ) + return "" + labels_dict = { + key: value + for key, value in labels_dict.items() + if key not in MEANINGLESS_LABELS + } + return json.dumps(labels_dict, sort_keys=True) + + def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, - tad_id_input, ns_ignore_list): + tad_id_input, ns_ignore_list, agg_flow=None, + pod_label=None, external_ip=None, svc_port_name=None, + pod_name=None, pod_namespace=None): spark = SparkSession.builder.getOrCreate() - sql_query = generate_tad_sql_query(start_time, end_time, ns_ignore_list) + sql_query = generate_tad_sql_query( + start_time, end_time, ns_ignore_list, agg_flow, pod_label, + external_ip, svc_port_name, pod_name, pod_namespace) initDF = ( spark.read.format("jdbc").option( 'driver', "ru.yandex.clickhouse.ClickHouseDriver").option( @@ -417,22 +661,52 @@ def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, "query", sql_query).load() ) - prepared_DF = initDF.groupby(DF_GROUP_COLUMNS).agg( - f.collect_list("flowEndSeconds").alias("flowEndSeconds"), - 
f.stddev_samp("max(throughput)").alias("throughputStandardDeviation"), - f.collect_list(f.struct(["Diff_Secs", "max(throughput)"])).alias( - "Diff_Secs, Throughput")) + if agg_flow: + if agg_flow == "pod": + if pod_name: + df_agg_grp_columns = DF_AGG_GRP_COLUMNS_PODNAME + else: + df_agg_grp_columns = DF_AGG_GRP_COLUMNS_POD + elif agg_flow == "external": + df_agg_grp_columns = DF_AGG_GRP_COLUMNS_EXTERNAL + elif agg_flow == "svc": + df_agg_grp_columns = DF_AGG_GRP_COLUMNS_SVC + prepared_DF = initDF.groupby(df_agg_grp_columns).agg( + f.collect_list("flowEndSeconds").alias("flowEndSeconds"), + f.stddev_samp("sum(throughput)").alias( + "throughputStandardDeviation"), + f.collect_list("sum(throughput)").alias("throughputs")) + else: + prepared_DF = initDF.groupby(DF_GROUP_COLUMNS).agg( + f.collect_list("flowEndSeconds").alias("flowEndSeconds"), + f.stddev_samp("max(throughput)").alias( + "throughputStandardDeviation"), + f.collect_list("max(throughput)").alias("throughputs")) + + prepared_DF = assign_flow_type(prepared_DF, agg_flow) + if agg_flow == "pod" and not pod_name: + prepared_DF = ( + prepared_DF.withColumn( + "PodLabels", + f.udf(remove_meaningless_labels, StringType())( + "PodLabels" + ), + ) + ) if algo_type == "EWMA": ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_ewma, - calculate_ewma_anomaly, tad_id_input) + calculate_ewma_anomaly, tad_id_input, agg_flow, + pod_label) elif algo_type == "ARIMA": ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_arima, - calculate_arima_anomaly, tad_id_input) + calculate_arima_anomaly, tad_id_input, + agg_flow, pod_label) elif algo_type == "DBSCAN": ret_plot = plot_anomaly(spark, prepared_DF, algo_type, calculate_dbscan, - calculate_dbscan_anomaly, tad_id_input) + calculate_dbscan_anomaly, tad_id_input, + agg_flow, pod_label) return spark, ret_plot @@ -461,6 +735,12 @@ def main(): end_time = "" tad_id_input = None ns_ignore_list = [] + agg_flow = "" + pod_label = "" + external_ip = "" + svc_port_name = "" + pod_name = "" + pod_namespace = "" help_message = """ Start the Throughput Anomaly Detection spark job. Options: @@ -482,15 +762,26 @@ def main(): of flow records. -i, --id=None: Throughput Anomaly Detection job ID in UUID format. If not specified, it will be generated automatically. - -n, --ns_ignore_list=[]: List of namespaces to ignore in anomaly + -n, --ns-ignore-list=[]: List of namespaces to ignore in anomaly calculation. + -f, --agg-flow=None: Aggregated Flow Throughput Anomaly Detection. 
+ -l, --pod-label=None: Aggregated Flow Throughput Anomaly Detection + to/from Pod using pod labels + -N, --pod-name=None: Aggregated Flow Throughput Anomaly Detection + to/from Pod using pod Name + -P, --pod-namespace=None: Aggregated Flow Throughput Anomaly Detection + to/from Pod using pod namespace + -x, --external-ip=None: Aggregated Flow Throughput Anomaly Detection + to Destination IP + -p, --svc-port-name=None: Aggregated Flow Throughput Anomaly Detection + to Destination Service Port """ # TODO: change to use argparse instead of getopt for options try: opts, _ = getopt.getopt( sys.argv[1:], - "ht:d:s:e:i:n:", + "ht:d:s:e:i:n:f:l:x:p:N:P:", [ "help", "algo=", @@ -498,7 +789,13 @@ "start_time=", "end_time=", "id=", - "ns_ignore_list=" + "ns_ignore_list=", + "agg-flow=", + "pod-label=", + "external-ip=", + "svc-port-name=", + "pod-name=", + "pod-namespace=", ], ) except getopt.GetoptError as e: @@ -559,6 +856,18 @@ ns_ignore_list = arg_list elif opt in ("-i", "--id"): tad_id_input = arg + elif opt in ("-f", "--agg-flow"): + agg_flow = arg + elif opt in ("-l", "--pod-label"): + pod_label = arg + elif opt in ("-N", "--pod-name"): + pod_name = arg + elif opt in ("-P", "--pod-namespace"): + pod_namespace = arg + elif opt in ("-x", "--external-ip"): + external_ip = arg + elif opt in ("-p", "--svc-port-name"): + svc_port_name = arg func_start_time = time.time() logger.info("Script started at {}".format( @@ -569,7 +878,13 @@ start_time, end_time, tad_id_input, - ns_ignore_list + ns_ignore_list, + agg_flow, + pod_label, + external_ip, + svc_port_name, + pod_name, + pod_namespace ) func_end_time = time.time() tad_id = write_anomaly_detection_result( diff --git a/plugins/anomaly-detection/anomaly_detection_test.py b/plugins/anomaly-detection/anomaly_detection_test.py index 9d6f18dd..9959f6c2 100644 --- a/plugins/anomaly-detection/anomaly_detection_test.py +++ b/plugins/anomaly-detection/anomaly_detection_test.py @@ -22,22 +22,32 @@ @pytest.fixture(scope="session") def spark_session(request): spark_session = ( - SparkSession.builder.master("local") - .appName("anomlay_detection_job_test") - .getOrCreate() + SparkSession.builder.master("local").appName( + "anomaly_detection_job_test").getOrCreate() ) request.addfinalizer(lambda: spark_session.sparkContext.stop()) return spark_session table_name = "default.flows" +inbound_condition = ( + "ilike(destinationPodLabels, '%\"app\":\"clickhouse\"%') ") +outbound_condition = ("ilike(sourcePodLabels, '%\"app\":\"clickhouse\"%')") +inbound_condition_podname = ( + "destinationPodName = 'TestPodName'") +outbound_condition_podname = ( + "sourcePodName = 'TestPodName'") +inbound_condition_podnamespace = ( + " AND destinationPodNamespace = 'TestPodNamespace'") +outbound_condition_podnamespace = ( + " AND sourcePodNamespace = 'TestPodNamespace'") @pytest.mark.parametrize( "test_input, expected_sql_query", [ ( - ("", "", []), + ("", "", [], "", "", "", "", "", ""), "SELECT {} FROM {} GROUP BY {} ".format( ", ".join(ad.FLOW_TABLE_COLUMNS), table_name, @@ -45,7 +55,7 @@ def spark_session(request): ), ), ( - ("2022-01-01 00:00:00", "", []), + ("2022-01-01 00:00:00", "", [], "", "", "", "", "", ""), "SELECT {} FROM {} WHERE " "flowStartSeconds >= '2022-01-01 00:00:00' " "GROUP BY {} ".format( @@ -55,7 +65,7 @@ def spark_session(request): ), ), ( - ("", "2022-01-01 23:59:59", []), + ("", "2022-01-01 23:59:59", [], "", "", "", "", "", ""), "SELECT {} FROM {} WHERE " "flowEndSeconds < '2022-01-01 23:59:59' GROUP BY {} ".format( ", 
".join(ad.FLOW_TABLE_COLUMNS), @@ -64,7 +74,8 @@ def spark_session(request): ), ), ( - ("2022-01-01 00:00:00", "2022-01-01 23:59:59", []), + ("2022-01-01 00:00:00", "2022-01-01 23:59:59", [], "", "", "", + "", "", ""), "SELECT {} FROM {} WHERE " "flowStartSeconds >= '2022-01-01 00:00:00' AND " "flowEndSeconds < '2022-01-01 23:59:59' " @@ -75,7 +86,7 @@ def spark_session(request): ), ), ( - ("", "", ["mock_ns", "mock_ns2"]), + ("", "", ["mock_ns", "mock_ns2"], "", "", "", "", "", ""), "SELECT {} FROM {} WHERE " "sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND " "destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2') " @@ -85,46 +96,162 @@ def spark_session(request): ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), ), ), + ( + ("", "", [], "external", "", "10.0.0.1", "", "", ""), + "SELECT {} FROM {} WHERE " + "flowType = 3 AND destinationIP = '10.0.0.1' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_EXTERNAL), + table_name, + ", ".join( + ad.DF_AGG_GRP_COLUMNS_EXTERNAL + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "pod", "\"app\":\"clickhouse\"", "", "", "", + ""), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition, + ", ".join(ad.DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition) + ), + ( + ("", "", [], "pod", "", "", "", "TestPodName", ""), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_PODNAME_INBOUND), + table_name, inbound_condition_podname, + ", ".join(ad.DF_AGG_GRP_COLUMNS_PODNAME + + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_PODNAME_OUTBOUND), + outbound_condition_podname) + ), + ( + ("", "", [], "pod", "", "", "", "TestPodName", + "TestPodNamespace"), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_PODNAME_INBOUND), + table_name, inbound_condition_podname + + inbound_condition_podnamespace, + ", ".join(ad.DF_AGG_GRP_COLUMNS_PODNAME + + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_PODNAME_OUTBOUND), + outbound_condition_podname + + outbound_condition_podnamespace) + ), + ( + ("", "", ["mock_ns", "mock_ns2"], "pod", + "\"app\":\"clickhouse\"", "", "", "", ""), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} {6} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} {6} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition, + ", ".join(ad.DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition, + "AND sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND" + " destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2')") + ), + ( + ("", "", [], "svc", "", "", "", "", ""), + "SELECT {} FROM {} WHERE " + "destinationServicePortName <> '' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_SVC), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS_SVC + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "svc", "", "", "test-service-port-name", "", ""), + "SELECT {} FROM {} WHERE " + "destinationServicePortName = 'test-service-port-name' " + "GROUP BY {} ".format( + ", 
".join(ad.AGG_FLOW_TABLE_COLUMNS_SVC), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS_SVC + ['flowEndSeconds']), + ) + ), ], ) def test_generate_sql_query(test_input, expected_sql_query): - start_time, end_time, ns_ignore_list = test_input - sql_query = ad.generate_tad_sql_query(start_time, end_time, ns_ignore_list) + (start_time, end_time, ns_ignore_list, agg_flow, pod_label, external_ip, + svc_port_name, pod_name, pod_namespace) = test_input + sql_query = ad.generate_tad_sql_query( + start_time, end_time, ns_ignore_list, agg_flow, pod_label, + external_ip, svc_port_name, pod_name, pod_namespace) assert sql_query == expected_sql_query -diff_secs_throughput_list = [ - [5220, 4004471308], [4860, 4006917952], [6720, 4006373555], - [7080, 10004969097], [6000, 4005703059], [7140, 4005517222], - [3600, 4007380032], [3780, 4005277827], [4800, 4005435632], - [8580, 4004723289], [6300, 4005760579], [4740, 4005486294], - [8640, 4006172825], [5640, 4005486294], [8700, 4005561235], - [7200, 1005533779], [5340, 4005486294], [6600, 4004706899], - [6660, 4006355667], [5280, 4005277827], [4380, 4005277827], - [7920, 4005355097], [7560, 4005615814], [7500, 4004496934], - [8100, 4004839744], [4440, 4005486294], [7260, 4005370905], - [5580, 4005277827], [4680, 4005277827], [6360, 4006503308], - [8520, 4006191046], [6180, 4004834307], [5880, 4006201196], - [5760, 4004465468], [7860, 4006448435], [6780, 4005542681]] +# Introduced 2 anomalies in between the lists +throughput_list = [ + 4007380032, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4004465468, 4005336400, 4006201196, 4005546675, + 4005703059, 4004631769, 4006915708, 4004834307, 4005943619, + 4005760579, 4006503308, 4006580124, 4006524102, 4005521494, + 4004706899, 4006355667, 4006373555, 4005542681, 4006120227, + 4003599734, 4005561673, 4005682768, 10004969097, 4005517222, + 1005533779, 4005370905, 4005589772, 4005328806, 4004926121, + 4004496934, 4005615814, 4005798822, 50007861276, 4005396697, + 4005148294, 4006448435, 4005355097, 4004335558, 4005389043, + 4004839744, 4005556492, 4005796992, 4004497248, 4005988134, + 205881027, 4004638304, 4006191046, 4004723289, 4006172825, + 4005561235, 4005658636, 4006005936, 3260272025, 4005589772] expected_ewma_row_list = [ - 2002235654.0, 3004576803.0, 3505475179.0, - 6755222138.0, 5380462598.5, 4692989910.25, - 4350184971.125, 4177731399.0625, 4091583515.53125, - 4048153402.265625, 4026956990.6328125, 4016221642.3164062, - 4011197233.658203, 4008341763.8291016, 4006951499.414551, - 2506242639.2072754, 3255864466.6036377, 3630285682.801819, - 3818320674.9009094, 3911799250.9504547, 3958538538.9752274, - 3981946817.9876137, 3993781315.993807, 3999139124.9969034, - 4001989434.4984517, 4003737864.2492256, 4004554384.624613, - 4004916105.8123064, 4005096966.406153, 4005800137.2030764, - 4005995591.601538, 4005414949.300769, 4005808072.6503844, - 4005136770.3251925, 4005792602.662596, 4005667641.831298] + 2003690016.0, 3005303984.0, 3504887646.0, + 3755082736.5, 3880284515.25, 3942860073.625, + 3974889012.8125, 3989680160.40625, 3997478993.703125, + 4001482643.8515625, 4003459137.9257812, 4005188544.9628906, + 4004829926.4814453, 
4005053876.7407227, 4005270085.3703613, + 4005352858.6851807, 4006135405.3425903, 4005303356.671295, + 4005290591.8356476, 4005388442.917824, 4005412037.458912, + 4006164994.729456, 4005318151.364728, 4005297989.182364, + 4005392141.5911818, 4005413886.795591, 4006165919.3977957, + 4005318613.698898, 4005298220.349449, 4005392257.1747246, + 4005413944.5873623, 4006165948.293681, 4005318628.1468406, + 4005298227.5734205, 4005392260.7867103, 4005413946.3933554, + 4004939707.1966777, 4005138053.598339, 4005669624.7991695, + 4005608149.899585, 4005655604.4497924, 4005143686.7248964, + 4006029697.362448, 4005432002.181224, 4005687810.590612, + 4005724194.795306, 4006113751.397653, 4006346937.698827, + 4006435519.8494134, 4005978506.9247065, 4005342702.962353, + 4005849184.9811764, 4006111369.990588, 4005827025.495294, + 4005973626.2476473, 4004786680.1238236, 4005174176.5619116, + 4005428472.280956, 7005198784.640478, 5505358003.320239, + 3255445891.1601195, 3630408398.08006, 3817999085.04003, + 3911663945.520015, 3958295033.2600074, 3981395983.630004, + 3993505898.815002, 3999652360.407501, 27003756818.20375, + 15504576757.601875, 9754862525.800938, 6880655480.400469, + 5443005288.700234, 4723670423.350117, 4364529733.175058, + 4184684738.587529, 4095120615.2937646, 4050458803.646882, + 4027478025.823441, 4016733079.9117203, 2111307053.4558601, + 3057972678.72793, 3532081862.363965, 3768402575.6819825, + 3887287700.340991, 3946424467.6704955, 3976041551.835248, + 3991023743.917624, 3625647884.4588118, 3815618828.229406] @pytest.mark.parametrize( "test_input, expected_output", - [(diff_secs_throughput_list, expected_ewma_row_list), ], + [(throughput_list, expected_ewma_row_list), ], ) def test_calculate_ewma(test_input, expected_output): ewma_row_list = ad.calculate_ewma(test_input) @@ -132,20 +259,23 @@ def test_calculate_ewma(test_input, expected_output): expected_arima_row_list = [ - 40044, 40069, 40063, 40055, - 10006, 49890, 40055, 40106, - 40058, 40054, 42977, 40055, - 37671, 40060, 42326, 37999, - 13902, 36050, 36215, 36385, - 40014, 40047, 40052, 40055, - 40047, 40047, 37220, 37309, - 37394, 37474, 37556, 37626, - 37686, 37758, 37809, 37878] + 40073, 40069, 40044, 40044, 40052, 40055, 40054, 40065, + 40067, 40054, 40055, 40055, 40056, 40060, 40054, 40054, + 40054, 40056, 40059, 40054, 40053, 40054, 40056, 40059, + 40054, 40053, 40054, 40056, 40059, 40054, 40053, 40054, + 40055, 40059, 40054, 40053, 40054, 40053, 40051, 40052, + 40055, 40055, 40054, 40053, 40057, 40055, 40056, 40057, + 40059, 40062, 40062, 40058, 40056, 40059, 40059, 40058, + 40057, 40051, 40053, 98895, 63726, 11685, 56532, 39834, + 39838, 39841, 39844, 39847, 39850, 51939, 42449, 41960, + 41624, 41417, 41407, 41379, 41361, 41344, 41328, 41311, + 41296, 37786, 39971, 39973, 39973, 39975, 39975, 39976, + 39977, 39795] @pytest.mark.parametrize( "test_input, expected_output", - [(diff_secs_throughput_list, expected_arima_row_list), ], + [(throughput_list, expected_arima_row_list), ], ) def test_calculate_arima(test_input, expected_output): arima_list = ad.calculate_arima(test_input) @@ -155,80 +285,118 @@ def test_calculate_arima(test_input, expected_output): stddev = 4.9198515356827E9 -# Introduced 2 anomalies in between the lists -throughput_list = [ - 4.0044713079999986E9, 4.006917951999995E9, 4.006373555E9, - 4.0064328864065647E9, 1.0001208441920076E10, 4.991089312943837E9, - 4.0055171211742964E9, 4.626073573776036E9, 4.534010184967284E9, - 4.466799561945112E9, 4.415333279500873E9, 4.374398755270068E9, - 
4.34118839784096E9, 4.313543957535702E9, 4.2901708280305667E9, - 4.2701589854351115E9, 2.8325782582807236E9, 2.878340351423177E9, - 3.385947522781177E9, 3.5646004395330224E9, 3.6667007752395616E9, - 3.734521818953146E9, 3.783516303056411E9, 3.820958451443989E9, - 3.8506855053810143E10, 3.8756093718031974E9, 3.8975599768541164E9, - 3.9181189080811553E10, 3.9389864233827744E9, 3.958470156413464E9, - 3.9601061005316563E9, 3.9615580705907497E9, 3.9628158138629975E9, - 3.9641264728285837E9, 3.9652109214908457E9, 3.9664069982877073E9] - expanded_arima_row_list = [ - 4004471307.9999986, 4006917951.999995, 4006373555.0, - 4005589532.039936, 10006702026.604738, 4989043846.332678, - 4005517137.571196, 4010659163.493726, 4005892608.887371, - 4005450688.4167933, 4297790490.967786, 4005582738.5384636, - 3767119154.3932557, 4006051692.1707573, 4232602969.5832963, - 3799968523.9055543, 1390254377.7501612, 3605038207.7053514, - 3621518502.8757067, 3638555962.676026, 4001438213.3433123, - 4004721638.3185096, 4005245177.0466213, 4005527131.198336, - 4004788075.4534545, 4004792510.4156027, 3722002449.8679004, - 3730981405.6425004, 3739427992.656704, 3747434614.2705755, - 3755687741.4894, 3762658349.287123, 3768651030.8630514, - 3775820305.253907, 3780919888.648877, 3787800077.0179124 -] + 4007380031.999998, 4006917951.999995, 4004471307.9999986, + 4004471338.531294, 4005277824.246516, 4005540171.967446, + 4005417708.3450212, 4006596903.605313, 4006766397.4873962, + 4005464712.562591, 4005505914.47936, 4005509545.4651246, + 4005696322.736301, 4006033558.7986755, 4005433015.8854957, + 4005412742.398469, 4005435189.6809897, 4005639605.435888, + 4005955240.943543, 4005417006.2521853, 4005394611.0058765, + 4005423107.5286036, 4005615008.6452837, 4005930744.817921, + 4005411345.290995, 4005387458.632538, 4005419217.319681, + 4005600201.8535633, 4005919351.798048, 4005407929.6769786, + 4005383100.809166, 4005417053.9240017, 4005590088.792332, + 4005912966.4215846, 4005405453.2374043, 4005380049.8736587, + 4005415584.314455, 4005347223.1615577, 4005118865.3445063, + 4005284593.2700768, 4005512211.463756, 4005544131.8972206, + 4005486224.1983933, 4005378548.2363796, 4005763254.8659606, + 4005506442.62549, 4005639283.9637246, 4005700488.7304854, + 4005994680.054991, 4006202832.9615264, 4006238860.0935726, + 4005890090.9782805, 4005645200.299684, 4005901661.0918307, + 4005961401.6530385, 4005890634.274731, 4005710152.5932317, + 4005117957.4774227, 4005335916.072919, 9889541964.746168, + 6372659702.031328, 1168584960.8614435, 5653370820.402384, + 3983485999.737595, 3983817981.649843, 3984128774.440398, + 3984428874.23501, 3984757281.599942, 3985066198.7098646, + 5193940800.99889, 4244915044.8262258, 4196033928.4701886, + 4163392386.852055, 4141770112.8968925, 4139754955.6538, + 4137999352.749037, 4136188331.593653, 4134572893.2860427, + 4132811885.9396014, 4131164397.6772537, 4129639059.839085, + 3778811317.851506, 3997193748.126457, 3997318079.4362803, + 3997388443.8144045, 3997506294.672869, 3997592427.864033, + 3997686533.833508, 3997782900.975419, 3979560170.4104342] -expected_anomaly_list = [ +expected_anomaly_list_arima = [ + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, - True, False, False, True, False, False, + False, False, False, False, False, False, + False, 
False, False, False, False, False, + False, False, False, False, True, True, + True, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, False, False, False, False, False, False] @pytest.mark.parametrize( "test_input, expected_arima_anomaly", - [(["", "", "", "", "", "", "", stddev, "", expanded_arima_row_list, - throughput_list], expected_anomaly_list), ], + [((throughput_list, stddev), expected_anomaly_list_arima), ], ) def test_calculate_arima_anomaly(test_input, expected_arima_anomaly): - anomaly_list = ad.calculate_arima_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = ad.calculate_arima_anomaly(throughput_list, stddev) assert anomaly_list == expected_arima_anomaly +expected_anomaly_list_ewma = [ + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, True, True, True, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False] + + @pytest.mark.parametrize( "test_input, expected_ewma_anomaly", - [(["", "", "", "", "", "", "", stddev, "", expected_ewma_row_list, - throughput_list], expected_anomaly_list), ], + [((throughput_list, stddev), expected_anomaly_list_ewma), ], ) def test_calculate_ewma_anomaly(test_input, expected_ewma_anomaly): - anomaly_list = ad.calculate_ewma_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = ad.calculate_ewma_anomaly(throughput_list, stddev) assert anomaly_list == expected_ewma_anomaly expected_dbscan_anomaly_list = [ - False, False, False, False, True, - True, False, False, False, False, - False, False, False, False, False, - False, True, True, False, False, - False, False, False, False, True, - False, False, True, False, False, - False, False, False, False, False, False] + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, True, False, + True, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, True, False] @pytest.mark.parametrize( "test_input, expected_dbscan_anomaly", - [(["", "", "", "", "", "", "", "", "", "", throughput_list], + [((throughput_list, stddev), expected_dbscan_anomaly_list), ], ) def test_calculate_dbscan_anomaly(test_input, expected_dbscan_anomaly): - anomaly_list = ad.calculate_dbscan_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = ad.calculate_dbscan_anomaly(throughput_list, stddev) assert anomaly_list == expected_dbscan_anomaly diff --git a/test/e2e/fixture.go b/test/e2e/fixture.go index 67397176..f56c2ac4 100644 --- 
a/test/e2e/fixture.go +++ b/test/e2e/fixture.go @@ -288,7 +288,14 @@ func setupTestForFlowVisibility(tb testing.TB, config FlowVisibiltiySetUpConfig) if err != nil { return testData, v4Enabled, v6Enabled, err } - + _, _, _, err = testData.provider.RunCommandOnNode(controlPlaneNodeName(), "kubectl taint nodes --all node-role.kubernetes.io/master:NoSchedule-") + if err != nil { + fmt.Printf("Error: %v", err) + } + _, _, _, err = testData.provider.RunCommandOnNode(controlPlaneNodeName(), "kubectl taint nodes --all node.cluster.x-k8s.io/uninitialized-") + if err != nil { + fmt.Printf("Error: %v", err) + } tb.Logf("Applying flow visibility YAML") chSvcIP, err := testData.deployFlowVisibility(config) if err != nil { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 3a3a3d7d..559eb3b9 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -736,7 +736,7 @@ func (data *TestData) PodWaitFor(timeout time.Duration, name, namespace string, }) if err != nil { if err == wait.ErrWaitTimeout && pod != nil { - return nil, fmt.Errorf("timed out waiting for the condition, Pod.Status: %s", pod.Status.String()) + return nil, fmt.Errorf("timed out waiting for the condition, Pod name:%s, Pod.Status: %s", name, pod.Status.String()) } return nil, err } diff --git a/test/e2e/throughputanomalydetection_test.go b/test/e2e/throughputanomalydetection_test.go index cb9e6f6b..dbbd766e 100644 --- a/test/e2e/throughputanomalydetection_test.go +++ b/test/e2e/throughputanomalydetection_test.go @@ -211,14 +211,14 @@ func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) if resultArray[i] != "" { resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") tadoutputArray := strings.Fields(resultArray[i]) - anomaly_output := tadoutputArray[9] - assert.Equal(10, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) + anomaly_output := tadoutputArray[11] + assert.Equal(12, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) switch algo { case "ARIMA": - algo_throughput := tadoutputArray[8][:5] + algo_throughput := tadoutputArray[10][:5] assert.Equal(result_map["ARIMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) case "EWMA": - algo_throughput := tadoutputArray[8][:5] + algo_throughput := tadoutputArray[10][:5] assert.Equal(result_map["EWMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) case "DBSCAN": throughput := tadoutputArray[7][:5]