diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 43301936b6a..19f9913e753 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \ "projects.registry.vmware.com/antrea/toolbox:1.1-0") -FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \ +FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \ "projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \ "projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \ "projects.registry.vmware.com/antrea/clickhouse-server:23.4") diff --git a/go.mod b/go.mod index b9c66a4b6a8..822deafe786 100644 --- a/go.mod +++ b/go.mod @@ -50,16 +50,16 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 github.com/ti-mo/conntrack v0.5.0 - github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6 - github.com/vmware/go-ipfix v0.7.0 - go.uber.org/mock v0.3.0 - golang.org/x/crypto v0.14.0 - golang.org/x/mod v0.13.0 - golang.org/x/net v0.17.0 - golang.org/x/sync v0.4.0 - golang.org/x/sys v0.13.0 - golang.org/x/time v0.3.0 - golang.org/x/tools v0.14.0 + github.com/vishvananda/netlink v1.2.1-beta.2 + github.com/vmware/go-ipfix v0.8.2 + go.uber.org/mock v0.4.0 + golang.org/x/crypto v0.17.0 + golang.org/x/mod v0.14.0 + golang.org/x/net v0.19.0 + golang.org/x/sync v0.5.0 + golang.org/x/sys v0.15.0 + golang.org/x/time v0.5.0 + golang.org/x/tools v0.16.1 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 diff --git a/go.sum b/go.sum index 71db6d980cb..9f4662c5058 100644 --- a/go.sum +++ b/go.sum @@ -1117,13 +1117,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= -github.com/vmware/go-ipfix v0.7.0 h1:7dOth2p5eL01GKzyXg2sibJcD9Fhb8KeLrn/ysctiwE= -github.com/vmware/go-ipfix v0.7.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c= -github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= -github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= -github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= +github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA= +github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk= diff --git a/pkg/agent/flowexporter/exporter/exporter.go 
b/pkg/agent/flowexporter/exporter/exporter.go index df99ce56e44..e070b655a30 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -260,8 +260,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) { currTime := time.Now() var expireTime1, expireTime2 time.Duration - exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + // We export records from denyConnStore first, then conntrackConnStore. We enforce this ordering to handle a + // special case: for an inter-node connection with an egress drop network policy, both conntrackConnStore and + // denyConnStore on the same Node will send out records to the Flow Aggregator. If the record from conntrackConnStore + // arrives at FA first, FA will not be able to capture the deny network policy metadata, and it will keep waiting + // for a record from the destination Node to finish flow correlation until it times out. Later on, we should + // probably consider deduplicating records between conntrackConnStore and denyConnStore before exporting them. exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) // Select the shorter time out among two connection stores to do the next round of export. nextExpireTime := getMinTime(expireTime1, expireTime2) for i := range exp.expiredConns { diff --git a/pkg/antctl/transform/common/transform.go b/pkg/antctl/transform/common/transform.go index bf0a5df5acc..9f9a1dd7e4c 100644 --- a/pkg/antctl/transform/common/transform.go +++ b/pkg/antctl/transform/common/transform.go @@ -57,6 +57,10 @@ func Int64ToString(val int64) string { return strconv.Itoa(int(val)) } +func BoolToString(val bool) string { + return strconv.FormatBool(val) +} + func GenerateTableElementWithSummary(list []string, maxColumnLength int) string { element := "" sort.Strings(list) diff --git a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go index 960de52d3d9..b41448f08a0 100644 --- a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go +++ b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go @@ -26,10 +26,14 @@ import ( // Response is the response struct of recordmetrics command. type Response struct { - NumRecordsExported int64 `json:"numRecordsExported,omitempty"` - NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"` - NumFlows int64 `json:"numFlows,omitempty"` - NumConnToCollector int64 `json:"numConnToCollector,omitempty"` + NumRecordsExported int64 `json:"numRecordsExported,omitempty"` + NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"` + NumFlows int64 `json:"numFlows,omitempty"` + NumConnToCollector int64 `json:"numConnToCollector,omitempty"` + WithClickHouseExporter bool `json:"withClickHouseExporter,omitempty"` + WithS3Exporter bool `json:"withS3Exporter,omitempty"` + WithLogExporter bool `json:"withLogExporter,omitempty"` + WithIPFIXExporter bool `json:"withIPFIXExporter,omitempty"` } // HandleFunc returns the function which can handle the /recordmetrics API request.
@@ -37,10 +41,14 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { metrics := faq.GetRecordMetrics() metricsResponse := Response{ - NumRecordsExported: metrics.NumRecordsExported, - NumRecordsReceived: metrics.NumRecordsReceived, - NumFlows: metrics.NumFlows, - NumConnToCollector: metrics.NumConnToCollector, + NumRecordsExported: metrics.NumRecordsExported, + NumRecordsReceived: metrics.NumRecordsReceived, + NumFlows: metrics.NumFlows, + NumConnToCollector: metrics.NumConnToCollector, + WithClickHouseExporter: metrics.WithClickHouseExporter, + WithS3Exporter: metrics.WithS3Exporter, + WithLogExporter: metrics.WithLogExporter, + WithIPFIXExporter: metrics.WithIPFIXExporter, } err := json.NewEncoder(w).Encode(metricsResponse) if err != nil { @@ -51,7 +59,7 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { } func (r Response) GetTableHeader() []string { - return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED"} + return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED", "CLICKHOUSE-EXPORTER", "S3-EXPORTER", "LOG-EXPORTER", "IPFIX-EXPORTER"} } func (r Response) GetTableRow(maxColumnLength int) []string { @@ -60,6 +68,10 @@ func (r Response) GetTableRow(maxColumnLength int) []string { common.Int64ToString(r.NumRecordsReceived), common.Int64ToString(r.NumFlows), common.Int64ToString(r.NumConnToCollector), + common.BoolToString(r.WithClickHouseExporter), + common.BoolToString(r.WithS3Exporter), + common.BoolToString(r.WithLogExporter), + common.BoolToString(r.WithIPFIXExporter), } } diff --git a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go index eb1af81495f..45a699e7fa1 100644 --- a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go +++ b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go @@ -31,10 +31,14 @@ func TestRecordMetricsQuery(t *testing.T) { ctrl := gomock.NewController(t) faq := queriertest.NewMockFlowAggregatorQuerier(ctrl) faq.EXPECT().GetRecordMetrics().Return(querier.Metrics{ - NumRecordsExported: 20, - NumRecordsReceived: 15, - NumFlows: 30, - NumConnToCollector: 1, + NumRecordsExported: 20, + NumRecordsReceived: 15, + NumFlows: 30, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, }) handler := HandleFunc(faq) @@ -48,12 +52,16 @@ func TestRecordMetricsQuery(t *testing.T) { err = json.Unmarshal(recorder.Body.Bytes(), &received) assert.Nil(t, err) assert.Equal(t, Response{ - NumRecordsExported: 20, - NumRecordsReceived: 15, - NumFlows: 30, - NumConnToCollector: 1, + NumRecordsExported: 20, + NumRecordsReceived: 15, + NumFlows: 30, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, }, received) - assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1"}) + assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1", "true", "true", "true", "true"}) } diff --git a/pkg/flowaggregator/exporter/testing/mock_exporter.go b/pkg/flowaggregator/exporter/testing/mock_exporter.go index b19c002b0f9..8307816b5a8 100644 --- a/pkg/flowaggregator/exporter/testing/mock_exporter.go +++ b/pkg/flowaggregator/exporter/testing/mock_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // 
Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index e674c1e6f63..e11ea72945b 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -553,10 +553,14 @@ func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []m func (fa *flowAggregator) GetRecordMetrics() querier.Metrics { return querier.Metrics{ - NumRecordsExported: fa.numRecordsExported, - NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(), - NumFlows: fa.aggregationProcess.GetNumFlows(), - NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), + NumRecordsExported: fa.numRecordsExported, + NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(), + NumFlows: fa.aggregationProcess.GetNumFlows(), + NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), + WithClickHouseExporter: fa.clickHouseExporter != nil, + WithS3Exporter: fa.s3Exporter != nil, + WithLogExporter: fa.logExporter != nil, + WithIPFIXExporter: fa.ipfixExporter != nil, } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 392ce829618..99e1951a50a 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -725,17 +725,29 @@ func TestFlowAggregator_GetRecordMetrics(t *testing.T) { ctrl := gomock.NewController(t) mockCollectingProcess := ipfixtesting.NewMockIPFIXCollectingProcess(ctrl) mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) + mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) + mockS3Exporter := exportertesting.NewMockInterface(ctrl) + mockLogExporter := exportertesting.NewMockInterface(ctrl) want := querier.Metrics{ - NumRecordsExported: 1, - NumRecordsReceived: 1, - NumFlows: 1, - NumConnToCollector: 1, + NumRecordsExported: 1, + NumRecordsReceived: 1, + NumFlows: 1, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, } fa := &flowAggregator{ collectingProcess: mockCollectingProcess, aggregationProcess: mockAggregationProcess, numRecordsExported: 1, + clickHouseExporter: mockClickHouseExporter, + s3Exporter: mockS3Exporter, + logExporter: mockLogExporter, + ipfixExporter: mockIPFIXExporter, } mockCollectingProcess.EXPECT().GetNumRecordsReceived().Return(int64(1)) diff --git a/pkg/flowaggregator/querier/querier.go b/pkg/flowaggregator/querier/querier.go index de694375a1d..349f5ed9fbd 100644 --- a/pkg/flowaggregator/querier/querier.go +++ b/pkg/flowaggregator/querier/querier.go @@ -19,10 +19,14 @@ import ( ) type Metrics struct { - NumRecordsExported int64 - NumRecordsReceived int64 - NumFlows int64 - NumConnToCollector int64 + NumRecordsExported int64 + NumRecordsReceived int64 + NumFlows int64 + NumConnToCollector int64 + WithClickHouseExporter bool + WithS3Exporter bool + WithLogExporter bool + WithIPFIXExporter bool } type FlowAggregatorQuerier interface { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 85abc98d7df..f5d48d7174b 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "net" - "regexp" "strconv" "strings" "testing" @@ -30,6 +29,7 @@ import ( ipfixregistry 
"github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/strings/slices" @@ -39,6 +39,7 @@ import ( "antrea.io/antrea/pkg/antctl" "antrea.io/antrea/pkg/antctl/runtime" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + "antrea.io/antrea/pkg/flowaggregator/apiserver/handlers/recordmetrics" "antrea.io/antrea/test/e2e/utils" ) @@ -168,6 +169,10 @@ type testFlow struct { checkDstSvc bool } +type IPFIXCollectorResponse struct { + FlowRecords []string `json:"flowRecords"` +} + func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) @@ -209,6 +214,11 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { if err != nil { t.Fatalf("Error when setting up test: %v", err) } + // Check recordmetrics of Flow Aggregator to make sure Antrea-agent Pods/ClickHouse/IPFIX collector and Flow Aggregator + // are correctly connected + if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { + t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) + } t.Run(o.name, func(t *testing.T) { defer func() { teardownTest(t, data) @@ -240,6 +250,9 @@ func TestFlowAggregator(t *testing.T) { if err != nil { t.Fatalf("Error when setting up test: %v", err) } + if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { + t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) + } defer func() { teardownTest(t, data) // Execute teardownFlowAggregator later than teardownTest to ensure that the log @@ -306,7 +319,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest // before initiating traffic. This label is then employed as a filter when collecting records from either the // ClickHouse or the IPFIX collector Pod. 
- addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label) }) @@ -316,7 +329,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -353,7 +366,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -390,7 +403,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -428,7 +441,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -470,7 +483,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -511,7 +524,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeFlows" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -534,7 +547,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -571,7 +584,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { 
t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -608,7 +621,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -646,7 +659,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -693,7 +706,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -744,6 +757,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnSourceNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node running the client Pods var egressNodeIP string @@ -759,14 +774,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -784,6 +798,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node 
clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnOtherNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node not running the client Pods var egressNodeIP string @@ -799,14 +815,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -817,14 +832,15 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() - + label := "ToExternalFlows" + addLabelToTestPods(t, data, label, []string{clientName}) if !isIPv6 { if clientIPs.IPv4 != nil && serverIPs.IPv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv4.String(), serverIPs.IPv4.String(), serverPodPort, isIPv6, "", "", label) } } else { if clientIPs.IPv6 != nil && serverIPs.IPv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.IPv6.String(), serverIPs.IPv6.String(), serverPodPort, isIPv6, "", "", label) } } }) @@ -833,7 +849,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "LocalServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -849,7 +865,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "RemoteServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. 
// In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -987,79 +1003,71 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) + // Checking only data records as data records cannot be decoded without template + // record. + assert.GreaterOrEqualf(t, len(recordSlices), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) // Iterate over recordSlices and build some results to test with expected results - dataRecordsCount := 0 - src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { - // Check the source port along with source and destination IPs as there - // are flow records for control flows during the iperf with same IPs - // and destination port. - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - dataRecordsCount = dataRecordsCount + 1 - // Check if record has both Pod name of source and destination Pod. + // Check if record has both Pod name of source and destination Pod. + if isIntraNode { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if checkService { if isIntraNode { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") } else { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) - } - assert := assert.New(t) - if checkService { - if isIntraNode { - assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") - } else { - assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") - } - } - if checkK8sNetworkPolicy { - // Check if records have both ingress and egress network policies. 
- assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - } - if checkAntreaNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") } + } + if checkK8sNetworkPolicy { + // Check if records have both ingress and egress network policies. 
+ assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + } + if checkAntreaNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + } - // Skip the bandwidth check for the iperf control flow records which have 0 throughput. 
- if !strings.Contains(record, "throughput: 0") { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) - flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - var recBandwidth float64 - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - // Check average bandwidth on the last record. - octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") - recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 - } else { - // Check bandwidth with the field "throughput" except for the last record, - // as their throughput may be significantly lower than the average Iperf throughput. - throughput := getUint64FieldFromRecord(t, record, "throughput") - recBandwidth = float64(throughput) / 1000000 - } - t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) - assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if !strings.Contains(record, "throughput: 0") { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) + flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) + var recBandwidth float64 + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + // Check average bandwidth on the last record. + octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") + recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 + } else { + // Check bandwidth with the field "throughput" except for the last record, + // as their throughput may be significantly lower than the average Iperf throughput. + throughput := getUint64FieldFromRecord(t, record, "throughput") + recBandwidth = float64(throughput) / 1000000 } + t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) } } - // Checking only data records as data records cannot be decoded without template - // record. - assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. 
Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { @@ -1114,7 +1122,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, exportTime := record.FlowEndSeconds.Unix() var recBandwidth float64 // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + if record.FlowEndReason == 3 { octetTotalCount := record.OctetTotalCount recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000 } else { @@ -1132,7 +1140,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %s", clickHouseRecords) } -func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP string) { +func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, labelFilter string) { var cmd string if !isIPv6 { cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort) @@ -1141,24 +1149,19 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "") + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter) for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { - checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) - assert.NotContains(t, record, "octetDeltaCount: 0", "octetDeltaCount should be non-zero") - if egressName != "" { - checkEgressInfo(t, record, egressName, egressIP) - } + checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) + if egressName != "" { + checkEgressInfo(t, record, egressName, egressIP) } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, labelFilter) for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) - assert.Greater(t, record.OctetDeltaCount, uint64(0), "octetDeltaCount should be non-zero") if egressName != "" { checkEgressInfoClickHouse(t, record, egressName, egressIP) } @@ -1418,31 +1421,36 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService err := wait.PollImmediate(500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, func() (bool, error) { var rc int var err error - // 
`pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed - rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) + var cmd string + ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testData.testNamespace) + if err != nil || len(ipfixCollectorIP.IPStrings) == 0 { + require.NoErrorf(t, err, "Should be able to get IP from IPFIX collector Pod") + } + if !isIPv6 { + cmd = fmt.Sprintf("curl http://%s:8080/records", ipfixCollectorIP.IPv4.String()) + } else { + cmd = fmt.Sprintf("curl http://[%s]:8080/records", ipfixCollectorIP.IPv6.String()) + } + rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), cmd) if err != nil || rc != 0 { return false, err } // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) + recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort) if checkAllRecords { for _, record := range recordSlices { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - return true, nil - } + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + return true, nil } } return false, nil } - return strings.Contains(collectorOutput, src) && strings.Contains(collectorOutput, dst) && strings.Contains(collectorOutput, srcPort), nil + return len(recordSlices) != 0, nil }) - require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort) + require.NoErrorf(t, err, "IPFIX collector did not receive the expected records; recordSlices: %v, collector output: %v, iperf source port: %s", recordSlices, collectorOutput, srcPort) return collectorOutput, recordSlices } @@ -1454,9 +1462,9 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) } @@ -1477,7 +1485,6 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str if err != nil { return false, err } - rows := strings.Split(queryOutput, "\n") flowRecords = make([]*ClickHouseFullRow, 0, len(rows)) for _, row := range rows { @@
-1495,10 +1502,8 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str if checkAllRecords { for _, record := range flowRecords { - flowStartTime := record.FlowStartSeconds.Unix() - exportTime := record.FlowEndSeconds.Unix() // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + if record.FlowEndReason == 3 { return true, nil } } @@ -1510,17 +1515,24 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str return flowRecords } -func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string { - re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") - output = re.ReplaceAllString(output, "") - output = strings.TrimSpace(output) - recordSlices := strings.Split(output, "IPFIX-HDR:") - if labelFilter == "" { - return recordSlices +func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort string) []string { + var response IPFIXCollectorResponse + err := json.Unmarshal([]byte(output), &response) + if err != nil { + require.NoErrorf(t, err, "error when unmarshalling output from IPFIX collector Pod") } + recordSlices := response.FlowRecords records := []string{} for _, recordSlice := range recordSlices { - if strings.Contains(recordSlice, labelFilter) { + // Skip records with octetDeltaCount of 0, such as the last record of a flow. + if strings.Contains(recordSlice, "octetDeltaCount: 0") { + continue + } + // Skip records that do not match the expected srcIP, dstIP and srcPort. + if !strings.Contains(recordSlice, src) || !strings.Contains(recordSlice, dst) || !strings.Contains(recordSlice, srcPort) { + continue + } + if labelFilter == "" || strings.Contains(recordSlice, labelFilter) { records = append(records, recordSlice) } } @@ -1753,14 +1765,24 @@ func deletePerftestServices(t *testing.T, data *TestData) { } } -func addLabelToPerftestPods(t *testing.T, data *TestData, label string) { - perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"}) - require.NoError(t, err, "Error when getting perftest Pods") - for i := range perftestPods.Items { - pod := &perftestPods.Items[i] - pod.Labels["targetLabel"] = label - _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) - require.NoErrorf(t, err, "Error when adding label to %s", pod.Name) +func addLabelToTestPods(t *testing.T, data *TestData, label string, podNames []string) { + for _, podName := range podNames { + testPod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + require.NoErrorf(t, err, "Error when getting Pod %s in %s", podName, data.testNamespace) + testPod.Labels["targetLabel"] = label + _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), testPod, metav1.UpdateOptions{}) + require.NoErrorf(t, err, "Error when adding label to %s", testPod.Name) + err = wait.Poll(defaultInterval, timeout, func() (bool, error) { + pod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), testPod.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error when getting Pod '%s': %w", podName, err) + } + return pod.Labels["targetLabel"] == label, nil + }) + require.NoErrorf(t, err, "Error when verifying the label on %s", testPod.Name) } } @@ -1815,6 +1837,34 @@ func createToExternalTestServer(t *testing.T, data *TestData)
*PodIPs { return serverIPs } +func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error { + flowAggPod, err := data.getFlowAggregator() + if err != nil { + return fmt.Errorf("error when getting flow-aggregator Pod: %w", err) + } + podName := flowAggPod.Name + command := []string{"antctl", "get", "recordmetrics", "-o", "json"} + if err := wait.Poll(defaultInterval, 2*defaultTimeout, func() (bool, error) { + stdout, _, err := runAntctl(podName, command, data) + if err != nil { + t.Logf("Error when requesting recordmetrics, %v", err) + return false, nil + } + metrics := &recordmetrics.Response{} + if err := json.Unmarshal([]byte(stdout), metrics); err != nil { + return false, fmt.Errorf("error when decoding recordmetrics: %w", err) + } + if metrics.NumConnToCollector != int64(clusterInfo.numNodes) || !metrics.WithClickHouseExporter || !metrics.WithIPFIXExporter || metrics.NumRecordsExported == 0 { + t.Logf("Metrics are not correct. Current metrics: NumConnToCollector=%d, ClickHouseExporter=%v, IPFIXExporter=%v, NumRecordsExported=%d", metrics.NumConnToCollector, metrics.WithClickHouseExporter, metrics.WithIPFIXExporter, metrics.NumRecordsExported) + return false, nil + } + return true, nil + }); err != nil { + return fmt.Errorf("error when checking recordmetrics for Flow Aggregator: %w", err) + } + return nil +} + type ClickHouseFullRow struct { TimeInserted time.Time `json:"timeInserted"` FlowStartSeconds time.Time `json:"flowStartSeconds"` diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 7973fb7f9d2..a371360da6c 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -75,42 +75,44 @@ const ( defaultInterval = 1 * time.Second // antreaNamespace is the K8s Namespace in which all Antrea resources are running. - antreaNamespace = "kube-system" - kubeNamespace = "kube-system" - flowAggregatorNamespace = "flow-aggregator" - antreaConfigVolume = "antrea-config" - antreaWindowsConfigVolume = "antrea-windows-config" - flowAggregatorConfigVolume = "flow-aggregator-config" - antreaDaemonSet = "antrea-agent" - antreaWindowsDaemonSet = "antrea-agent-windows" - antreaDeployment = "antrea-controller" - flowAggregatorDeployment = "flow-aggregator" - flowAggregatorCHSecret = "clickhouse-ca" - antreaDefaultGW = "antrea-gw0" - testAntreaIPAMNamespace = "antrea-ipam-test" - testAntreaIPAMNamespace11 = "antrea-ipam-test-11" - testAntreaIPAMNamespace12 = "antrea-ipam-test-12" - busyboxContainerName = "busybox" - mcjoinContainerName = "mcjoin" - agnhostContainerName = "agnhost" - toolboxContainerName = "toolbox" - nginxContainerName = "nginx" - controllerContainerName = "antrea-controller" - ovsContainerName = "antrea-ovs" - agentContainerName = "antrea-agent" - antreaYML = "antrea.yml" - antreaIPSecYML = "antrea-ipsec.yml" - antreaCovYML = "antrea-coverage.yml" - antreaIPSecCovYML = "antrea-ipsec-coverage.yml" - flowAggregatorYML = "flow-aggregator.yml" - flowAggregatorCovYML = "flow-aggregator-coverage.yml" - flowVisibilityYML = "flow-visibility.yml" - flowVisibilityTLSYML = "flow-visibility-tls.yml" - chOperatorYML = "clickhouse-operator-install-bundle.yml" - flowVisibilityCHPodName = "chi-clickhouse-clickhouse-0-0-0" - flowVisibilityNamespace = "flow-visibility" - defaultBridgeName = "br-int" - monitoringNamespace = "monitoring" + antreaNamespace = "kube-system" + kubeNamespace = "kube-system" + flowAggregatorNamespace = "flow-aggregator" + antreaConfigVolume = "antrea-config" + antreaWindowsConfigVolume = "antrea-windows-config" + flowAggregatorConfigVolume = 
"flow-aggregator-config" + antreaDaemonSet = "antrea-agent" + antreaWindowsDaemonSet = "antrea-agent-windows" + antreaDeployment = "antrea-controller" + flowAggregatorDeployment = "flow-aggregator" + flowAggregatorCHSecret = "clickhouse-ca" + antreaDefaultGW = "antrea-gw0" + testAntreaIPAMNamespace = "antrea-ipam-test" + testAntreaIPAMNamespace11 = "antrea-ipam-test-11" + testAntreaIPAMNamespace12 = "antrea-ipam-test-12" + busyboxContainerName = "busybox" + mcjoinContainerName = "mcjoin" + agnhostContainerName = "agnhost" + toolboxContainerName = "toolbox" + nginxContainerName = "nginx" + controllerContainerName = "antrea-controller" + ovsContainerName = "antrea-ovs" + agentContainerName = "antrea-agent" + flowAggregatorContainerName = "flow-aggregator" + + antreaYML = "antrea.yml" + antreaIPSecYML = "antrea-ipsec.yml" + antreaCovYML = "antrea-coverage.yml" + antreaIPSecCovYML = "antrea-ipsec-coverage.yml" + flowAggregatorYML = "flow-aggregator.yml" + flowAggregatorCovYML = "flow-aggregator-coverage.yml" + flowVisibilityYML = "flow-visibility.yml" + flowVisibilityTLSYML = "flow-visibility-tls.yml" + chOperatorYML = "clickhouse-operator-install-bundle.yml" + flowVisibilityCHPodName = "chi-clickhouse-clickhouse-0-0-0" + flowVisibilityNamespace = "flow-visibility" + defaultBridgeName = "br-int" + monitoringNamespace = "monitoring" antreaControllerCovBinary = "antrea-controller-coverage" antreaAgentCovBinary = "antrea-agent-coverage" @@ -132,7 +134,7 @@ const ( nginxImage = "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" toolboxImage = "projects.registry.vmware.com/antrea/toolbox:1.2-1" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" + ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" ipfixCollectorPort = "4739" clickHouseHTTPPort = "8123"