From 4c2299f9ca709ff845a2e02bb96696e478a2eb48 Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Tue, 28 Mar 2023 15:08:11 -0700 Subject: [PATCH] Addressed comments in Aggregated Throughput Anomaly Detection partially solves: #168 Signed-off-by: Tushar Tathgur --- .../theia/crds/anomaly-detector-crd.yaml | 12 +- .../provisioning/datasources/create_table.sh | 7 +- .../migrators/000005_0-5-0.down.sql | 14 +- .../datasources/migrators/000005_0-5-0.up.sql | 14 +- build/yamls/flow-visibility.yml | 35 +- pkg/apis/crd/v1alpha1/types.go | 7 +- pkg/apis/intelligence/v1alpha1/types.go | 14 +- .../throughputanomalydetector/rest.go | 102 ++-- .../throughputanomalydetector/rest_test.go | 74 ++- pkg/controller/anomalydetector/controller.go | 30 +- .../anomalydetector/controller_test.go | 20 +- .../commands/anomaly_detection_delete.go | 4 +- .../commands/anomaly_detection_retrieve.go | 28 +- .../anomaly_detection_retrieve_test.go | 20 +- pkg/theia/commands/anomaly_detection_run.go | 69 ++- .../commands/anomaly_detection_run_test.go | 7 +- .../anomaly-detection/anomaly_detection.py | 451 +++++++++++------- .../anomaly_detection_test.py | 420 ++++++++++------ 18 files changed, 783 insertions(+), 545 deletions(-) diff --git a/build/charts/theia/crds/anomaly-detector-crd.yaml b/build/charts/theia/crds/anomaly-detector-crd.yaml index 686e69d0..6e839528 100644 --- a/build/charts/theia/crds/anomaly-detector-crd.yaml +++ b/build/charts/theia/crds/anomaly-detector-crd.yaml @@ -33,10 +33,16 @@ spec: type: array items: type: string - aggflow: + aggFlow: + type: string + podLabel: + type: string + externalIp: + type: string + podName: + type: string + servicePortName: type: string - pod2podlabel: - type: string executorInstances: type: integer driverCoreRequest: diff --git a/build/charts/theia/provisioning/datasources/create_table.sh b/build/charts/theia/provisioning/datasources/create_table.sh index 64cbbf06..45d085f0 100644 --- a/build/charts/theia/provisioning/datasources/create_table.sh +++ b/build/charts/theia/provisioning/datasources/create_table.sh @@ -293,11 +293,10 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, - sourcePodNamespace String, - sourcePodLabels String, - destinationPodNamespace String, - destinationPodLabels String, + podNamespace String, + podLabels String, destinationServicePortName String, + direction String, flowEndSeconds DateTime, throughputStandardDeviation Float64, aggType String, diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql index 8a157afc..be4d9bdc 100644 --- a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql +++ b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.down.sql @@ -1,13 +1,11 @@ --Drop aggregated TAD columns -ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String; -ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String; -ALTER TABLE tadetector DROP COLUMN sourcePodLabels String; -ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String; -ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String; -ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String; -ALTER TABLE tadetector DROP COLUMN destinationPodLabels String; -ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String; +ALTER TABLE tadetector DROP COLUMN podNamespace String; +ALTER TABLE tadetector_local DROP 
COLUMN podNamespace String; +ALTER TABLE tadetector DROP COLUMN podLabels String; +ALTER TABLE tadetector_local DROP COLUMN podLabels String; ALTER TABLE tadetector DROP COLUMN destinationServicePortName String; ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String; ALTER TABLE tadetector DROP COLUMN aggType String; ALTER TABLE tadetector_local DROP COLUMN aggType String; +ALTER TABLE tadetector DROP COLUMN direction String; +ALTER TABLE tadetector_local DROP COLUMN direction String; diff --git a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql index 6e4bca2f..199c19fd 100644 --- a/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql +++ b/build/charts/theia/provisioning/datasources/migrators/000005_0-5-0.up.sql @@ -1,13 +1,11 @@ --Add aggregated TAD columns -ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String; -ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String; -ALTER TABLE tadetector ADD COLUMN sourcePodLabels String; -ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String; -ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String; -ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String; -ALTER TABLE tadetector ADD COLUMN destinationPodLabels String; -ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String; +ALTER TABLE tadetector ADD COLUMN podNamespace String; +ALTER TABLE tadetector_local ADD COLUMN podNamespace String; +ALTER TABLE tadetector ADD COLUMN podLabels String; +ALTER TABLE tadetector_local ADD COLUMN podLabels String; ALTER TABLE tadetector ADD COLUMN destinationServicePortName String; ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String; ALTER TABLE tadetector ADD COLUMN aggType String; ALTER TABLE tadetector_local ADD COLUMN aggType String; +ALTER TABLE tadetector ADD COLUMN direction String; +ALTER TABLE tadetector_local ADD COLUMN direction String; diff --git a/build/yamls/flow-visibility.yml b/build/yamls/flow-visibility.yml index f08e34c3..dc97aca5 100644 --- a/build/yamls/flow-visibility.yml +++ b/build/yamls/flow-visibility.yml @@ -390,32 +390,28 @@ data: engine=Distributed('{cluster}', default, tadetector_local, rand()); 000005_0-5-0.down.sql: | --Drop aggregated TAD columns - ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String; - ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String; - ALTER TABLE tadetector DROP COLUMN sourcePodLabels String; - ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String; - ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String; - ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String; - ALTER TABLE tadetector DROP COLUMN destinationPodLabels String; - ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String; + ALTER TABLE tadetector DROP COLUMN podNamespace String; + ALTER TABLE tadetector_local DROP COLUMN podNamespace String; + ALTER TABLE tadetector DROP COLUMN podLabels String; + ALTER TABLE tadetector_local DROP COLUMN podLabels String; ALTER TABLE tadetector DROP COLUMN destinationServicePortName String; ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String; ALTER TABLE tadetector DROP COLUMN aggType String; ALTER TABLE tadetector_local DROP COLUMN aggType String; + ALTER TABLE tadetector DROP COLUMN direction String; + ALTER TABLE tadetector_local DROP COLUMN direction String; 000005_0-5-0.up.sql: | --Add 
aggregated TAD columns - ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String; - ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String; - ALTER TABLE tadetector ADD COLUMN sourcePodLabels String; - ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String; - ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String; - ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String; - ALTER TABLE tadetector ADD COLUMN destinationPodLabels String; - ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String; + ALTER TABLE tadetector ADD COLUMN podNamespace String; + ALTER TABLE tadetector_local ADD COLUMN podNamespace String; + ALTER TABLE tadetector ADD COLUMN podLabels String; + ALTER TABLE tadetector_local ADD COLUMN podLabels String; ALTER TABLE tadetector ADD COLUMN destinationServicePortName String; ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String; ALTER TABLE tadetector ADD COLUMN aggType String; ALTER TABLE tadetector_local ADD COLUMN aggType String; + ALTER TABLE tadetector ADD COLUMN direction String; + ALTER TABLE tadetector_local ADD COLUMN direction String; create_table.sh: | #!/usr/bin/env bash @@ -703,11 +699,10 @@ data: destinationTransportPort UInt16, protocolIdentifier UInt16, flowStartSeconds DateTime, - sourcePodNamespace String, - sourcePodLabels String, - destinationPodNamespace String, - destinationPodLabels String, + podNamespace String, + podLabels String, destinationServicePortName String, + direction String, flowEndSeconds DateTime, throughputStandardDeviation Float64, aggType String, diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 1fcd7e70..3099a9bd 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -98,8 +98,11 @@ type ThroughputAnomalyDetectorSpec struct { StartInterval metav1.Time `json:"startInterval,omitempty"` EndInterval metav1.Time `json:"endInterval,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` - AggregatedFlow string `json:"aggflow,omitempty"` - Pod2PodLabel string `json:"pod2podlabel,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + PodLabel string `json:"podLabel,omitempty"` + PodName string `json:"podName,omitempty"` + ExternalIP string `json:"externalIp,omitempty"` + ServicePortName string `json:"servicePortName,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` diff --git a/pkg/apis/intelligence/v1alpha1/types.go b/pkg/apis/intelligence/v1alpha1/types.go index d979bfdc..6b3b11b9 100644 --- a/pkg/apis/intelligence/v1alpha1/types.go +++ b/pkg/apis/intelligence/v1alpha1/types.go @@ -73,8 +73,11 @@ type ThroughputAnomalyDetector struct { EndInterval metav1.Time `json:"endInterval,omitempty"` ExecutorInstances int `json:"executorInstances,omitempty"` NSIgnoreList []string `json:"nsIgnoreList,omitempty"` - AggregatedFlow string `json:"aggflow,omitempty"` - Pod2PodLabel string `json:"pod2podlabel,omitempty"` + AggregatedFlow string `json:"aggFlow,omitempty"` + PodLabel string `json:"podLabel,omitempty"` + PodName string `json:"podName,omitempty"` + ExternalIP string `json:"externalIp,omitempty"` + ServicePortName string `json:"servicePortName,omitempty"` DriverCoreRequest string `json:"driverCoreRequest,omitempty"` DriverMemory string `json:"driverMemory,omitempty"` ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"` @@ -108,10 +111,9 @@ 
type ThroughputAnomalyDetectorStats struct { DestinationIP string `json:"destinationIP,omitempty"` DestinationTransportPort string `json:"destinationTransportPort,omitempty"` FlowStartSeconds string `json:"FlowStartSeconds,omitempty"` - SourcePodNamespace string `json:"sourcePodNamespace,omitempty"` - SourcePodLabels string `json:"sourcePodLabels,omitempty"` - DestinationPodNamespace string `json:"destinationPodNamespace,omitempty"` - DestinationPodLabels string `json:"destinationPodLabels,omitempty"` + PodNamespace string `json:"podNamespace,omitempty"` + PodLabels string `json:"podLabels,omitempty"` + Direction string `json:"direction,omitempty"` DestinationServicePortName string `json:"destinationServicePortName,omitempty"` FlowEndSeconds string `json:"FlowEndSeconds,omitempty"` Throughput string `json:"throughput,omitempty"` diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go index 2aff1a27..39ed997d 100644 --- a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go @@ -34,9 +34,9 @@ import ( const ( defaultNameSpace = "flow-visibility" tadQuery int = iota - aggtadpodQuery - aggtadpod2podQuery - aggtadpod2svcQuery + aggTadExternalQuery + aggTadPodQuery + aggTadSvcQuery ) // REST implements rest.Storage for anomalydetector. @@ -57,7 +57,7 @@ var ( var queryMap = map[int]string{ tadQuery: ` - SELECT + SELECT id, sourceIP, sourceTransportPort, @@ -71,44 +71,40 @@ var queryMap = map[int]string{ algoCalc, anomaly FROM tadetector WHERE id = (?);`, - aggtadpodQuery: ` + aggTadExternalQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - flowEndSeconds, - throughput, + id, + destinationIP, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, - aggtadpod2podQuery: ` + aggTadPodQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - destinationPodNamespace, - destinationPodLabels, - flowEndSeconds, - throughput, + id, + podNamespace, + podLabels, + direction, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, - aggtadpod2svcQuery: ` + aggTadSvcQuery: ` SELECT - id, - sourcePodNamespace, - sourcePodLabels, - destinationServicePortName, - flowEndSeconds, - throughput, + id, + destinationServicePortName, + flowEndSeconds, + throughput, aggType, - algoType, - algoCalc, - anomaly + algoType, + algoCalc, + anomaly FROM tadetector WHERE id = (?);`, } @@ -147,7 +143,10 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete tad.ExecutorInstances = crd.Spec.ExecutorInstances tad.NSIgnoreList = crd.Spec.NSIgnoreList tad.AggregatedFlow = crd.Spec.AggregatedFlow - tad.Pod2PodLabel = crd.Spec.Pod2PodLabel + tad.PodLabel = crd.Spec.PodLabel + tad.PodName = crd.Spec.PodName + tad.ExternalIP = crd.Spec.ExternalIP + tad.ServicePortName = crd.Spec.ServicePortName tad.DriverCoreRequest = crd.Spec.DriverCoreRequest tad.DriverMemory = crd.Spec.DriverMemory tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest @@ -217,7 +216,10 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest job.Spec.ExecutorMemory = newTAD.ExecutorMemory job.Spec.AggregatedFlow = newTAD.AggregatedFlow - job.Spec.Pod2PodLabel = 
newTAD.Pod2PodLabel + job.Spec.PodLabel = newTAD.PodLabel + job.Spec.PodName = newTAD.PodName + job.Spec.ExternalIP = newTAD.ExternalIP + job.Spec.ServicePortName = newTAD.ServicePortName _, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job) if err != nil { return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err)) @@ -229,12 +231,12 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet var err error query := tadQuery switch tad.AggregatedFlow { + case "external": + query = aggTadExternalQuery case "pod": - query = aggtadpodQuery - case "pod2pod": - query = aggtadpod2podQuery - case "pod2svc": - query = aggtadpod2svcQuery + query = aggTadPodQuery + case "svc": + query = aggTadSvcQuery } if r.clickhouseConnect == nil { r.clickhouseConnect, err = setupClickHouseConnection(nil) @@ -256,25 +258,25 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpodQuery: + case aggTadExternalQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} - err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { - return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err) + return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpod2podQuery: + case aggTadPodQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} - err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { - return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err) + return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err) } tad.Stats = append(tad.Stats, res) - case aggtadpod2svcQuery: + case aggTadSvcQuery: res := v1alpha1.ThroughputAnomalyDetectorStats{} - err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) + err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly) if err != nil { - return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to svc Aggregate results: %v", err) + return fmt.Errorf("failed to scan Throughput Anomaly Detector service Aggregate results: %v", err) } tad.Stats = append(tad.Stats, res) } diff --git a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go index 72f9ff59..2fbb6fef 100644 --- 
a/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go +++ b/pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go @@ -230,63 +230,59 @@ func Test_getTadetectorResult(t *testing.T) { expecterr: nil, }, { - name: "Get aggtadquery pod2external result", + name: "Get aggtadquery external result", id: "tad-2", - query: aggtadpodQuery, + query: aggTadExternalQuery, returnedRow: sqlmock.NewRows([]string{ - "Id", "SourcePodNamespace", "SourcePodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). - AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + "Id", "destinationIP", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_destinationIP", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), expectedResult: &v1alpha1.ThroughputAnomalyDetector{ Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ - Id: "mock_Id", - SourcePodNamespace: "mock_SourcePodNamespace", - SourcePodLabels: "mock_SourcePodLabels", - FlowEndSeconds: "mock_FlowEndSeconds", - Throughput: "mock_Throughput", - AggType: "mock_AggType", - AlgoType: "mock_AlgoType", - AlgoCalc: "mock_AlgoCalc", - Anomaly: "mock_Anomaly", + Id: "mock_Id", + DestinationIP: "mock_destinationIP", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", }}, }, expecterr: nil, }, { - name: "Get aggtadquery pod2pod result", + name: "Get aggtadquery pod result", id: "tad-3", - query: aggtadpod2podQuery, + query: aggTadPodQuery, returnedRow: sqlmock.NewRows([]string{ - "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationPodNamespace", "DestinationPodLabels", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). - AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationPodNamespace", "mock_DestinationPodLabels", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + "Id", "PodNamespace", "PodLabels", "Direction", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). 
+ AddRow("mock_Id", "mock_PodNamespace", "mock_PodLabels", "mock_Direction", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), expectedResult: &v1alpha1.ThroughputAnomalyDetector{ Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ - Id: "mock_Id", - SourcePodNamespace: "mock_SourcePodNamespace", - SourcePodLabels: "mock_SourcePodLabels", - DestinationPodNamespace: "mock_DestinationPodNamespace", - DestinationPodLabels: "mock_DestinationPodLabels", - FlowEndSeconds: "mock_FlowEndSeconds", - Throughput: "mock_Throughput", - AggType: "mock_AggType", - AlgoType: "mock_AlgoType", - AlgoCalc: "mock_AlgoCalc", - Anomaly: "mock_Anomaly", + Id: "mock_Id", + PodNamespace: "mock_PodNamespace", + PodLabels: "mock_PodLabels", + Direction: "mock_Direction", + FlowEndSeconds: "mock_FlowEndSeconds", + Throughput: "mock_Throughput", + AggType: "mock_AggType", + AlgoType: "mock_AlgoType", + AlgoCalc: "mock_AlgoCalc", + Anomaly: "mock_Anomaly", }}, }, expecterr: nil, }, { - name: "Get aggtadquery pod2svc result", + name: "Get aggtadquery svc result", id: "tad-2", - query: aggtadpod2svcQuery, + query: aggTadSvcQuery, returnedRow: sqlmock.NewRows([]string{ - "Id", "SourcePodNamespace", "SourcePodLabels", "DestinationServicePortName", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). - AddRow("mock_Id", "mock_SourcePodNamespace", "mock_SourcePodLabels", "mock_DestinationServicePortName", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), + "Id", "DestinationServicePortName", "FlowEndSeconds", "Throughput", "AggType", "AlgoType", "AlgoCalc", "Anomaly"}). + AddRow("mock_Id", "mock_DestinationServicePortName", "mock_FlowEndSeconds", "mock_Throughput", "mock_AggType", "mock_AlgoType", "mock_AlgoCalc", "mock_Anomaly"), expectedResult: &v1alpha1.ThroughputAnomalyDetector{ Stats: []v1alpha1.ThroughputAnomalyDetectorStats{{ Id: "mock_Id", - SourcePodNamespace: "mock_SourcePodNamespace", - SourcePodLabels: "mock_SourcePodLabels", DestinationServicePortName: "mock_DestinationServicePortName", FlowEndSeconds: "mock_FlowEndSeconds", Throughput: "mock_Throughput", @@ -310,12 +306,12 @@ func Test_getTadetectorResult(t *testing.T) { r := NewREST(&fakeQuerier{}) var tad v1alpha1.ThroughputAnomalyDetector switch tt.query { - case aggtadpod2podQuery: - tad.AggregatedFlow = "pod2pod" - case aggtadpodQuery: + case aggTadExternalQuery: + tad.AggregatedFlow = "external" + case aggTadPodQuery: tad.AggregatedFlow = "pod" - case aggtadpod2svcQuery: - tad.AggregatedFlow = "pod2svc" + case aggTadSvcQuery: + tad.AggregatedFlow = "svc" } err = r.getTADetectorResult(tt.id, &tad) assert.Equal(t, tt.expecterr, err) diff --git a/pkg/controller/anomalydetector/controller.go b/pkg/controller/anomalydetector/controller.go index 0e7c7de1..dcd55aa2 100644 --- a/pkg/controller/anomalydetector/controller.go +++ b/pkg/controller/anomalydetector/controller.go @@ -547,18 +547,28 @@ func (c *AnomalyDetectorController) startSparkApplication(newTAD *crdv1alpha1.Th } if newTAD.Spec.AggregatedFlow != "" { - if newTAD.Spec.AggregatedFlow == "pod" || newTAD.Spec.AggregatedFlow == "pod2pod" || newTAD.Spec.AggregatedFlow == "pod2svc" { + switch newTAD.Spec.AggregatedFlow { + case "pod": newTADJobArgs = append(newTADJobArgs, "--agg_flow", newTAD.Spec.AggregatedFlow) - } else { - return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'pod2pod' or 
'pod2svc'")} - } - } - - if newTAD.Spec.Pod2PodLabel != "" { - if newTAD.Spec.AggregatedFlow != "pod2pod" { - return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector Pod2PodLabel requires aggregated flow type to be 'pod2pod'")} + if newTAD.Spec.PodLabel != "" { + newTADJobArgs = append(newTADJobArgs, "--pod-label", newTAD.Spec.PodLabel) + } + if newTAD.Spec.PodName != "" { + newTADJobArgs = append(newTADJobArgs, "--pod-name", newTAD.Spec.PodName) + } + case "external": + newTADJobArgs = append(newTADJobArgs, "--agg_flow", newTAD.Spec.AggregatedFlow) + if newTAD.Spec.ExternalIP != "" { + newTADJobArgs = append(newTADJobArgs, "--external-ip", newTAD.Spec.ExternalIP) + } + case "svc": + newTADJobArgs = append(newTADJobArgs, "--agg_flow", newTAD.Spec.AggregatedFlow) + if newTAD.Spec.ServicePortName != "" { + newTADJobArgs = append(newTADJobArgs, "--svc-port-name", newTAD.Spec.ServicePortName) + } + default: + return illeagelArguementError{fmt.Errorf("invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'external' or 'svc'")} } - newTADJobArgs = append(newTADJobArgs, "--pod2pod_label", newTAD.Spec.Pod2PodLabel) } sparkResourceArgs := struct { diff --git a/pkg/controller/anomalydetector/controller_test.go b/pkg/controller/anomalydetector/controller_test.go index 5cc7bd8d..87ace42c 100644 --- a/pkg/controller/anomalydetector/controller_test.go +++ b/pkg/controller/anomalydetector/controller_test.go @@ -240,8 +240,8 @@ func TestTADetection(t *testing.T) { StartInterval: metav1.NewTime(time.Now()), EndInterval: metav1.NewTime(time.Now().Add(time.Second * 100)), NSIgnoreList: []string{"kube-system", "flow-visibility"}, - AggregatedFlow: "pod2pod", - Pod2PodLabel: "app:label", + AggregatedFlow: "pod", + PodLabel: "app:label", }, Status: crdv1alpha1.ThroughputAnomalyDetectorStatus{}, } @@ -291,6 +291,7 @@ func TestTADetection(t *testing.T) { tadName string tad *crdv1alpha1.ThroughputAnomalyDetector expectedErrorMsg string + expectedstdout string }{ { name: "invalid JobType", @@ -396,20 +397,7 @@ func TestTADetection(t *testing.T) { AggregatedFlow: "nonexistent-agg-flow", }, }, - expectedErrorMsg: "invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'pod2pod' or 'pod2svc'", - }, - { - name: "invalid Aggregatedflow pod2podlabel combo", - tadName: "tad-invalid-agg-flow-pod2podlabel-combo", - tad: &crdv1alpha1.ThroughputAnomalyDetector{ - ObjectMeta: metav1.ObjectMeta{Name: "tad-invalid-agg-flow-pod2podlabel-combo", Namespace: testNamespace}, - Spec: crdv1alpha1.ThroughputAnomalyDetectorSpec{ - JobType: "ARIMA", - AggregatedFlow: "pod", - Pod2PodLabel: "app:label", - }, - }, - expectedErrorMsg: "invalid request: Throughput Anomaly Detector Pod2PodLabel requires aggregated flow type to be 'pod2pod'", + expectedErrorMsg: "invalid request: Throughput Anomaly Detector aggregated flow type should be 'pod' or 'external' or 'svc'", }, } for _, tc := range testCases { diff --git a/pkg/theia/commands/anomaly_detection_delete.go b/pkg/theia/commands/anomaly_detection_delete.go index aaabac8b..6024eab3 100644 --- a/pkg/theia/commands/anomaly_detection_delete.go +++ b/pkg/theia/commands/anomaly_detection_delete.go @@ -50,7 +50,7 @@ func anomalyDetectionDelete(cmd *cobra.Command, args []string) error { } tadNameList := strings.Fields(tadName) for _, tadName := range tadNameList { - err = deleteTADid(cmd, tadName) + err = deleteTADId(cmd, tadName) if err != nil { return err } @@ -68,7 +68,7 @@ func init() { ) } -func 
deleteTADid(cmd *cobra.Command, tadName string) error { +func deleteTADId(cmd *cobra.Command, tadName string) error { err := util.ParseADAlgorithmID(tadName) if err != nil { return err diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index 73ef328d..3869a983 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "os" - "text/tabwriter" "github.com/spf13/cobra" @@ -105,31 +104,30 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error } return nil } else { - w := tabwriter.NewWriter(os.Stdout, 15, 8, 1, '\t', tabwriter.AlignRight) + var result [][]string switch tad.Stats[0].AggType { case "e2e": - fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } - case "pod_to_external": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + case "pod": + result = append(result, []string{"id", "podNamespace", "podLabels", "direction", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.PodNamespace, p.PodLabels, p.Direction, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } - case "pod_to_pod": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tdestinationPodNamespace\tdestinationPodLabels\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + case "external": + result = append(result, []string{"id", "destinationIP", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationPodNamespace, p.DestinationPodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.DestinationIP, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } - case "pod_to_svc": - fmt.Fprintf(w, "id\tsourcePodNamespace\tsourcePodLabels\tdestinationServicePortName\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n") + case "svc": + result = append(result, []string{"id", "destinationServicePortName", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { - 
fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly) + result = append(result, []string{p.Id, p.DestinationServicePortName, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) } } - w.Flush() - fmt.Printf("\n") + TableOutput(result) } return nil } diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 9b99c6d8..1ed955ef 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -66,11 +66,11 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, expectedErrorMsg: "", }, { - name: "Valid case pod_to_external", + name: "Valid case agg_type: external", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -82,7 +82,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", Anomaly: "true", AlgoCalc: "1234567", - AggType: "pod_to_external", + AggType: "external", }}, } w.Header().Set("Content-Type", "application/json") @@ -91,11 +91,11 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_external 1234567 true"}, + expectedMsg: []string{"id destinationIP flowEndSeconds throughput aggType algoType algoCalc anomaly", "external 1234567 true"}, expectedErrorMsg: "", }, { - name: "Valid case pod_to_pod", + name: "Valid case agg_type: pod", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -107,7 +107,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", Anomaly: "true", AlgoCalc: "1234567", - AggType: "pod_to_pod", + AggType: "pod", }}, } w.Header().Set("Content-Type", "application/json") @@ -116,11 +116,11 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationPodNamespace destinationPodLabels flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_pod 1234567 true"}, + expectedMsg: []string{"id podNamespace podLabels direction flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod 1234567 true"}, expectedErrorMsg: "", }, { - name: "Valid case pod_to_svc", + name: "Valid case agg_type: svc", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case 
fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -132,7 +132,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: "tad-1234abcd-1234-abcd-12ab-12345678abcd", Anomaly: "true", AlgoCalc: "1234567", - AggType: "pod_to_svc", + AggType: "svc", }}, } w.Header().Set("Content-Type", "application/json") @@ -141,7 +141,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourcePodNamespace sourcePodLabels destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "pod_to_svc 1234567 true"}, + expectedMsg: []string{"id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly", "svc 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run.go b/pkg/theia/commands/anomaly_detection_run.go index 7b72746e..21874afd 100644 --- a/pkg/theia/commands/anomaly_detection_run.go +++ b/pkg/theia/commands/anomaly_detection_run.go @@ -156,18 +156,44 @@ be a list of namespace string, for example: '["kube-system","flow-aggregator","f return err } if aggregatedFlow != "" { - throughputAnomalyDetection.AggregatedFlow = aggregatedFlow - } - - pod2podlabel, err := cmd.Flags().GetString("p2p-label") - if err != nil { - return err - } - if pod2podlabel != "" { - if aggregatedFlow != "pod2pod" { - return fmt.Errorf("pop2podlabel can only be mentioned with aggregatedFlow as pod2pod, instead found %v", aggregatedFlow) + switch aggregatedFlow { + case "pod": + podLabel, err := cmd.Flags().GetString("pod-label") + if err != nil { + return err + } + if podLabel != "" { + throughputAnomalyDetection.PodLabel = podLabel + } + podName, err := cmd.Flags().GetString("pod-name") + if err != nil { + return err + } + if podName != "" { + throughputAnomalyDetection.PodName = podName + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + case "external": + externalIp, err := cmd.Flags().GetString("external-ip") + if err != nil { + return err + } + if externalIp != "" { + throughputAnomalyDetection.ExternalIP = externalIp + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + case "svc": + servicePortName, err := cmd.Flags().GetString("svc-port-name") + if err != nil { + return err + } + if servicePortName != "" { + throughputAnomalyDetection.ServicePortName = servicePortName + } + throughputAnomalyDetection.AggregatedFlow = aggregatedFlow + default: + return fmt.Errorf("throughput anomaly detector aggregated flow type should be 'pod' or 'external' or 'svc'") } - throughputAnomalyDetection.Pod2PodLabel = pod2podlabel } tadID := uuid.New().String() @@ -262,11 +288,26 @@ Example values include 512M, 1G, 8G, etc.`, throughputAnomalyDetectionAlgoCmd.Flags().String( "agg-flow", "", - `Specifies which aggregated flow to perform anomaly detection on, options are pods/pod2pod/pod2svc`, + `Specifies which aggregated flow to perform anomaly detection on, options are pod/svc/external`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "pod-label", + "", + `On choosing agg-flow as pod, user has option to mention labels for inbound/outbound throughput, default would be all labels`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "pod-name", + "", + `On choosing agg-flow as pod, user has option to mention podname for inbound/outbound throughput, if pod-labels are mentioned, that will take priority`, + ) + throughputAnomalyDetectionAlgoCmd.Flags().String( + "external-ip", 
+ "", + `On choosing agg-flow as external, user has option to mention external-ip for inbound throughput, default would be all IPs`, ) throughputAnomalyDetectionAlgoCmd.Flags().String( - "p2p-label", + "svc-port-name", "", - `On choosing agg-flow as pod2pod, user need to mention labels for inbound/outbound throughput`, + `On choosing agg-flow as svc, user has option to mention svc-port-name for inbound throughput, default would be all service port names`, ) } diff --git a/pkg/theia/commands/anomaly_detection_run_test.go b/pkg/theia/commands/anomaly_detection_run_test.go index e2c8d47c..6b861802 100644 --- a/pkg/theia/commands/anomaly_detection_run_test.go +++ b/pkg/theia/commands/anomaly_detection_run_test.go @@ -91,8 +91,11 @@ func TestAnomalyDetectionRun(t *testing.T) { cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") cmd.Flags().String("end-time", "2006-01-03 15:04:05", "") cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") - cmd.Flags().String("agg-flow", "pod2pod", "") - cmd.Flags().String("p2p-label", "app:label1", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "app:label1", "") + cmd.Flags().String("pod-name", "testpodname", "") + cmd.Flags().String("external-ip", "10.0.0.1", "") + cmd.Flags().String("svc-port-name", "testportname", "") cmd.Flags().Int32("executor-instances", 1, "") cmd.Flags().String("driver-core-request", "1", "") cmd.Flags().String("driver-memory", "1m", "") diff --git a/plugins/anomaly-detection/anomaly_detection.py b/plugins/anomaly-detection/anomaly_detection.py index 9d074ce2..73b30692 100644 --- a/plugins/anomaly-detection/anomaly_detection.py +++ b/plugins/anomaly-detection/anomaly_detection.py @@ -60,16 +60,33 @@ 'max(throughput)' ] -AGG_FLOW_TABLE_COLUMNS = [ - 'sourcePodNamespace', - 'sourcePodLabels', - 'destinationPodNamespace', - 'destinationPodLabels', - 'destinationServicePortName', - 'protocolIdentifier', +AGG_FLOW_TABLE_COLUMNS_EXTERNAL = [ + 'destinationIP', 'flowType', 'flowEndSeconds', - 'max(throughput)' + 'sum(throughput)' +] + +AGG_FLOW_TABLE_COLUMNS_POD_INBOUND = [ + "destinationPodNamespace AS podNamespace", + "destinationPodLabels AS podLabels", + "'inbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND = [ + "sourcePodNamespace AS podNamespace", + "sourcePodLabels AS podLabels", + "'outbound' AS direction", + "flowEndSeconds", + "sum(throughput)" +] + +AGG_FLOW_TABLE_COLUMNS_SVC = [ + 'destinationServicePortName', + 'flowEndSeconds', + 'sum(throughput)' ] # Column names to be used to group and identify a connection uniquely @@ -81,17 +98,22 @@ 'protocolIdentifier', 'flowStartSeconds' ] +# TODO different columns for different agg_types +DF_AGG_GRP_COLUMNS_POD = [ + 'podNamespace', + 'podLabels', + 'direction', +] -DF_AGG_GRP_COLUMNS = [ - 'sourcePodNamespace', - 'sourcePodLabels', - 'destinationPodNamespace', - 'destinationPodLabels', - 'destinationServicePortName', - 'protocolIdentifier', +DF_AGG_GRP_COLUMNS_EXTERNAL = [ + 'destinationIP', 'flowType', ] +DF_AGG_GRP_COLUMNS_SVC = [ + 'destinationServicePortName', +] + MEANINGLESS_LABELS = [ "pod-template-hash", "controller-revision-hash", @@ -114,15 +136,14 @@ def calculate_ewma(throughput_list): prev_ewma_val = 0.0 ewma_row = [] for ele in throughput_list: - ele_float = float(ele[0]) + ele_float = float(ele) curr_ewma_val = (1 - alpha) * prev_ewma_val + alpha * ele_float prev_ewma_val = curr_ewma_val ewma_row.append(float(curr_ewma_val)) - 
return ewma_row


-def calculate_ewma_anomaly(dataframe):
+def calculate_ewma_anomaly(throughput_row, stddev):
     """
     The function categorizes whether a network flow is Anomalous or not
     based on the calculated EWMA value.
@@ -131,22 +152,20 @@ def calculate_ewma_anomaly(dataframe):
     True - Anomalous Traffic, False - Not Anomalous

     Args:
-        dataframe : The row of a dataframe containing all data related to
-            this network connection.
+        throughput_row : The row of a dataframe containing all throughput data.
+        stddev : The standard deviation of the throughput values of this connection.
     Returns:
         A list of boolean values which signifies if a network flow record
         of that connection is anomalous or not.
     """
-    stddev = dataframe[7]
-    ewma_arr = dataframe[10]
-    throughput_arr = dataframe[11]
+    ewma_arr = calculate_ewma(throughput_row)
     anomaly_result = []
     if ewma_arr is None:
         logger.error("Error: EWMA values not calculated for this flow record")
         result = False
         anomaly_result.append(result)
-    elif throughput_arr is None:
+    elif throughput_row is None:
         logger.error("Error: Throughput values not in ideal format for this "
                      "flow record")
         result = False
@@ -158,13 +177,13 @@ def calculate_ewma_anomaly(dataframe):
             logger.error("Error: Too Few Throughput Values for Standard "
                          "Deviation to be calculated.")
             result = False
-        elif throughput_arr[i] is None:
+        elif throughput_row[i] is None:
             logger.error("Error: Throughput values not in ideal format "
                          "for this flow record")
             result = False
         else:
             float(stddev)
-            result = True if (abs(float(throughput_arr[i]) - float(
+            result = True if (abs(float(throughput_row[i]) - float(
                 ewma_arr[i])) > float(stddev)) else False
         anomaly_result.append(result)

@@ -187,7 +206,7 @@ def calculate_arima(throughputs):

     throughput_list = []
     for ele in throughputs:
-        throughput_list.append(float(ele[0]))
+        throughput_list.append(float(ele))
     if len(throughput_list) <= 3:
         logger.error("Error: Too Few throughput values for ARIMA to work with")
         return None
@@ -223,7 +242,7 @@ def calculate_arima(throughputs):
         return None


-def calculate_arima_anomaly(dataframe):
+def calculate_arima_anomaly(throughput_row, stddev):
     """
     The function categorizes whether a network flow is Anomalous or not based
     on the calculated ARIMA value. A traffic is anomalous if abs(throughput
@@ -231,23 +250,21 @@ def calculate_arima_anomaly(dataframe):
     Anomalous

     Args:
-        dataframe : The row of a dataframe containing all data related to
-            this network connection.
+        throughput_row : The row of a dataframe containing all throughput
+            data.
+        stddev : The standard deviation of the throughput values of this connection.
     Returns:
         A list of boolean values which signifies if a network flow record
         of that connection is anomalous or not.
""" - - stddev = dataframe[7] - arima_arr = dataframe[10] - throughput_arr = dataframe[11] + arima_arr = calculate_arima(throughput_row) anomaly_result = [] if arima_arr is None: logger.error("Error: ARIMA values not calculated for this flow record") result = False anomaly_result.append(result) - elif throughput_arr is None: + elif throughput_row is None: logger.error("Error: Throughput values not in ideal format for this " "flow record") result = False @@ -259,12 +276,12 @@ def calculate_arima_anomaly(dataframe): logger.error("Error: Too Few Throughput Values for Standard " "Deviation to be calculated.") result = False - elif throughput_arr[i] is None: + elif throughput_row[i] is None: logger.error("Error: Throughput values not in ideal format " "for this flow record") result = False else: - result = True if (abs(float(throughput_arr[i]) - float( + result = True if (abs(float(throughput_row[i]) - float( arima_arr[i])) > float(stddev)) else False anomaly_result.append(result) return anomaly_result @@ -283,13 +300,13 @@ def calculate_dbscan(throughput_list): return placeholder_throughput_list -def calculate_dbscan_anomaly(dataframe): +def calculate_dbscan_anomaly(throughput_row, stddev=None): """ The function calculates Density-based spatial clustering of applications with Noise (DBSCAN) for a given list of throughput values of a connection Args: - dataframe: The row of a dataframe containing all data related to this - network connection. + throughput_row : The row of a dataframe containing all throughput data. + stddev : The row of a dataframe containing standard Deviation Assumption: Since DBSCAN needs only numeric value to train and start prediction, any connection with null values will not be taken into account for calculation. We return empty value in that case. 
@@ -297,10 +314,8 @@ def calculate_dbscan_anomaly(dataframe): A list of boolean values which signifies if a network flow records of the connection is anomalous or not based on DBSCAN """ - - throughput_list = dataframe[11] anomaly_result = [] - np_throughput_list = np.array(throughput_list) + np_throughput_list = np.array(throughput_row) np_throughput_list = np_throughput_list.reshape(-1, 1) outlier_detection = DBSCAN(min_samples=4, eps=250000000) clusters = outlier_detection.fit_predict(np_throughput_list) @@ -313,26 +328,37 @@ def calculate_dbscan_anomaly(dataframe): def filter_df_with_true_anomalies(spark, plotDF, algo_type, agg_flow=None): - if agg_flow: - plotDF = plotDF.withColumn( - "new", f.arrays_zip( - "flowEndSeconds", "algoCalc", "throughputs", - "anomaly")).withColumn( - "new", f.explode("new")).select( - "sourcePodNamespace", "sourcePodLabels", "destinationPodNamespace", - "destinationPodLabels", - "destinationServicePortName", "protocolIdentifier", "aggType", + newDF = plotDF.withColumn( + "new", f.arrays_zip( + "flowEndSeconds", "algoCalc", "throughputs", + "anomaly")).withColumn( + "new", f.explode("new")) + if agg_flow == "pod": + plotDF = newDF.select( + "podNamespace", "podLabels", "direction", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + elif agg_flow == "external": + plotDF = newDF.select( + "destinationIP", "aggType", + f.col("new.flowEndSeconds").alias("flowEndSeconds"), + "throughputStandardDeviation", "algoType", + f.col("new.algoCalc").alias("algoCalc"), + f.col("new.throughputs").alias("throughput"), + f.col("new.anomaly").alias("anomaly")) + elif agg_flow == "svc": + plotDF = newDF.select( + "destinationServicePortName", "aggType", f.col("new.flowEndSeconds").alias("flowEndSeconds"), "throughputStandardDeviation", "algoType", f.col("new.algoCalc").alias("algoCalc"), f.col("new.throughputs").alias("throughput"), f.col("new.anomaly").alias("anomaly")) else: - plotDF = plotDF.withColumn( - "new", f.arrays_zip( - "flowEndSeconds", "algoCalc", "throughputs", - "anomaly")).withColumn( - "new", f.explode("new")).select( + plotDF = newDF.select( "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "protocolIdentifier", "flowStartSeconds", "aggType", @@ -344,13 +370,10 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type, agg_flow=None): ret_plot = plotDF.where(~plotDF.anomaly.isin([False])) if ret_plot.count() == 0: ret_plot = ret_plot.collect() - agg_type = "e2e" - if agg_flow == "pod": - agg_type = "pod_to_external" - elif agg_flow == "pod2pod": - agg_type = "pod_to_pod" - elif agg_flow == "pod2svc": - agg_type = "pod_to_svc" + if agg_flow == "": + agg_type = "e2e" + else: + agg_type = agg_flow ret_plot.append({ "sourceIP": 'None', "sourceTransportPort": 0, @@ -358,11 +381,10 @@ def filter_df_with_true_anomalies(spark, plotDF, algo_type, agg_flow=None): "destinationTransportPort": 0, "protocolIdentifier": 0, "flowStartSeconds": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "sourcePodNamespace": 'None', - "sourcePodLabels": 'None', - "destinationPodNamespace": 'None', - "destinationPodLabels": 'None', + "podNamespace": 'None', + "podLabels": 'None', "destinationServicePortName": 'None', + "direction": 'None', "flowEndSeconds": 0, "throughputStandardDeviation": 0, "aggType": agg_type, @@ -378,8 +400,8 @@ def plot_anomaly(spark, 
init_plot_df, algo_type, algo_func, anomaly_func, tad_id_input, agg_flow=None): # Insert the Algo currently in use init_plot_df = init_plot_df.withColumn('algoType', f.lit(algo_type)) - # Endpoint to Endpoint schema List - common_schema_list = [ + # Schema List + schema_list = [ StructField('sourceIP', StringType(), True), StructField('sourceTransportPort', LongType(), True), StructField('destinationIP', StringType(), True), @@ -389,54 +411,52 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, StructField('flowEndSeconds', ArrayType(TimestampType(), True)), StructField('throughputStandardDeviation', DoubleType(), True) ] - - # Aggregated Schema List - agg_schema_list = [ - StructField('sourcePodNamespace', StringType(), True), - StructField('sourcePodLabels', StringType(), True), - StructField('destinationPodNamespace', StringType(), True), - StructField('destinationPodLabels', StringType(), True), - StructField('destinationServicePortName', StringType(), True), - StructField('protocolIdentifier', LongType(), True), - StructField('flowEndSeconds', ArrayType(TimestampType(), True)), - StructField('throughputStandardDeviation', DoubleType(), True) - ] - schema_list = agg_schema_list if agg_flow else common_schema_list - # Calculate anomaly Values on the DF algo_func_rdd = init_plot_df.rdd.map( lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], x[10], algo_func(x[8]))) - - # Schema for the Dataframe to be created from the RDD - algo_func_rdd_Schema = StructType(schema_list + [StructField( - 'Throughput', ArrayType(StructType( - [StructField("max(throughput)", DecimalType(38, 18), True)]))), - StructField('aggType', StringType(), True), - StructField('algoType', StringType(), True), - StructField('algoCalc', ArrayType(DoubleType(), True))]) - - algo_DF = spark.createDataFrame(algo_func_rdd, algo_func_rdd_Schema) - algo_DF = algo_DF.withColumn("throughputs", f.col( - "Throughput.max(throughput)").cast( - ArrayType(DecimalType(38, 18)))).drop("Throughput") - - # DF to RDD to calculate anomaly using EWMA, ARIMA and DBSCAN - anomaly_func_rdd = algo_DF.rdd.map( - lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], - x[6], x[7], x[8], x[9], x[10], x[11], - anomaly_func(x))) + x[6], x[7], x[8], x[9], x[10], algo_func(x[8]), + anomaly_func(x[8], x[7]))) + if agg_flow == "pod": + schema_list = [ + StructField('podNamespace', StringType(), True), + StructField('podLabels', StringType(), True), + StructField('direction', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], + x[6], x[7], algo_func(x[5]), + anomaly_func(x[5], x[4]))) + elif agg_flow == "svc": + schema_list = [ + StructField('destinationServicePortName', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], algo_func(x[3]), + anomaly_func(x[3], x[2]))) + elif agg_flow == "external": + schema_list = [ + StructField('destinationIP', StringType(), True), + StructField('flowEndSeconds', ArrayType(TimestampType(), True)), + StructField('throughputStandardDeviation', DoubleType(), True) + ] + algo_func_rdd = init_plot_df.rdd.map( + lambda x: (x[0], x[1], x[2], x[3], x[4], x[5], algo_func(x[3]), + anomaly_func(x[3], x[2]))) # 
Schema for the Dataframe to be created from the RDD - anomaly_func_rdd_Schema = StructType(schema_list + [ + algo_func_rdd_Schema = StructType(schema_list + [ + StructField('throughputs', ArrayType(DecimalType(38, 18), True)), StructField('aggType', StringType(), True), StructField('algoType', StringType(), True), StructField('algoCalc', ArrayType(DoubleType(), True)), - StructField('throughputs', ArrayType(DecimalType(38, 18), True)), StructField('anomaly', ArrayType(BooleanType(), True)) ]) - anomalyDF = spark.createDataFrame(anomaly_func_rdd, - anomaly_func_rdd_Schema) + + anomalyDF = spark.createDataFrame(algo_func_rdd, algo_func_rdd_Schema) ret_plotDF = filter_df_with_true_anomalies(spark, anomalyDF, algo_type, agg_flow) # Write anomalous records to DB/CSV - Module WIP @@ -449,55 +469,101 @@ def plot_anomaly(spark, init_plot_df, algo_type, algo_func, anomaly_func, def generate_tad_sql_query(start_time, end_time, ns_ignore_list, - agg_flow=None, pod2pod_label=None): - sql_query = ("SELECT {} FROM {} ".format( - ", ".join(AGG_FLOW_TABLE_COLUMNS if agg_flow else FLOW_TABLE_COLUMNS), - table_name)) - sql_query_extension = [] - if ns_ignore_list: - sql_query_extension.append( - "sourcePodNamespace NOT IN ({0}) AND " - "destinationPodNamespace NOT IN ({0})".format( - ", ".join("'{}'".format(x) for x in ns_ignore_list))) - if start_time: - sql_query_extension.append( - "flowStartSeconds >= '{}'".format(start_time)) - if end_time: - sql_query_extension.append("flowEndSeconds < '{}'".format(end_time)) - if agg_flow: - if agg_flow == "pod": - sql_query_extension.append("flowType = 3") - elif agg_flow == "pod2pod": - if pod2pod_label: - sql_query_extension.append( - "ilike(destinationPodLabels, '%{0}%') AND ilike(" - "sourcePodLabels, '%{0}%')".format(pod2pod_label)) - else: - sql_query_extension.append("destinationPodLabels <> '' AND " - "sourcePodLabels <> ''") - elif agg_flow == "pod2svc": - sql_query_extension.append("destinationServicePortName <> ''") - - if sql_query_extension: - sql_query += "WHERE " + " AND ".join(sql_query_extension) + " " - sql_query += "GROUP BY {} ".format( - ", ".join(DF_AGG_GRP_COLUMNS + [ - "flowEndSeconds"] if agg_flow else DF_GROUP_COLUMNS + [ - "flowEndSeconds"])) + agg_flow=None, pod_label=None, external_ip=None, + svc_port_name=None, pod_name=None): + if agg_flow == "pod": + if pod_label: + inbound_condition = ( + "ilike(destinationPodLabels, '%{}%') ".format(pod_label)) + outbound_condition = ( + "ilike(sourcePodLabels, '%{}%')".format(pod_label)) + elif pod_name: + inbound_condition = ( + "destinationPodName = '{}'".format(pod_name)) + outbound_condition = ( + "sourcePodName = '{}'".format(pod_name)) + else: + inbound_condition = ( + "destinationPodLabels <> '' ") + outbound_condition = ( + "sourcePodLabels <> ''") + if ns_ignore_list: + sql_query_extension = ( + "AND sourcePodNamespace NOT IN ({0}) AND " + "destinationPodNamespace NOT IN ({0})".format( + ", ".join("'{}'".format(x) for x in ns_ignore_list))) + else: + sql_query_extension = "" + sql_query = ( + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} {6} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} {6} GROUP BY {3}) ".format( + ", ".join(AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition, ", ".join( + DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition, sql_query_extension)) + else: + common_flow_table_columns = FLOW_TABLE_COLUMNS + if agg_flow == "external": + common_flow_table_columns = 
AGG_FLOW_TABLE_COLUMNS_EXTERNAL + elif agg_flow == "svc": + common_flow_table_columns = AGG_FLOW_TABLE_COLUMNS_SVC + + sql_query = ("SELECT {} FROM {} ".format( + ", ".join(common_flow_table_columns), table_name)) + sql_query_extension = [] + if ns_ignore_list: + sql_query_extension.append( + "sourcePodNamespace NOT IN ({0}) AND " + "destinationPodNamespace NOT IN ({0})".format( + ", ".join("'{}'".format(x) for x in ns_ignore_list))) + if start_time: + sql_query_extension.append( + "flowStartSeconds >= '{}'".format(start_time)) + if end_time: + sql_query_extension.append( + "flowEndSeconds < '{}'".format(end_time)) + if agg_flow: + if agg_flow == "external": + # TODO agg=destination IP, change the name to external + sql_query_extension.append("flowType = 3") + if external_ip: + sql_query_extension.append("destinationIP = '{}'".format( + external_ip)) + elif agg_flow == "svc": + if svc_port_name: + sql_query_extension.append( + "destinationServicePortName = '{}'".format( + svc_port_name)) + else: + sql_query_extension.append( + "destinationServicePortName <> ''") + + if sql_query_extension: + sql_query += "WHERE " + " AND ".join(sql_query_extension) + " " + df_group_columns = DF_GROUP_COLUMNS + if agg_flow == "external": + df_group_columns = DF_AGG_GRP_COLUMNS_EXTERNAL + elif agg_flow == "svc": + df_group_columns = DF_AGG_GRP_COLUMNS_SVC + + sql_query += "GROUP BY {} ".format( + ", ".join(df_group_columns + [ + "flowEndSeconds"])) return sql_query -def assign_flow_type(prepared_DF, agg_flow=None): - if agg_flow == "pod": +def assign_flow_type(prepared_DF, agg_flow=None, direction=None): + if agg_flow == "external": prepared_DF = prepared_DF.withColumn('aggType', f.lit( - "pod_to_external")) - prepared_DF = prepared_DF.drop('flowType') - elif agg_flow == "pod2svc": - prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod_to_svc")) - prepared_DF = prepared_DF.drop('flowType') - elif agg_flow == "pod2pod": - prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod_to_pod")) + "external")) prepared_DF = prepared_DF.drop('flowType') + elif agg_flow == "svc": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("svc")) + elif agg_flow == "pod": + prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod")) else: prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e")) return prepared_DF @@ -521,10 +587,12 @@ def remove_meaningless_labels(podLabels): def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, tad_id_input, ns_ignore_list, agg_flow=None, - pod2pod_label=None): + pod_label=None, external_ip=None, svc_port_name=None, + pod_name=None): spark = SparkSession.builder.getOrCreate() - sql_query = generate_tad_sql_query(start_time, end_time, ns_ignore_list, - agg_flow, pod2pod_label) + sql_query = generate_tad_sql_query( + start_time, end_time, ns_ignore_list, agg_flow, pod_label, + external_ip, svc_port_name, pod_name) initDF = ( spark.read.format("jdbc").option( 'driver', "ru.yandex.clickhouse.ClickHouseDriver").option( @@ -533,29 +601,35 @@ def anomaly_detection(algo_type, db_jdbc_address, start_time, end_time, "password", os.getenv("CH_PASSWORD")).option( "query", sql_query).load() ) - group_columns = DF_AGG_GRP_COLUMNS if agg_flow else DF_GROUP_COLUMNS - prepared_DF = initDF.groupby(group_columns).agg( - f.collect_list("flowEndSeconds").alias("flowEndSeconds"), - f.stddev_samp("max(throughput)").alias("throughputStandardDeviation"), - f.collect_list(f.struct(["max(throughput)"])).alias("Throughput")) + if agg_flow: + if agg_flow == "pod": + df_agg_grp_columns 
= DF_AGG_GRP_COLUMNS_POD
+            elif agg_flow == "external":
+                df_agg_grp_columns = DF_AGG_GRP_COLUMNS_EXTERNAL
+            elif agg_flow == "svc":
+                df_agg_grp_columns = DF_AGG_GRP_COLUMNS_SVC
+        prepared_DF = initDF.groupby(df_agg_grp_columns).agg(
+            f.collect_list("flowEndSeconds").alias("flowEndSeconds"),
+            f.stddev_samp("sum(throughput)").alias(
+                "throughputStandardDeviation"),
+            f.collect_list("sum(throughput)").alias("throughputs"))
+    else:
+        prepared_DF = initDF.groupby(DF_GROUP_COLUMNS).agg(
+            f.collect_list("flowEndSeconds").alias("flowEndSeconds"),
+            f.stddev_samp("max(throughput)").alias(
+                "throughputStandardDeviation"),
+            f.collect_list("max(throughput)").alias("throughputs"))
     prepared_DF = assign_flow_type(prepared_DF, agg_flow)
-    if agg_flow:
+    if agg_flow == "pod":
         prepared_DF = (
             prepared_DF.withColumn(
-                "sourcePodLabels",
-                f.udf(remove_meaningless_labels, StringType())(
-                    "sourcePodLabels"
-                ),
-            )
-            .withColumn(
-                "destinationPodLabels",
+                "PodLabels",
                 f.udf(remove_meaningless_labels, StringType())(
-                    "destinationPodLabels"
+                    "PodLabels"
                 ),
             )
-            .dropDuplicates(["sourcePodLabels", "destinationPodLabels"])
         )
 
     if algo_type == "EWMA":
@@ -599,7 +673,10 @@ def main():
     tad_id_input = None
     ns_ignore_list = []
     agg_flow = ""
-    pod2pod_label = ""
+    pod_label = ""
+    external_ip = ""
+    svc_port_name = ""
+    pod_name = ""
     help_message = """
     Start the Throughput Anomaly Detection spark job.
     Options:
@@ -624,15 +701,21 @@
         -n, --ns_ignore_list=[]: List of namespaces to ignore in anomaly
             calculation.
         -f, --agg_flow=None: Aggregated Flow Throughput Anomaly Detection.
-        -l, --pod2pod_label=None: Aggregated Flow Throughput Anomaly Detection
-            pod2pod using labels
+        -l, --pod-label=None: Aggregated Flow Throughput Anomaly Detection
+            to/from Pod using pod labels
+        -N, --pod-name=None: Aggregated Flow Throughput Anomaly Detection
+            to/from Pod using pod name
+        -x, --external-ip=None: Aggregated Flow Throughput Anomaly Detection
+            to Destination IP
+        -p, --svc-port-name=None: Aggregated Flow Throughput Anomaly Detection
+            to Destination Service Port
    """

    # TODO: change to use argparse instead of getopt for options
    try:
        opts, _ = getopt.getopt(
            sys.argv[1:],
-            "ht:d:s:e:i:n:f:l",
+            "ht:d:s:e:i:n:f:l:N:x:p:",
            [
                "help",
                "algo=",
                "start_time=",
                "end_time=",
                "id=",
                "ns_ignore_list=",
                "agg_flow=",
-                "pod2pod_label=",
+                "pod-label=",
+                "external-ip=",
+                "svc-port-name=",
+                "pod-name=",
            ],
        )
    except getopt.GetoptError as e:
@@ -705,8 +791,14 @@ def main():
             tad_id_input = arg
         elif opt in ("-f", "--agg_flow"):
             agg_flow = arg
-        elif opt in ("-l", "--pod2pod_label"):
-            pod2pod_label = arg
+        elif opt in ("-l", "--pod-label"):
+            pod_label = arg
+        elif opt in ("-N", "--pod-name"):
+            pod_name = arg
+        elif opt in ("-x", "--external-ip"):
+            external_ip = arg
+        elif opt in ("-p", "--svc-port-name"):
+            svc_port_name = arg
 
     func_start_time = time.time()
     logger.info("Script started at {}".format(
@@ -719,7 +811,10 @@
         tad_id_input,
         ns_ignore_list,
         agg_flow,
-        pod2pod_label,
+        pod_label,
+        external_ip,
+        svc_port_name,
+        pod_name
     )
     func_end_time = time.time()
     tad_id = write_anomaly_detection_result(
diff --git a/plugins/anomaly-detection/anomaly_detection_test.py b/plugins/anomaly-detection/anomaly_detection_test.py
index cea52a82..76892ce6 100644
--- a/plugins/anomaly-detection/anomaly_detection_test.py
+++ b/plugins/anomaly-detection/anomaly_detection_test.py
@@ -22,151 +22,214 @@
 @pytest.fixture(scope="session")
 def spark_session(request):
     spark_session = (
-        SparkSession.builder.master("local")
- .appName("anomaly_detection_job_test") - .getOrCreate() + SparkSession.builder.master("local").appName( + "anomaly_detection_job_test").getOrCreate() ) request.addfinalizer(lambda: spark_session.sparkContext.stop()) return spark_session table_name = "default.flows" +inbound_condition = ( + "ilike(destinationPodLabels, '%\"app\":\"clickhouse\"%') ") +outbound_condition = ("ilike(sourcePodLabels, '%\"app\":\"clickhouse\"%')") +inbound_condition_podname = ( + "destinationPodName = 'TestPodName'") +outbound_condition_podname = ( + "sourcePodName = 'TestPodName'") @pytest.mark.parametrize( "test_input, expected_sql_query", [ ( - ("", "", [], "", ""), - "SELECT {} FROM {} GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("", "", [], "", "", "", "", ""), + "SELECT {} FROM {} GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("2022-01-01 00:00:00", "", [], "", ""), - "SELECT {} FROM {} WHERE " - "flowStartSeconds >= '2022-01-01 00:00:00' " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("2022-01-01 00:00:00", "", [], "", "", "", "", ""), + "SELECT {} FROM {} WHERE " + "flowStartSeconds >= '2022-01-01 00:00:00' " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("", "2022-01-01 23:59:59", [], "", ""), - "SELECT {} FROM {} WHERE " - "flowEndSeconds < '2022-01-01 23:59:59' GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("", "2022-01-01 23:59:59", [], "", "", "", "", ""), + "SELECT {} FROM {} WHERE " + "flowEndSeconds < '2022-01-01 23:59:59' GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("2022-01-01 00:00:00", "2022-01-01 23:59:59", [], "", ""), - "SELECT {} FROM {} WHERE " - "flowStartSeconds >= '2022-01-01 00:00:00' AND " - "flowEndSeconds < '2022-01-01 23:59:59' " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("2022-01-01 00:00:00", "2022-01-01 23:59:59", [], "", "", "", + "", ""), + "SELECT {} FROM {} WHERE " + "flowStartSeconds >= '2022-01-01 00:00:00' AND " + "flowEndSeconds < '2022-01-01 23:59:59' " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("", "", ["mock_ns", "mock_ns2"], "", ""), - "SELECT {} FROM {} WHERE " - "sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND " - "destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2') " - "GROUP BY {} ".format( - ", ".join(ad.FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), - ), + ("", "", ["mock_ns", "mock_ns2"], "", "", "", "", ""), + "SELECT {} FROM {} WHERE " + "sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND " + "destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2') " + "GROUP BY {} ".format( + ", ".join(ad.FLOW_TABLE_COLUMNS), + table_name, + ", ".join(ad.DF_GROUP_COLUMNS + ['flowEndSeconds']), + ), ), ( - ("", "", [], "pod", ""), - "SELECT {} FROM {} WHERE " - "flowType = 3 " - "GROUP BY {} ".format( - ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), - table_name, - ", 
".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), - ) + ("", "", [], "external", "", "10.0.0.1", "", ""), + "SELECT {} FROM {} WHERE " + "flowType = 3 AND destinationIP = '10.0.0.1' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_EXTERNAL), + table_name, + ", ".join( + ad.DF_AGG_GRP_COLUMNS_EXTERNAL + ['flowEndSeconds']), + ) ), ( - ("", "", [], "pod2pod", ""), - "SELECT {} FROM {} WHERE " - "destinationPodLabels <> '' AND sourcePodLabels <> '' " - "GROUP BY {} ".format( - ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), - ) + ("", "", [], "pod", "\"app\":\"clickhouse\"", "", "", ""), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition, + ", ".join(ad.DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition) ), ( - ("", "", [], "pod2pod", "app:label"), - "SELECT {} FROM {} WHERE " - "ilike(destinationPodLabels, '%app:label%') " - "AND ilike(sourcePodLabels, '%app:label%') " - "GROUP BY {} ".format( - ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), - ) + ("", "", [], "pod", "", "", "", "TestPodName"), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition_podname, + ", ".join(ad.DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition_podname) ), ( - ("", "", [], "pod2svc", ""), - "SELECT {} FROM {} WHERE " - "destinationServicePortName <> '' " - "GROUP BY {} ".format( - ", ".join(ad.AGG_FLOW_TABLE_COLUMNS), - table_name, - ", ".join(ad.DF_AGG_GRP_COLUMNS + ['flowEndSeconds']), - ) + ("", "", ["mock_ns", "mock_ns2"], "pod", + "\"app\":\"clickhouse\"", "", "", ""), + "SELECT * FROM " + "(SELECT {0} FROM {1} WHERE {2} {6} GROUP BY {3}) " + "UNION ALL " + "(SELECT {4} FROM {1} WHERE {5} {6} GROUP BY {3}) ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_INBOUND), + table_name, inbound_condition, + ", ".join(ad.DF_AGG_GRP_COLUMNS_POD + ['flowEndSeconds']), + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_POD_OUTBOUND), + outbound_condition, + "AND sourcePodNamespace NOT IN ('mock_ns', 'mock_ns2') AND" + " destinationPodNamespace NOT IN ('mock_ns', 'mock_ns2')") + ), + ( + ("", "", [], "svc", "", "", "", ""), + "SELECT {} FROM {} WHERE " + "destinationServicePortName <> '' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_SVC), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS_SVC + ['flowEndSeconds']), + ) + ), + ( + ("", "", [], "svc", "", "", "test-service-port-name", ""), + "SELECT {} FROM {} WHERE " + "destinationServicePortName = 'test-service-port-name' " + "GROUP BY {} ".format( + ", ".join(ad.AGG_FLOW_TABLE_COLUMNS_SVC), + table_name, + ", ".join(ad.DF_AGG_GRP_COLUMNS_SVC + ['flowEndSeconds']), + ) ), ], ) def test_generate_sql_query(test_input, expected_sql_query): - start_time, end_time, ns_ignore_list, agg_flow, pod2podlabel = test_input + (start_time, end_time, ns_ignore_list, agg_flow, pod_label, external_ip, + svc_port_name, pod_name) = test_input sql_query = ad.generate_tad_sql_query( - start_time, end_time, ns_ignore_list, agg_flow, pod2podlabel) + start_time, end_time, 
ns_ignore_list, agg_flow, pod_label, + external_ip, svc_port_name, pod_name) assert sql_query == expected_sql_query -diff_secs_throughput_list = [ - [4004471308], [4006917952], [4006373555], - [10004969097], [4005703059], [4005517222], - [4007380032], [4005277827], [4005435632], - [4004723289], [4005760579], [4005486294], - [4006172825], [4005486294], [4005561235], - [1005533779], [4005486294], [4004706899], - [4006355667], [4005277827], [4005277827], - [4005355097], [4005615814], [4004496934], - [4004839744], [4005486294], [4005370905], - [4005277827], [4005277827], [4006503308], - [4006191046], [4004834307], [4006201196], - [4004465468], [4006448435], [4005542681]] +# Introduced 2 anomalies in between the lists +throughput_list = [ + 4007380032, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4006917952, 4004471308, 4005277827, 4005486294, + 4005435632, 4004465468, 4005336400, 4006201196, 4005546675, + 4005703059, 4004631769, 4006915708, 4004834307, 4005943619, + 4005760579, 4006503308, 4006580124, 4006524102, 4005521494, + 4004706899, 4006355667, 4006373555, 4005542681, 4006120227, + 4003599734, 4005561673, 4005682768, 10004969097, 4005517222, + 1005533779, 4005370905, 4005589772, 4005328806, 4004926121, + 4004496934, 4005615814, 4005798822, 50007861276, 4005396697, + 4005148294, 4006448435, 4005355097, 4004335558, 4005389043, + 4004839744, 4005556492, 4005796992, 4004497248, 4005988134, + 205881027, 4004638304, 4006191046, 4004723289, 4006172825, + 4005561235, 4005658636, 4006005936, 3260272025, 4005589772] expected_ewma_row_list = [ - 2002235654.0, 3004576803.0, 3505475179.0, - 6755222138.0, 5380462598.5, 4692989910.25, - 4350184971.125, 4177731399.0625, 4091583515.53125, - 4048153402.265625, 4026956990.6328125, 4016221642.3164062, - 4011197233.658203, 4008341763.8291016, 4006951499.414551, - 2506242639.2072754, 3255864466.6036377, 3630285682.801819, - 3818320674.9009094, 3911799250.9504547, 3958538538.9752274, - 3981946817.9876137, 3993781315.993807, 3999139124.9969034, - 4001989434.4984517, 4003737864.2492256, 4004554384.624613, - 4004916105.8123064, 4005096966.406153, 4005800137.2030764, - 4005995591.601538, 4005414949.300769, 4005808072.6503844, - 4005136770.3251925, 4005792602.662596, 4005667641.831298] + 2003690016.0, 3005303984.0, 3504887646.0, + 3755082736.5, 3880284515.25, 3942860073.625, + 3974889012.8125, 3989680160.40625, 3997478993.703125, + 4001482643.8515625, 4003459137.9257812, 4005188544.9628906, + 4004829926.4814453, 4005053876.7407227, 4005270085.3703613, + 4005352858.6851807, 4006135405.3425903, 4005303356.671295, + 4005290591.8356476, 4005388442.917824, 4005412037.458912, + 4006164994.729456, 4005318151.364728, 4005297989.182364, + 4005392141.5911818, 4005413886.795591, 4006165919.3977957, + 4005318613.698898, 4005298220.349449, 4005392257.1747246, + 4005413944.5873623, 4006165948.293681, 4005318628.1468406, + 4005298227.5734205, 4005392260.7867103, 4005413946.3933554, + 4004939707.1966777, 4005138053.598339, 4005669624.7991695, + 4005608149.899585, 4005655604.4497924, 4005143686.7248964, + 4006029697.362448, 4005432002.181224, 4005687810.590612, + 4005724194.795306, 4006113751.397653, 4006346937.698827, + 4006435519.8494134, 4005978506.9247065, 
4005342702.962353, + 4005849184.9811764, 4006111369.990588, 4005827025.495294, + 4005973626.2476473, 4004786680.1238236, 4005174176.5619116, + 4005428472.280956, 7005198784.640478, 5505358003.320239, + 3255445891.1601195, 3630408398.08006, 3817999085.04003, + 3911663945.520015, 3958295033.2600074, 3981395983.630004, + 3993505898.815002, 3999652360.407501, 27003756818.20375, + 15504576757.601875, 9754862525.800938, 6880655480.400469, + 5443005288.700234, 4723670423.350117, 4364529733.175058, + 4184684738.587529, 4095120615.2937646, 4050458803.646882, + 4027478025.823441, 4016733079.9117203, 2111307053.4558601, + 3057972678.72793, 3532081862.363965, 3768402575.6819825, + 3887287700.340991, 3946424467.6704955, 3976041551.835248, + 3991023743.917624, 3625647884.4588118, 3815618828.229406] @pytest.mark.parametrize( "test_input, expected_output", - [(diff_secs_throughput_list, expected_ewma_row_list), ], + [(throughput_list, expected_ewma_row_list), ], ) def test_calculate_ewma(test_input, expected_output): ewma_row_list = ad.calculate_ewma(test_input) @@ -174,20 +237,23 @@ def test_calculate_ewma(test_input, expected_output): expected_arima_row_list = [ - 40044, 40069, 40063, 40055, - 10006, 49890, 40055, 40106, - 40058, 40054, 42977, 40055, - 37671, 40060, 42326, 37999, - 13902, 36050, 36215, 36385, - 40014, 40047, 40052, 40055, - 40047, 40047, 37220, 37309, - 37394, 37474, 37556, 37626, - 37686, 37758, 37809, 37878] + 40073, 40069, 40044, 40044, 40052, 40055, 40054, 40065, + 40067, 40054, 40055, 40055, 40056, 40060, 40054, 40054, + 40054, 40056, 40059, 40054, 40053, 40054, 40056, 40059, + 40054, 40053, 40054, 40056, 40059, 40054, 40053, 40054, + 40055, 40059, 40054, 40053, 40054, 40053, 40051, 40052, + 40055, 40055, 40054, 40053, 40057, 40055, 40056, 40057, + 40059, 40062, 40062, 40058, 40056, 40059, 40059, 40058, + 40057, 40051, 40053, 98895, 63726, 11685, 56532, 39834, + 39838, 39841, 39844, 39847, 39850, 51939, 42449, 41960, + 41624, 41417, 41407, 41379, 41361, 41344, 41328, 41311, + 41296, 37786, 39971, 39973, 39973, 39975, 39975, 39976, + 39977, 39795] @pytest.mark.parametrize( "test_input, expected_output", - [(diff_secs_throughput_list, expected_arima_row_list), ], + [(throughput_list, expected_arima_row_list), ], ) def test_calculate_arima(test_input, expected_output): arima_list = ad.calculate_arima(test_input) @@ -197,80 +263,118 @@ def test_calculate_arima(test_input, expected_output): stddev = 4.9198515356827E9 -# Introduced 2 anomalies in between the lists -throughput_list = [ - 4.0044713079999986E9, 4.006917951999995E9, 4.006373555E9, - 4.0064328864065647E9, 1.0001208441920076E10, 4.991089312943837E9, - 4.0055171211742964E9, 4.626073573776036E9, 4.534010184967284E9, - 4.466799561945112E9, 4.415333279500873E9, 4.374398755270068E9, - 4.34118839784096E9, 4.313543957535702E9, 4.2901708280305667E9, - 4.2701589854351115E9, 2.8325782582807236E9, 2.878340351423177E9, - 3.385947522781177E9, 3.5646004395330224E9, 3.6667007752395616E9, - 3.734521818953146E9, 3.783516303056411E9, 3.820958451443989E9, - 3.8506855053810143E10, 3.8756093718031974E9, 3.8975599768541164E9, - 3.9181189080811553E10, 3.9389864233827744E9, 3.958470156413464E9, - 3.9601061005316563E9, 3.9615580705907497E9, 3.9628158138629975E9, - 3.9641264728285837E9, 3.9652109214908457E9, 3.9664069982877073E9] - expanded_arima_row_list = [ - 4004471307.9999986, 4006917951.999995, 4006373555.0, - 4005589532.039936, 10006702026.604738, 4989043846.332678, - 4005517137.571196, 4010659163.493726, 4005892608.887371, - 4005450688.4167933, 
4297790490.967786, 4005582738.5384636, - 3767119154.3932557, 4006051692.1707573, 4232602969.5832963, - 3799968523.9055543, 1390254377.7501612, 3605038207.7053514, - 3621518502.8757067, 3638555962.676026, 4001438213.3433123, - 4004721638.3185096, 4005245177.0466213, 4005527131.198336, - 4004788075.4534545, 4004792510.4156027, 3722002449.8679004, - 3730981405.6425004, 3739427992.656704, 3747434614.2705755, - 3755687741.4894, 3762658349.287123, 3768651030.8630514, - 3775820305.253907, 3780919888.648877, 3787800077.0179124 -] + 4007380031.999998, 4006917951.999995, 4004471307.9999986, + 4004471338.531294, 4005277824.246516, 4005540171.967446, + 4005417708.3450212, 4006596903.605313, 4006766397.4873962, + 4005464712.562591, 4005505914.47936, 4005509545.4651246, + 4005696322.736301, 4006033558.7986755, 4005433015.8854957, + 4005412742.398469, 4005435189.6809897, 4005639605.435888, + 4005955240.943543, 4005417006.2521853, 4005394611.0058765, + 4005423107.5286036, 4005615008.6452837, 4005930744.817921, + 4005411345.290995, 4005387458.632538, 4005419217.319681, + 4005600201.8535633, 4005919351.798048, 4005407929.6769786, + 4005383100.809166, 4005417053.9240017, 4005590088.792332, + 4005912966.4215846, 4005405453.2374043, 4005380049.8736587, + 4005415584.314455, 4005347223.1615577, 4005118865.3445063, + 4005284593.2700768, 4005512211.463756, 4005544131.8972206, + 4005486224.1983933, 4005378548.2363796, 4005763254.8659606, + 4005506442.62549, 4005639283.9637246, 4005700488.7304854, + 4005994680.054991, 4006202832.9615264, 4006238860.0935726, + 4005890090.9782805, 4005645200.299684, 4005901661.0918307, + 4005961401.6530385, 4005890634.274731, 4005710152.5932317, + 4005117957.4774227, 4005335916.072919, 9889541964.746168, + 6372659702.031328, 1168584960.8614435, 5653370820.402384, + 3983485999.737595, 3983817981.649843, 3984128774.440398, + 3984428874.23501, 3984757281.599942, 3985066198.7098646, + 5193940800.99889, 4244915044.8262258, 4196033928.4701886, + 4163392386.852055, 4141770112.8968925, 4139754955.6538, + 4137999352.749037, 4136188331.593653, 4134572893.2860427, + 4132811885.9396014, 4131164397.6772537, 4129639059.839085, + 3778811317.851506, 3997193748.126457, 3997318079.4362803, + 3997388443.8144045, 3997506294.672869, 3997592427.864033, + 3997686533.833508, 3997782900.975419, 3979560170.4104342] -expected_anomaly_list = [ +expected_anomaly_list_arima = [ + False, False, False, False, False, False, + False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, False, - True, False, False, True, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, True, True, + True, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, False, False, False, False, False, False] @pytest.mark.parametrize( "test_input, expected_arima_anomaly", - [(["", "", "", "", "", "", "", stddev, "", "", expanded_arima_row_list, - throughput_list], expected_anomaly_list), ], + [((throughput_list, stddev), expected_anomaly_list_arima), ], ) def test_calculate_arima_anomaly(test_input, expected_arima_anomaly): - anomaly_list = ad.calculate_arima_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = 
ad.calculate_arima_anomaly(throughput_list, stddev) assert anomaly_list == expected_arima_anomaly +expected_anomaly_list_ewma = [ + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, True, True, True, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False] + + @pytest.mark.parametrize( "test_input, expected_ewma_anomaly", - [(["", "", "", "", "", "", "", stddev, "", "", expected_ewma_row_list, - throughput_list], expected_anomaly_list), ], + [((throughput_list, stddev), expected_anomaly_list_ewma), ], ) def test_calculate_ewma_anomaly(test_input, expected_ewma_anomaly): - anomaly_list = ad.calculate_ewma_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = ad.calculate_ewma_anomaly(throughput_list, stddev) assert anomaly_list == expected_ewma_anomaly expected_dbscan_anomaly_list = [ - False, False, False, False, True, - True, False, False, False, False, - False, False, False, False, False, - False, True, True, False, False, - False, False, False, False, True, - False, False, True, False, False, - False, False, False, False, False, False] + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, False, False, + False, False, False, False, True, False, + True, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, False, False, + False, False, True, False, False, False, + False, False, False, False, True, False] @pytest.mark.parametrize( "test_input, expected_dbscan_anomaly", - [(["", "", "", "", "", "", "", "", "", "", "", throughput_list], + [((throughput_list, stddev), expected_dbscan_anomaly_list), ], ) def test_calculate_dbscan_anomaly(test_input, expected_dbscan_anomaly): - anomaly_list = ad.calculate_dbscan_anomaly(test_input) + throughput_list, stddev = test_input + anomaly_list = ad.calculate_dbscan_anomaly(throughput_list, stddev) assert anomaly_list == expected_dbscan_anomaly
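A minimal, self-contained sketch of the standard-deviation-based anomaly check that the updated EWMA tests above exercise: smooth the throughput series with an exponentially weighted moving average, then flag samples whose deviation from the smoothed estimate exceeds the sample standard deviation of the series. The alpha value, the EWMA seed, and the exact threshold rule below are assumptions for illustration only; they are not taken from calculate_ewma or calculate_ewma_anomaly in anomaly_detection.py.

# Illustrative sketch only -- not the Theia implementation.
from statistics import stdev


def ewma(series, alpha=0.5):
    # Exponentially weighted moving average; the first estimate is seeded
    # with half of the first sample (an assumption for this sketch).
    smoothed, prev = [], series[0] / 2
    for value in series:
        prev = alpha * value + (1 - alpha) * prev
        smoothed.append(prev)
    return smoothed


def flag_anomalies(series, stddev=None):
    # True where a sample deviates from its EWMA estimate by more than the
    # sample standard deviation of the whole series.
    if stddev is None:
        stddev = stdev(series)
    return [abs(v - e) > stddev for v, e in zip(series, ewma(series))]


# The sixth sample is roughly 12x the baseline and is the only one flagged.
print(flag_anomalies([
    4005277827, 4005486294, 4005435632, 4005277827,
    4005486294, 50007861276, 4005435632, 4005277827]))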