From 25de070d425ef2c89fa894ef6bdad774a0104f44 Mon Sep 17 00:00:00 2001
From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com>
Date: Thu, 19 Oct 2023 13:18:56 -0700
Subject: [PATCH] Enhance e2e test (#515)

1. Delete uploaded images only when test-e2e-multicluster and other e2e
   tests pass.
2. Update image url in test-e2e-kind.sh.
3. After the status of Flow Aggregator becomes ready, we check the log of
   Flow Aggregator to make sure it successfully connects to ClickHouse.

Signed-off-by: Yun-Tang Hsu
---
 .github/workflows/kind.yml       |  1 +
 test/e2e/flowvisibility_test.go  | 14 +------
 test/e2e/framework.go            | 67 +++++++++++++++++++-------------
 test/e2e_mc/framework.go         | 20 ++++++----
 test/e2e_mc/multicluster_test.go |  2 +-
 5 files changed, 57 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml
index 7a03d9ec..976b4c8b 100644
--- a/.github/workflows/kind.yml
+++ b/.github/workflows/kind.yml
@@ -402,6 +402,7 @@ jobs:
     - build-clickhouse-server-image
     - build-theia-manager-image
     - test-e2e-encap
+    - test-e2e-multicluster
     - test-upgrade-from-N-1
     - test-ClickHouse-migrate-from-N-1
     runs-on: [ubuntu-latest]
diff --git a/test/e2e/flowvisibility_test.go b/test/e2e/flowvisibility_test.go
index 4f063c39..65f47fa9 100644
--- a/test/e2e/flowvisibility_test.go
+++ b/test/e2e/flowvisibility_test.go
@@ -117,7 +117,7 @@ const (
 
 var (
 	// Single iperf run results in two connections with separate ports (control connection and actual data connection).
-	// As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 12 records
+	// As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 6 records
 	// exporting to the flow aggregator at time 2s, 4s, 6s, 8s, 10s, and 12s after iperf traffic begins.
 	// Since flow aggregator will aggregate records based on 5-tuple connection key and active timeout is 3.5 seconds,
 	// we expect 3 records at time 5.5s, 9s, and 12.5s after iperf traffic begins.
@@ -178,18 +178,6 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
 		failOnError(fmt.Errorf("error when creating perftest Services: %v", err), t, data)
 	}
 
-	// FlowAggregator takes up to 20s to set up the initial connection with
-	// ClickHouse DB. Most of the wait happens when looking up for ClickHouse
-	// on kube-dns. During the wait, FlowAggregator Pod is also likely to crash
-	// and restart once due to ping timeout. It usually does not introduce too
-	// much issue in real-life use cases, but will fail the e2e tests, as we
-	// start the iPerf traffic right away and we check whether ClickHouse
-	// receive every record. Here we add a workaround to wait 30s before
-	// starting sending iPerf traffic to avoid missing the first a few records.
-	// We should consider support a method to check the connection setup status
-	// later on, e.g. a new Antctl command.
-	time.Sleep(30 * time.Second)
-
 	// IntraNodeFlows tests the case, where Pods are deployed on same Node
 	// and their flow information is exported as IPFIX flow records.
 	// K8s network policies are being tested here.
diff --git a/test/e2e/framework.go b/test/e2e/framework.go
index c0f9b705..edc1a32a 100644
--- a/test/e2e/framework.go
+++ b/test/e2e/framework.go
@@ -1066,18 +1066,28 @@ func (data *TestData) createBusyboxPodOnNode(name string, ns string, nodeName st
 }
 
 // GetFlowAggregator retrieves the name of the Flow-Aggregator Pod (flow-aggregator-*) running on a specific Node.
-func (data *TestData) GetFlowAggregator() (*corev1.Pod, error) {
+func (data *TestData) GetFlowAggregator() (string, error) {
 	listOptions := metav1.ListOptions{
 		LabelSelector: "app=flow-aggregator",
 	}
-	pods, err := data.clientset.CoreV1().Pods(flowAggregatorNamespace).List(context.TODO(), listOptions)
-	if err != nil {
-		return nil, fmt.Errorf("failed to list Flow Aggregator Pod: %v", err)
-	}
-	if len(pods.Items) != 1 {
-		return nil, fmt.Errorf("expected *exactly* one Pod")
+	var pod *corev1.Pod
+	if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
+		pods, err := data.clientset.CoreV1().Pods(flowAggregatorNamespace).List(context.TODO(), listOptions)
+		if err != nil {
+			return false, fmt.Errorf("failed to list Flow Aggregator Pod: %v", err)
+		}
+		if len(pods.Items) != 1 {
+			return false, nil
+		}
+		pod = &pods.Items[0]
+		return true, nil
+	}); err != nil {
+		if err == wait.ErrWaitTimeout {
+			return "", fmt.Errorf("failed to get *exactly* one Pod in %s", defaultTimeout.String())
+		}
+		return "", err
 	}
-	return &pods.Items[0], nil
+	return pod.Name, nil
 }
 
 // RunCommandFromPod Run the provided command in the specified Container for the give Pod and returns the contents of
@@ -1429,30 +1439,35 @@ func (data *TestData) deployFlowAggregator() error {
 	if err != nil || rc != 0 {
 		return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml)
 	}
-
-	if rc, _, _, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
-		_, stdout, _, _ := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
-		_, logStdout, _, _ := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
-		return fmt.Errorf("error when waiting for the Flow Aggregator rollout to complete. kubectl describe output: %s, logs: %s", stdout, logStdout)
-	}
-	// Check for flow-aggregator pod running again for db connection establishment
-	flowAggPod, err := data.GetFlowAggregator()
+	// Check for flow-aggregator pod running and for db connection establishment.
+	flowAggPodName, err := data.GetFlowAggregator()
 	if err != nil {
 		return fmt.Errorf("error when getting flow-aggregator Pod: %v", err)
 	}
-	podName := flowAggPod.Name
-	_, err = data.PodWaitFor(defaultTimeout*2, podName, flowAggregatorNamespace, func(p *corev1.Pod) (bool, error) {
-		for _, condition := range p.Status.Conditions {
-			if condition.Type == corev1.PodReady {
-				return condition.Status == corev1.ConditionTrue, nil
-			}
-		}
-		return false, nil
-	})
+	err = data.PodWaitForReady(2*defaultTimeout, flowAggPodName, flowAggregatorNamespace)
 	if err != nil {
-		_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl get pod %s -n %s -o yaml", podName, flowAggregatorNamespace))
+		_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl get pod %s -n %s -o yaml", flowAggPodName, flowAggregatorNamespace))
 		return fmt.Errorf("error when waiting for flow-aggregator Ready: %v; stdout %s, stderr: %s, %v", err, stdout, stderr, podErr)
 	}
+	var errLog error
+	// Check the log of the flow-aggregator Pod to make sure ClickHouse is enabled and connected to the Flow Aggregator
+	if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
+		logString, err := data.GetPodLogs(flowAggregatorNamespace, flowAggPodName, &corev1.PodLogOptions{})
+		if err != nil {
+			errLog = fmt.Errorf("error when getting Flow Aggregator logs: %v", err)
+			return false, nil
+		}
+		if strings.Contains(logString, "error when creating ClickHouse export process") {
+			errLog = fmt.Errorf("error when creating ClickHouse export process")
+			return false, nil
+		}
+		if strings.Contains(logString, "Starting ClickHouse exporting process") {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		return fmt.Errorf("error when checking log for Flow Aggregator: %v", errLog)
+	}
 	return nil
 }
 
diff --git a/test/e2e_mc/framework.go b/test/e2e_mc/framework.go
index 1d04de96..114dafe0 100644
--- a/test/e2e_mc/framework.go
+++ b/test/e2e_mc/framework.go
@@ -239,7 +239,11 @@ func (data *MCTestData) deployClickHouse(td *e2e.TestData) (string, int32, error
 
 func (data *MCTestData) deployFlowAggregator(td *e2e.TestData, databaseURL string, security bool) error {
 	flowAggYaml := flowAggregatorYML
-	rc, _, _, err := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl apply -f %s", flowAggYaml))
+	controlPlaneName, ok := data.controlPlaneNames[td.GetClusterName()]
+	if !ok {
+		return fmt.Errorf("cannot find the name of the control-plane Node")
+	}
+	rc, _, _, err := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl apply -f %s", flowAggYaml))
 	if err != nil || rc != 0 {
 		return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml)
 	}
@@ -268,18 +272,20 @@ func (data *MCTestData) deployFlowAggregator(td *e2e.TestData, databaseURL strin
 	if err = td.MutateFlowAggregatorConfigMap(databaseURL, security); err != nil {
 		return err
 	}
-	if rc, _, _, err = td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
-		_, stdout, _, _ := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
-		_, logStdout, _, _ := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
+	if rc, _, _, err = td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
+		_, stdout, _, _ := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
+		_, logStdout, _, _ := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
 		return fmt.Errorf("error when waiting for the Flow Aggregator rollout to complete. kubectl describe output: %s, logs: %s", stdout, logStdout)
 	}
 	// Check for flow-aggregator Pod running again for db connection establishment
-	flowAggPod, err := td.GetFlowAggregator()
+	flowAggPodName, err := td.GetFlowAggregator()
 	if err != nil {
 		return fmt.Errorf("error when getting flow-aggregator Pod: %v", err)
 	}
-	if err = td.PodWaitForReady(2*defaultTimeout, flowAggPod.Name, flowAggregatorNamespace); err != nil {
-		return err
+	err = td.PodWaitForReady(2*defaultTimeout, flowAggPodName, flowAggregatorNamespace)
+	if err != nil {
+		_, stdout, stderr, podErr := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl get pod %s -n %s -o yaml", flowAggPodName, flowAggregatorNamespace))
+		return fmt.Errorf("error when waiting for flow-aggregator Ready: %v; stdout %s, stderr: %s, %v", err, stdout, stderr, podErr)
 	}
 	return nil
 }
diff --git a/test/e2e_mc/multicluster_test.go b/test/e2e_mc/multicluster_test.go
index de5d781a..02f04f75 100644
--- a/test/e2e_mc/multicluster_test.go
+++ b/test/e2e_mc/multicluster_test.go
@@ -84,7 +84,7 @@ func TestMultiCluster(t *testing.T) {
 	t.Logf("Verifying records in ClickHouse")
 	require.GreaterOrEqualf(t, len(clickHouseRecordsW), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %v", clickHouseRecordsW)
 	t.Logf("Verifying cluster UUID in East/West cluster are different")
-	require.NotEqualf(t, clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID, "ClusterUUID for EAST/WEST cluster should be different.\n Records of EAST cluster: %v\nRecords of EAST cluster: %v", clickHouseRecordsE, clickHouseRecordsW)
+	require.NotEqualf(t, clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID, "ClusterUUID for EAST/WEST cluster should be different.\n EAST ClusterUUID: %s\nWEST ClusterUUID: %s", clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID)
 }
 
 func createPerftestPods(data *MCTestData) (podAIPs *e2e.PodIPs, podBIPs *e2e.PodIPs, podCIPs *e2e.PodIPs, podDIPs *e2e.PodIPs, err error) {
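
The snippet below is an illustrative sketch, not part of the patch above. It shows, standalone, the log-polling pattern that the new deployFlowAggregator check relies on: repeatedly read a Pod's logs until a success marker appears, bail out early on a failure marker, and give up after a timeout. The helper name waitForLogMarker and its parameters are hypothetical; only standard client-go calls (GetLogs, wait.Poll) are assumed.

package e2e

import (
	"context"
	"fmt"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForLogMarker polls the logs of the given Pod until they contain successMarker.
// It returns an error as soon as failureMarker shows up, or when timeout expires.
func waitForLogMarker(client kubernetes.Interface, namespace, podName, successMarker, failureMarker string, interval, timeout time.Duration) error {
	var lastErr error
	if err := wait.Poll(interval, timeout, func() (bool, error) {
		raw, err := client.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).Do(context.TODO()).Raw()
		if err != nil {
			// The Pod may not be serving logs yet; remember the error and keep polling.
			lastErr = fmt.Errorf("error when getting logs for Pod %s/%s: %v", namespace, podName, err)
			return false, nil
		}
		logs := string(raw)
		if failureMarker != "" && strings.Contains(logs, failureMarker) {
			lastErr = fmt.Errorf("found failure marker %q in logs of Pod %s/%s", failureMarker, namespace, podName)
			return false, nil
		}
		return strings.Contains(logs, successMarker), nil
	}); err != nil {
		return fmt.Errorf("marker %q not found in logs of Pod %s/%s: %v (last error: %v)", successMarker, namespace, podName, err, lastErr)
	}
	return nil
}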