Enhance e2e test (#515)
1. Delete uploaded images only when test-e2e-multicluster and the other e2e tests pass.
2. Update the image URL in test-e2e-kind.sh.
3. After the Flow Aggregator Pod becomes Ready, check the Flow Aggregator log to make sure it successfully connects to ClickHouse.

Signed-off-by: Yun-Tang Hsu <[email protected]>
yuntanghsu authored Oct 19, 2023
1 parent f974b32 commit 25de070
Showing 5 changed files with 57 additions and 47 deletions.
1 change: 1 addition & 0 deletions .github/workflows/kind.yml
@@ -402,6 +402,7 @@ jobs:
- build-clickhouse-server-image
- build-theia-manager-image
- test-e2e-encap
- test-e2e-multicluster
- test-upgrade-from-N-1
- test-ClickHouse-migrate-from-N-1
runs-on: [ubuntu-latest]
14 changes: 1 addition & 13 deletions test/e2e/flowvisibility_test.go
@@ -117,7 +117,7 @@ const (

var (
// Single iperf run results in two connections with separate ports (control connection and actual data connection).
// As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 12 records
// As 2s is the export active timeout of flow exporter and iperf traffic runs for 12s, we expect totally 6 records
// exporting to the flow aggregator at time 2s, 4s, 6s, 8s, 10s, and 12s after iperf traffic begins.
// Since flow aggregator will aggregate records based on 5-tuple connection key and active timeout is 3.5 seconds,
// we expect 3 records at time 5.5s, 9s, and 12.5s after iperf traffic begins.
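
Note: the corrected comment's record counts are just the traffic duration divided by each active timeout. A minimal sketch of the arithmetic (the helper below is illustrative, not part of the test suite):

package main

import (
	"fmt"
	"time"
)

// expectedRecords returns how many records a component with the given active
// timeout emits over a traffic run of the given duration (one record per
// elapsed timeout interval, via integer division).
func expectedRecords(activeTimeout, duration time.Duration) int {
	return int(duration / activeTimeout)
}

func main() {
	// Flow exporter: 2s active timeout over 12s of iperf traffic -> 6 records,
	// exported at 2s, 4s, 6s, 8s, 10s, and 12s.
	fmt.Println(expectedRecords(2*time.Second, 12*time.Second)) // 6
	// Flow aggregator: 3.5s active timeout over the same run -> 3 records,
	// at roughly 5.5s, 9s, and 12.5s.
	fmt.Println(expectedRecords(3500*time.Millisecond, 12*time.Second)) // 3
}
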
@@ -178,18 +178,6 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
failOnError(fmt.Errorf("error when creating perftest Services: %v", err), t, data)
}

// FlowAggregator takes up to 20s to set up the initial connection with
// ClickHouse DB. Most of the wait happens when looking up for ClickHouse
// on kube-dns. During the wait, FlowAggregator Pod is also likely to crash
// and restart once due to ping timeout. It usually does not introduce too
// much issue in real-life use cases, but will fail the e2e tests, as we
// start the iPerf traffic right away and we check whether ClickHouse
// receive every record. Here we add a workaround to wait 30s before
// starting sending iPerf traffic to avoid missing the first a few records.
// We should consider support a method to check the connection setup status
// later on, e.g. a new Antctl command.
time.Sleep(30 * time.Second)

// IntraNodeFlows tests the case, where Pods are deployed on same Node
// and their flow information is exported as IPFIX flow records.
// K8s network policies are being tested here.
67 changes: 41 additions & 26 deletions test/e2e/framework.go
@@ -1066,18 +1066,28 @@ func (data *TestData) createBusyboxPodOnNode(name string, ns string, nodeName st
}

// GetFlowAggregator retrieves the name of the Flow-Aggregator Pod (flow-aggregator-*) running on a specific Node.
func (data *TestData) GetFlowAggregator() (*corev1.Pod, error) {
func (data *TestData) GetFlowAggregator() (string, error) {
listOptions := metav1.ListOptions{
LabelSelector: "app=flow-aggregator",
}
pods, err := data.clientset.CoreV1().Pods(flowAggregatorNamespace).List(context.TODO(), listOptions)
if err != nil {
return nil, fmt.Errorf("failed to list Flow Aggregator Pod: %v", err)
}
if len(pods.Items) != 1 {
return nil, fmt.Errorf("expected *exactly* one Pod")
var pod *corev1.Pod
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
pods, err := data.clientset.CoreV1().Pods(flowAggregatorNamespace).List(context.TODO(), listOptions)
if err != nil {
return false, fmt.Errorf("failed to list Flow Aggregator Pod: %v", err)
}
if len(pods.Items) != 1 {
return false, nil
}
pod = &pods.Items[0]
return true, nil
}); err != nil {
if err == wait.ErrWaitTimeout {
return "", fmt.Errorf("failed to get *exactly* one Pod in %s", defaultTimeout.String())
}
return "", err
}
return &pods.Items[0], nil
return pod.Name, nil
}

// RunCommandFromPod Run the provided command in the specified Container for the give Pod and returns the contents of
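
Note: the rewritten GetFlowAggregator retries the Pod lookup with wait.Poll instead of failing on the first List call, and returns the Pod name rather than the Pod object. A minimal, self-contained sketch of the wait.Poll pattern it relies on (the condition below is illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Run the condition every second until it returns true, returns a non-nil
	// error (which aborts the poll immediately), or the 30s timeout expires.
	err := wait.Poll(1*time.Second, 30*time.Second, func() (bool, error) {
		return time.Since(start) > 3*time.Second, nil // illustrative condition
	})
	// On timeout, wait.Poll returns wait.ErrWaitTimeout, which is why the
	// code above checks for it explicitly to produce a clearer error message.
	if err == wait.ErrWaitTimeout {
		fmt.Println("condition never became true within the timeout")
		return
	}
	fmt.Println("condition met:", err == nil)
}
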
@@ -1429,30 +1439,35 @@ func (data *TestData) deployFlowAggregator() error {
if err != nil || rc != 0 {
return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml)
}

if rc, _, _, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
_, stdout, _, _ := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
_, logStdout, _, _ := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
return fmt.Errorf("error when waiting for the Flow Aggregator rollout to complete. kubectl describe output: %s, logs: %s", stdout, logStdout)
}
// Check for flow-aggregator pod running again for db connection establishment
flowAggPod, err := data.GetFlowAggregator()
// Check for flow-aggregator pod running and for db connection establishment.
flowAggPodName, err := data.GetFlowAggregator()
if err != nil {
return fmt.Errorf("error when getting flow-aggregator Pod: %v", err)
}
podName := flowAggPod.Name
_, err = data.PodWaitFor(defaultTimeout*2, podName, flowAggregatorNamespace, func(p *corev1.Pod) (bool, error) {
for _, condition := range p.Status.Conditions {
if condition.Type == corev1.PodReady {
return condition.Status == corev1.ConditionTrue, nil
}
}
return false, nil
})
err = data.PodWaitForReady(2*defaultTimeout, flowAggPodName, flowAggregatorNamespace)
if err != nil {
_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl get pod %s -n %s -o yaml", podName, flowAggregatorNamespace))
_, stdout, stderr, podErr := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl get pod %s -n %s -o yaml", flowAggPodName, flowAggregatorNamespace))
return fmt.Errorf("error when waiting for flow-aggregator Ready: %v; stdout %s, stderr: %s, %v", err, stdout, stderr, podErr)
}
var errLog error
// Check log of flow-aggregator pod to make sure ClickHouse is enabled and connected to flow-aggregator
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
logString, err := data.GetPodLogs(flowAggregatorNamespace, flowAggPodName, &corev1.PodLogOptions{})
if err != nil {
errLog = fmt.Errorf("error when getting Flow Aggregator logs: %v", err)
return false, nil
}
if strings.Contains(logString, "error when creating ClickHouse export process") {
errLog = fmt.Errorf("error when creating ClickHouse export process")
return false, nil
}
if strings.Contains(logString, "Starting ClickHouse exporting process") {
return true, nil
}
return false, nil
}); err != nil {
return fmt.Errorf("error when checking log for Flow Aggregator: %v", errLog)
}
return nil
}
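
Note: the new log check polls GetPodLogs until the Flow Aggregator reports "Starting ClickHouse exporting process", replacing the fixed 30s sleep removed from flowvisibility_test.go. A hedged sketch of what a GetPodLogs-style helper typically wraps in client-go (the repository's actual implementation may differ):

import (
	"context"
	"io"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// getPodLogs streams a Pod's logs and returns them as a single string.
func getPodLogs(ctx context.Context, cs kubernetes.Interface, namespace, podName string) (string, error) {
	req := cs.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})
	stream, err := req.Stream(ctx)
	if err != nil {
		return "", err
	}
	defer stream.Close()
	logBytes, err := io.ReadAll(stream)
	if err != nil {
		return "", err
	}
	return string(logBytes), nil
}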

20 changes: 13 additions & 7 deletions test/e2e_mc/framework.go
@@ -239,7 +239,11 @@ func (data *MCTestData) deployClickHouse(td *e2e.TestData) (string, int32, error
func (data *MCTestData) deployFlowAggregator(td *e2e.TestData, databaseURL string, security bool) error {

flowAggYaml := flowAggregatorYML
rc, _, _, err := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl apply -f %s", flowAggYaml))
controlPlaneName, ok := data.controlPlaneNames[td.GetClusterName()]
if !ok {
return fmt.Errorf("cannot find the name of control plane Node")
}
rc, _, _, err := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl apply -f %s", flowAggYaml))
if err != nil || rc != 0 {
return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml)
}
@@ -268,18 +272,20 @@ func (data *MCTestData) deployFlowAggregator(td *e2e.TestData, databaseURL strin
if err = td.MutateFlowAggregatorConfigMap(databaseURL, security); err != nil {
return err
}
if rc, _, _, err = td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
_, stdout, _, _ := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
_, logStdout, _, _ := td.RunCommandOnNode(data.controlPlaneNames[td.GetClusterName()], fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
if rc, _, _, err = td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 {
_, stdout, _, _ := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s describe pod", flowAggregatorNamespace))
_, logStdout, _, _ := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace))
return fmt.Errorf("error when waiting for the Flow Aggregator rollout to complete. kubectl describe output: %s, logs: %s", stdout, logStdout)
}
// Check for flow-aggregator Pod running again for db connection establishment
flowAggPod, err := td.GetFlowAggregator()
flowAggPodName, err := td.GetFlowAggregator()
if err != nil {
return fmt.Errorf("error when getting flow-aggregator Pod: %v", err)
}
if err = td.PodWaitForReady(2*defaultTimeout, flowAggPod.Name, flowAggregatorNamespace); err != nil {
return err
err = td.PodWaitForReady(2*defaultTimeout, flowAggPodName, flowAggregatorNamespace)
if err != nil {
_, stdout, stderr, podErr := td.RunCommandOnNode(controlPlaneName, fmt.Sprintf("kubectl get pod %s -n %s -o yaml", flowAggPodName, flowAggregatorNamespace))
return fmt.Errorf("error when waiting for flow-aggregator Ready: %v; stdout %s, stderr: %s, %v", err, stdout, stderr, podErr)
}
return nil
}
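
Note: the multicluster version now looks up the control-plane Node name once with the comma-ok idiom, so an unknown cluster name fails fast with a clear error instead of yielding an empty Node name on every map access. A minimal sketch of the idiom (function name is illustrative):

// controlPlaneFor resolves the control-plane Node name for a cluster,
// reporting an error when the cluster name was never registered.
func controlPlaneFor(controlPlaneNames map[string]string, cluster string) (string, error) {
	name, ok := controlPlaneNames[cluster]
	if !ok {
		return "", fmt.Errorf("cannot find the name of control plane Node for cluster %q", cluster)
	}
	return name, nil
}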
2 changes: 1 addition & 1 deletion test/e2e_mc/multicluster_test.go
@@ -84,7 +84,7 @@ func TestMultiCluster(t *testing.T) {
t.Logf("Verifying records in ClickHouse")
require.GreaterOrEqualf(t, len(clickHouseRecordsW), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %v", clickHouseRecordsW)
t.Logf("Verifying cluster UUID in East/West cluster are different")
require.NotEqualf(t, clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID, "ClusterUUID for EAST/WEST cluster should be different.\n Records of EAST cluster: %v\nRecords of EAST cluster: %v", clickHouseRecordsE, clickHouseRecordsW)
require.NotEqualf(t, clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID, "ClusterUUID for EAST/WEST cluster should be different.\n EAST ClusterUUID: %s\nWEST ClusterUUID: %s", clickHouseRecordsE[0].ClusterUUID, clickHouseRecordsW[0].ClusterUUID)
}

func createPerftestPods(data *MCTestData) (podAIPs *e2e.PodIPs, podBIPs *e2e.PodIPs, podCIPs *e2e.PodIPs, podDIPs *e2e.PodIPs, err error) {
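
Note: the updated assertion prints only the two ClusterUUIDs on failure instead of dumping every record from both clusters. require.NotEqualf fails the test when the two values are equal and formats the trailing arguments into the message; a minimal illustrative test (placeholder values):

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestClusterUUIDsDiffer(t *testing.T) {
	uuidEast, uuidWest := "uuid-east", "uuid-west" // placeholder values
	require.NotEqualf(t, uuidEast, uuidWest,
		"ClusterUUID for EAST/WEST cluster should be different.\nEAST ClusterUUID: %s\nWEST ClusterUUID: %s",
		uuidEast, uuidWest)
}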
