diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh
index b7237550..659cb1b8 100755
--- a/ci/kind/test-e2e-kind.sh
+++ b/ci/kind/test-e2e-kind.sh
@@ -173,7 +173,7 @@ function run_test {
   rm -rf $TMP_DIR
   sleep 1
-  go test -v -timeout=30m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist
+  go test -v -timeout=45m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist
 }
 
 echo "======== Test encap mode =========="
diff --git a/docs/throughput-anomaly-detection.md b/docs/throughput-anomaly-detection.md
index 6ea4ce0f..4c4e1bb8 100644
--- a/docs/throughput-anomaly-detection.md
+++ b/docs/throughput-anomaly-detection.md
@@ -69,6 +69,34 @@ $ theia throughput-anomaly-detection run --algo "ARIMA"
 Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd
 ```
 
+Throughput Anomaly Detection also supports aggregated throughput anomaly
+detection.
+Three types of aggregation are supported (example commands are shown below):
+
+- `external`: aggregates flows for inbound traffic to external IPs; the user
+  can provide an external IP with the `external-ip` argument for further
+  filtering.
+- `pod`: aggregates flows for inbound/outbound Pod traffic.
+- `svc`: aggregates flows for traffic to Service ports; the user can provide
+  a destination port name with the `svc-port-name` argument for further
+  filtering.
+
+For the `pod` aggregation type, the user can provide the following filter
+arguments.
+
+- `pod-label`: aggregates inbound/outbound traffic using Pod labels.
+- `pod-name`: aggregates inbound/outbound traffic using the Pod name.
+- `pod-namespace`: aggregates inbound/outbound traffic using the Pod
+  Namespace. However, this argument only works in combination with one of
+  the two arguments above and cannot be used alone.
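+As an illustration, `external` and `svc` aggregated detection jobs can be
+started as follows (the IP address and Service port name below are
+placeholder values taken from the test data, not required ones):
+
+```bash
+$ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow external --external-ip 10.10.1.33
+$ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow svc --svc-port-name test_serviceportname
+```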
+
+To start a Pod-aggregated throughput anomaly detection job with a Pod label
+filter, run the following command:
+
+```bash
+$ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow pod --pod-label \"test_key\":\"test_value\"
+Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd
+```
+
 The name of the Throughput Anomaly Detection job contains a universally
 unique identifier ([UUID](
 https://en.wikipedia.org/wiki/Universally_unique_identifier)) that is
@@ -110,13 +138,24 @@ in table format, run:
 
 ```bash
 $ theia throughput-anomaly-detection retrieve tad-1234abcd-1234-abcd-12ab-12345678abcd
-id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput algoCalc anomaly
-1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:24:54 10004969097.000000000000000000 4.0063773860532994E9 true
-1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:06:54 4005703059.000000000000000000 1.0001208294655691E10 true
-1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:34:54 50007861276.000000000000000000 3.9735065921281104E9 true
+id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly
+1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 None ARIMA 1.0001208441920074e+10 true
+1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 None ARIMA 4.006432886406564e+09 true
+1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 None ARIMA 3.9735067954945493e+09 true
+```
+
+An aggregated Throughput Anomaly Detection job returns different columns
+depending on the aggregation type.
+For example, when the aggregation type is `svc`, the output looks like the
+following:
+
+```bash
+$ theia throughput-anomaly-detection retrieve tad-5ca4413d-6730-463e-8f95-86032ba28a4f
+id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly
+5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:24:54Z 5.0024845485e+10 svc ARIMA 2.0863933021708477e+10 true
+5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:34:54Z 2.5003930638e+11 svc ARIMA 1.9138281301304165e+10 true
 ```
 
-User may also save the result in an output file in json format
+The user may also save the result to an output file in JSON format.
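+For example (the `--file` option shown here is assumed to behave like the
+equivalent option of other `theia` retrieve commands; run
+`theia throughput-anomaly-detection retrieve --help` to confirm it):
+
+```bash
+# Save the retrieved result to a JSON file instead of printing a table
+# (the output path is an arbitrary example).
+$ theia throughput-anomaly-detection retrieve tad-1234abcd-1234-abcd-12ab-12345678abcd --file output.json
+```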
### List all throughput anomaly detection jobs diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index 34eb9781..29076f96 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -106,7 +106,7 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error } else { var result [][]string switch tad.Stats[0].AggType { - case "e2e": + case "None": result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 55c776da..0bf3cf78 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -45,7 +45,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { filePath string }{ { - name: "Valid case e2e", + name: "Valid case No agg_type", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -57,7 +57,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: tadName, Anomaly: "true", AlgoCalc: "1234567", - AggType: "e2e", + AggType: "None", }}, } w.Header().Set("Content-Type", "application/json") @@ -66,7 +66,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "None 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run_test.go b/pkg/theia/commands/anomaly_detection_run_test.go index 814b7ff4..7c737343 100644 --- a/pkg/theia/commands/anomaly_detection_run_test.go +++ b/pkg/theia/commands/anomaly_detection_run_test.go @@ -129,7 +129,7 @@ func TestAnomalyDetectionRun(t *testing.T) { cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") - cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("agg-flow", "external", "") cmd.Flags().String("pod-label", "app:label1", "") cmd.Flags().String("pod-name", "testpodname", "") cmd.Flags().String("pod-namespace", "testpodnamespace", "") @@ -236,6 +236,38 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { name: "Invalid executor-memory", expectedErrorMsg: "executor-memory should conform to the Kubernetes resource quantity convention", }, + { + name: "Unspecified agg-flow", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified pod-label", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + 
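+			// Each "Unspecified <flag>" case defines flags only up to, but not
+			// including, the flag named by the case, so the run is expected to
+			// fail with ErrorMsgUnspecifiedCase when that flag is looked up.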
}, + { + name: "Unspecified pod-name", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified pod-namespace", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Invalid pod-namespace", + expectedErrorMsg: "argument can not be used alone", + }, + { + name: "Unspecified external-ip", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified svc-port-name", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Invalid agg-flow", + expectedErrorMsg: "aggregated flow type should be 'pod' or 'external' or 'svc'", + }, { name: "Unspecified use-cluster-ip", expectedErrorMsg: ErrorMsgUnspecifiedCase, @@ -346,6 +378,99 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { cmd.Flags().String("driver-memory", "1m", "") cmd.Flags().String("executor-core-request", "1", "") cmd.Flags().String("executor-memory", "mock_executor-memory", "") + case "Unspecified agg-flow": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + case "Unspecified pod-label": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + case "Unspecified pod-name": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "mock_pod-label", "") + case "Unspecified pod-namespace": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "mock_pod_label", "") + cmd.Flags().String("pod-name", "mock_pod-name", "") + case "Invalid pod-namespace": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") 
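+			// pod-namespace is set below while pod-label and pod-name are left
+			// empty; this combination must be rejected because the namespace
+			// filter can not be used alone.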
+ cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "", "") + cmd.Flags().String("pod-name", "", "") + cmd.Flags().String("pod-namespace", "mock_pod-namespace", "") + case "Unspecified external-ip": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "external", "") + case "Unspecified svc-port-name": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "svc", "") + case "Invalid agg-flow": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "mock_agg-flow", "") case "Unspecified use-cluster-ip": cmd.Flags().String("algo", "ARIMA", "") cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") @@ -356,6 +481,8 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { cmd.Flags().String("driver-memory", "1m", "") cmd.Flags().String("executor-core-request", "1", "") cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "svc", "") + cmd.Flags().String("svc-port-name", "mock_svc_name", "") } err := throughputAnomalyDetectionAlgo(cmd, []string{}) if tt.expectedErrorMsg == "" { diff --git a/plugins/anomaly-detection/anomaly_detection.py b/plugins/anomaly-detection/anomaly_detection.py index dc5756ea..c0c42589 100644 --- a/plugins/anomaly-detection/anomaly_detection.py +++ b/plugins/anomaly-detection/anomaly_detection.py @@ -395,7 +395,7 @@ def filter_df_with_true_anomalies( if ret_plot.count() == 0: ret_plot = ret_plot.collect() if agg_flow == "": - agg_type = "e2e" + agg_type = "None" else: agg_type = agg_flow ret_plot.append({ @@ -624,7 +624,7 @@ def assign_flow_type(prepared_DF, agg_flow=None, direction=None): elif agg_flow == "pod": prepared_DF 
= prepared_DF.withColumn('aggType', f.lit("pod")) else: - prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e")) + prepared_DF = prepared_DF.withColumn('aggType', f.lit("None")) return prepared_DF diff --git a/test/e2e/throughputanomalydetection_test.go b/test/e2e/throughputanomalydetection_test.go index 537364c0..f67057f6 100644 --- a/test/e2e/throughputanomalydetection_test.go +++ b/test/e2e/throughputanomalydetection_test.go @@ -38,6 +38,8 @@ const ( tadretrieveCmd = "./theia throughput-anomaly-detection retrieve" ) +var e2eMutex sync.Mutex + func TestAnomalyDetection(t *testing.T) { config := FlowVisibilitySetUpConfig{ withSparkOperator: true, @@ -101,7 +103,7 @@ func prepareFlowTable(t *testing.T, connect *sql.DB) { // Example output: Successfully created Throughput Anomaly Detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - stdout, jobName, err := tadrunJob(t, data, "ARIMA") + stdout, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) assert := assert.New(t) assert.Containsf(stdout, fmt.Sprintf("Successfully started Throughput Anomaly Detection job with name: %s", jobName), "stdout: %s", stdout) @@ -114,7 +116,7 @@ func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *s // Example output: Status of this anomaly detection job is COMPLETED func testAnomalyDetectionStatus(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) stdout, err := tadgetJobStatus(t, data, jobName) require.NoError(t, err) @@ -133,7 +135,7 @@ func testAnomalyDetectionStatus(t *testing.T, data *TestData, connect *sql.DB) { // 2022-06-17 15:03:39 N/A tad-c7a9e768-559a-4bfb-b0c8-a0291b4c208c SUBMITTED func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) stdout, err := tadlistJobs(t, data) require.NoError(t, err) @@ -152,7 +154,7 @@ func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { // Example output: Successfully deleted anomaly detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testAnomalyDetectionDelete(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) require.NoError(t, err) @@ -166,78 +168,156 @@ func testAnomalyDetectionDelete(t *testing.T, data *TestData, connect *sql.DB) { } // Example Output -// id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput algoCalc anomaly -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 4.006432886406564e+09 true -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 1.0001208441920074e+10 true -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 3.9735067954945493e+09 true +// id 
destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly +// 5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:24:54Z 5.0024845485e+10 svc ARIMA 2.0863933021708477e+10 true +// 5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:34:54Z 2.5003930638e+11 svc ARIMA 1.9138281301304165e+10 true func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) { - prepareFlowTable(t, connect) algoNames := []string{"ARIMA", "EWMA", "DBSCAN"} + // agg_type 'podLabel' stands for agg_type 'pod' with pod-label argument + // agg_type 'podName' stands for agg_type 'pod' with pod-name argument + agg_types := []string{"None", "podName", "podLabel", "svc", "external"} + // Select random algo for the agg_types + aggTypeToAlgoNameMap := make(map[string]string) + for _, agg_type := range agg_types { + aggTypeToAlgoNameMap[agg_type] = algoNames[randInt(t, int64(len(algoNames)))] + } + // Create a worker pool with maximum size of 3 + pool := make(chan int, len(algoNames)) + var ( + wg sync.WaitGroup + poolIdx int + ) + prepareFlowTable(t, connect) result_map := map[string]map[string]string{ "ARIMA": { - "4.006": "true", + "4.005": "true", "1.000": "true", - "3.973": "true"}, + "5.000": "true", + "2.500": "true", + "5.002": "true", + "2.003": "true", + "2.002": "true", + }, "EWMA": { - "2.700": "true", - "1.550": "true", - "9.755": "true"}, + "4.004": "true", + "4.005": "true", + "4.006": "true", + "5.000": "true", + "2.002": "true", + "2.003": "true", + "2.500": "true", + }, "DBSCAN": { "1.000": "true", "1.005": "true", "5.000": "true", "3.260": "true", - "2.058": "true"}, + "2.058": "true", + "5.002": "true", + "5.027": "true", + "2.500": "true", + "1.029": "true", + "1.630": "true"}, } - for _, algo := range algoNames { - _, jobName, err := tadrunJob(t, data, algo) - require.NoError(t, err) - err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) - require.NoError(t, err) - err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) - require.NoErrorf(t, err, "Throughput Anomaly Detection Spark job failed to complete") - stdout, err := tadretrieveJobResult(t, data, jobName) - resultArray := strings.Split(stdout, "\n") - assert := assert.New(t) - length := len(resultArray) - assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) - assert.Containsf(stdout, "throughput", "stdout: %s", stdout) - assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) - assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) - - for i := 1; i < length; i++ { - // check metrics' value - resultArray[i] = strings.TrimSpace(resultArray[i]) - if resultArray[i] != "" { - resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") - tadoutputArray := strings.Fields(resultArray[i]) - anomaly_output := tadoutputArray[11] - assert.Equal(12, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) - switch algo { - case "ARIMA": - algo_throughput := tadoutputArray[10][:5] - assert.Equal(result_map["ARIMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "EWMA": - algo_throughput := tadoutputArray[10][:5] - assert.Equal(result_map["EWMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "DBSCAN": - throughput := tadoutputArray[7][:5] - assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - } + assert_variable_map 
:= map[string]map[string]int{ + "None": { + "tadoutputArray_len": 12, + "anomaly_output_idx": 11, + "throughput_idx": 7}, + "podName": { + "tadoutputArray_len": 10, + "anomaly_output_idx": 9, + "throughput_idx": 5}, + "podLabel": { + "tadoutputArray_len": 11, + "anomaly_output_idx": 10, + "throughput_idx": 6}, + "external": { + "tadoutputArray_len": 8, + "anomaly_output_idx": 7, + "throughput_idx": 3}, + "svc": { + "tadoutputArray_len": 8, + "anomaly_output_idx": 7, + "throughput_idx": 3}, + } + poolIdx = 0 + for agg_type, algoName := range aggTypeToAlgoNameMap { + poolIdx += 1 + if agg_type == "None" { + for _, algo := range algoNames { + algoName = algo + wg.Add(1) + pool <- poolIdx + go executeRetrieveTest(t, data, algoName, agg_type, result_map, assert_variable_map, pool, &wg) } + } else { + wg.Add(1) + pool <- poolIdx + go executeRetrieveTest(t, data, algoName, agg_type, result_map, assert_variable_map, pool, &wg) } - require.NoError(t, err) - _, err = taddeleteJob(t, data, jobName) - require.NoError(t, err) } + wg.Wait() +} + +func executeRetrieveTest(t *testing.T, data *TestData, algo, agg_type string, result_map map[string]map[string]string, assert_variable_map map[string]map[string]int, pool chan int, wg *sync.WaitGroup) { + var stdout string + defer func() { + <-pool + wg.Done() + }() + _, jobName, err := tadrunJob(t, data, algo, agg_type) + require.NoError(t, err) + err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) + require.NoError(t, err) + err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) + require.NoError(t, err) + stdout, err = tadretrieveJobResult(t, data, jobName) + require.NoError(t, err) + resultArray := strings.Split(stdout, "\n") + assert := assert.New(t) + length := len(resultArray) + assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) + assert.Containsf(stdout, "throughput", "stdout: %s", stdout) + assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) + assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) + for i := 1; i < length; i++ { + // check metrics' value + resultArray[i] = strings.TrimSpace(resultArray[i]) + if resultArray[i] != "" { + resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") + tadoutputArray := strings.Fields(resultArray[i]) + anomaly_output := tadoutputArray[assert_variable_map[agg_type]["anomaly_output_idx"]] + throughput := tadoutputArray[assert_variable_map[agg_type]["throughput_idx"]][:5] + assert.Equal(assert_variable_map[agg_type]["tadoutputArray_len"], len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) + switch algo { + case "ARIMA": + assert.Equal(result_map["ARIMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "EWMA": + assert.Equal(result_map["EWMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "DBSCAN": + assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + } + } + } + _, err = taddeleteJob(t, data, jobName) + require.NoError(t, err) } // waitJobComplete waits for the anomaly detection Spark job completes func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout time.Duration) error { + e2eMutex.Lock() + defer e2eMutex.Unlock() stdout := "" err := wait.PollImmediate(defaultInterval, timeout, func() (bool, error) { stdout, err := tadgetJobStatus(t, data, jobName) - require.NoError(t, err) + if err != nil { + if 
strings.Contains(err.Error(), "TLS handshake timeout") {
+				// Transient TLS handshake timeouts are retried.
+				return false, nil
+			}
+			// Any other error fails the poll instead of being silently
+			// swallowed by the next iteration.
+			return false, err
+		}
 		if strings.Contains(stdout, "Status of this anomaly detection job is COMPLETED") {
 			return true, nil
 		}
@@ -252,8 +332,26 @@ func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout ti
 	return nil
 }
 
-func tadrunJob(t *testing.T, data *TestData, algotype string) (stdout string, jobName string, err error) {
+func tadrunJob(t *testing.T, data *TestData, algotype, agg_type string) (stdout string, jobName string, err error) {
+	e2eMutex.Lock()
+	defer e2eMutex.Unlock()
+	var agg_flow_ext, ext string
 	newjobcmd := tadstartCmd + " --algo " + algotype + " --driver-memory 1G --start-time 2022-08-11T06:26:50 --end-time 2022-08-12T08:26:54"
+	switch agg_type {
+	case "podName":
+		agg_flow_ext = " --agg-flow pod"
+		ext = " --pod-name test_podName"
+	case "podLabel":
+		agg_flow_ext = " --agg-flow pod"
+		ext = " --pod-label \"test_key\":\"test_value\""
+	case "external":
+		agg_flow_ext = fmt.Sprintf(" --agg-flow %s", agg_type)
+		ext = " --external-ip 10.10.1.33"
+	case "svc":
+		agg_flow_ext = fmt.Sprintf(" --agg-flow %s", agg_type)
+		ext = " --svc-port-name test_serviceportname"
+	}
+	newjobcmd = newjobcmd + agg_flow_ext + ext
 	stdout, jobName, err = RunJob(t, data, newjobcmd)
 	if err != nil {
 		return "", "", err
@@ -279,6 +377,8 @@ func tadlistJobs(t *testing.T, data *TestData) (stdout string, err error) {
 }
 
 func taddeleteJob(t *testing.T, data *TestData, jobName string) (stdout string, err error) {
+	e2eMutex.Lock()
+	defer e2eMutex.Unlock()
 	cmd := fmt.Sprintf("%s %s", taddeleteCmd, jobName)
 	stdout, err = DeleteJob(t, data, cmd)
 	if err != nil {
@@ -288,6 +388,8 @@ func taddeleteJob(t *testing.T, data *TestData, jobName string,
 }
 
 func tadretrieveJobResult(t *testing.T, data *TestData, jobName string) (stdout string, err error) {
+	e2eMutex.Lock()
+	defer e2eMutex.Unlock()
 	cmd := fmt.Sprintf("%s %s", tadretrieveCmd, jobName)
 	stdout, err = RetrieveJobResult(t, data, cmd)
 	if err != nil {
@@ -304,6 +406,14 @@ func addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) {
 	destinationIP := "10.10.1.33"
 	destinationTransportPort := 5201
 	protocolIndentifier := 6
+	sourcePodNamespace := "test_namespace"
+	sourcePodName := "test_podName"
+	destinationPodName := "test_podName"
+	destinationPodNamespace := "test_namespace"
+	sourcePodLabels := "{\"test_key\":\"test_value\"}"
+	destinationPodLabels := "{\"test_key\":\"test_value\"}"
+	destinationServicePortName := "test_serviceportname"
+	flowtype := 3
 	throughputs := []int64{
 		4007380032, 4006917952, 4004471308, 4005277827, 4005486294,
@@ -344,15 +454,15 @@ func addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) {
 		uint64(randInt(t, MaxInt32)),
 		uint64(randInt(t, MaxInt32)),
 		uint64(randInt(t, MaxInt32)),
-		fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)),
-		fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)),
+		sourcePodName,
+		sourcePodNamespace,
 		fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)),
-		fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)),
-		fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)),
+		destinationPodName,
+		destinationPodNamespace,
 		fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)),
 		getRandIP(t),
 		uint16(randInt(t, 65535)),
-		fmt.Sprintf("ServicePortName-%d", randInt(t, MaxInt32)),
+		destinationServicePortName,
 		fmt.Sprintf("PolicyName-%d", randInt(t, MaxInt32)),
 		fmt.Sprintf("PolicyNameSpace-%d", randInt(t, MaxInt32)),
 		fmt.Sprintf("PolicyRuleName-%d", randInt(t, MaxInt32)),
@@ -364,9 +474,9 @@ func 
addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) { 1, 1, "tcpState", - 0, - fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)), - fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)), + flowtype, + sourcePodLabels, + destinationPodLabels, uint64(throughput), uint64(randInt(t, MaxInt32)), uint64(randInt(t, MaxInt32)), @@ -419,9 +529,9 @@ func populateFlowTable(t *testing.T, connect *sql.DB) { } func testTADCleanAfterTheiaMgrResync(t *testing.T, data *TestData) { - _, jobName1, err := tadrunJob(t, data, "ARIMA") + _, jobName1, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) - _, jobName2, err := tadrunJob(t, data, "ARIMA") + _, jobName2, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) err = TheiaManagerRestart(t, data, jobName1, "tad")