Improve flow-visibility e2e test
In this commit, we:

1. Change the order in which we append expired records before exporting them from the Flow Exporter.
For inter-node traffic that matches an ingress or egress NetworkPolicy with a drop action, we receive records from both PacketIn and the conntrack table. In the ingress case there is no issue: we receive records from both Nodes, and both records are correlationRequired. In the egress case, the record from the conntrack table is correlationRequired while the record from PacketIn is not. If the record from the conntrack table arrives at the Flow Aggregator (FA) first, it has to be correlated at the FA, which keeps waiting for a matching record from the destination Node to complete the correlation, so the record never becomes ready to send.

2. Add a check to verify that Flow Exporters can successfully resolve the Flow Aggregator Service address before we send traffic.

3. Add a check to verify that the Flow Aggregator can successfully connect to ClickHouse before we send traffic (both checks are sketched right after this list).

4. Add labels to the External subtest so that irrelevant logs from the IPFIX collector Pod can be filtered out.

5. Confirm that a label is correctly added to a specific Pod after the Pod is updated.

6. Remove the octetDeltaCount check and instead filter out all records with octetDeltaCount=0 when retrieving records from the IPFIX collector Pod and from ClickHouse.

7. Use the new go-ipfix collector image. We improve the IPFIX collector by:
    a. No longer printing every record as it is received; instead, the records are stored in a string array.
    b. Adding an HTTP listener and handlers that serve requests to return or reset the stored records.
    This reduces the time needed to retrieve the records from ~4s to ~80ms when the collector holds ~1900 records. An illustrative sketch follows the ci/kind/test-e2e-kind.sh diff below.
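As a rough illustration of the checks in points 2 and 3, the pre-flight logic can be pictured as two polling helpers: one that waits until the Flow Aggregator Service name resolves, and one that waits until a TCP connection to ClickHouse succeeds. This is only a minimal sketch, assuming the code runs somewhere that can resolve the Service DNS name and reach ClickHouse directly; the function names, Service names, and port below are assumptions, and the real e2e tests rely on the test framework's own helpers.

```go
// Sketch only: hypothetical helpers, not the actual e2e test code.
package e2e

import (
	"context"
	"fmt"
	"net"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForServiceDNS polls until the given Service DNS name resolves, so Flow
// Exporters will not fail to look up the Flow Aggregator Service address.
func waitForServiceDNS(ctx context.Context, name string) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true,
		func(ctx context.Context) (bool, error) {
			_, err := net.DefaultResolver.LookupIPAddr(ctx, name)
			return err == nil, nil // keep polling while resolution fails
		})
}

// waitForTCPEndpoint polls until a TCP connection to addr succeeds, used here
// to confirm that ClickHouse accepts connections before traffic is generated.
func waitForTCPEndpoint(ctx context.Context, addr string) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true,
		func(ctx context.Context) (bool, error) {
			d := net.Dialer{Timeout: time.Second}
			conn, err := d.DialContext(ctx, "tcp", addr)
			if err != nil {
				return false, nil // not ready yet
			}
			conn.Close()
			return true, nil
		})
}

func preflightChecks(ctx context.Context) error {
	// Service names and port are assumptions for this sketch.
	if err := waitForServiceDNS(ctx, "flow-aggregator.flow-aggregator.svc"); err != nil {
		return fmt.Errorf("cannot resolve Flow Aggregator Service address: %w", err)
	}
	return waitForTCPEndpoint(ctx, "clickhouse-clickhouse.flow-visibility.svc:9000")
}
```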

Signed-off-by: Yun-Tang Hsu <[email protected]>
yuntanghsu committed Jan 16, 2024
1 parent c757d0c commit 3aaf468
Showing 13 changed files with 319 additions and 222 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
@@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
"projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
"projects.registry.vmware.com/antrea/toolbox:1.1-0")

FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \
FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
"projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
"projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
"projects.registry.vmware.com/antrea/clickhouse-server:23.4")
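The ipfix-collector image bump above corresponds to point 7 of the commit message: the go-ipfix test collector now stores received records instead of printing them, and exposes them over HTTP. The snippet below is only a sketch of that approach under assumed names (recordStore, /records, /reset); the actual implementation lives in the go-ipfix repository, not in this commit.

```go
// Sketch of the approach in point 7; names and endpoints are assumptions,
// the real code is in the go-ipfix test collector, not in this repository.
package collector

import (
	"net/http"
	"strings"
	"sync"
)

// recordStore accumulates received IPFIX records as strings instead of
// printing each one, so a test can fetch them in a single HTTP round trip.
type recordStore struct {
	mu      sync.Mutex
	records []string
}

func (s *recordStore) add(record string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records = append(s.records, record)
}

// serve exposes the stored records: GET /records returns them all at once,
// and /reset clears them between subtests.
func (s *recordStore) serve(addr string) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/records", func(w http.ResponseWriter, _ *http.Request) {
		s.mu.Lock()
		defer s.mu.Unlock()
		w.Write([]byte(strings.Join(s.records, "\n")))
	})
	mux.HandleFunc("/reset", func(w http.ResponseWriter, _ *http.Request) {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.records = nil
		w.WriteHeader(http.StatusOK)
	})
	return http.ListenAndServe(addr, mux)
}
```

Because the test retrieves everything with one request instead of scraping collector output, fetching ~1900 records drops from roughly 4s to 80ms, as noted in the commit message.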
20 changes: 10 additions & 10 deletions go.mod
@@ -50,16 +50,16 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/ti-mo/conntrack v0.5.0
github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6
github.com/vmware/go-ipfix v0.7.0
go.uber.org/mock v0.3.0
golang.org/x/crypto v0.14.0
golang.org/x/mod v0.13.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
golang.org/x/time v0.3.0
golang.org/x/tools v0.14.0
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vmware/go-ipfix v0.8.2
go.uber.org/mock v0.4.0
golang.org/x/crypto v0.17.0
golang.org/x/mod v0.14.0
golang.org/x/net v0.19.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.16.1
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
9 changes: 2 additions & 7 deletions go.sum
@@ -1117,13 +1117,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/vmware/go-ipfix v0.7.0 h1:7dOth2p5eL01GKzyXg2sibJcD9Fhb8KeLrn/ysctiwE=
github.com/vmware/go-ipfix v0.7.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c=
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA=
github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
8 changes: 7 additions & 1 deletion pkg/agent/flowexporter/exporter/exporter.go
@@ -260,8 +260,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) {
func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) {
currTime := time.Now()
var expireTime1, expireTime2 time.Duration
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// We export records from denyConnStore first, then conntrackConnStore. We enforce the ordering to handle a
// special case: for an inter-node connection with egress drop network policy, both conntrackConnStore and
// denyConnStore from the same Node will send out records to Flow Aggregator. If the record from conntrackConnStore
// arrives FA first, FA will not be able to capture the deny network policy metadata, and it will keep waiting
// for a record from destination Node to finish flow correlation until timeout. Later on we probably should
// consider doing a record deduplication between conntrackConnStore and denyConnStore before exporting records.
exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport)
// Select the shorter time out among two connection stores to do the next round of export.
nextExpireTime := getMinTime(expireTime1, expireTime2)
for i := range exp.expiredConns {
4 changes: 4 additions & 0 deletions pkg/antctl/transform/common/transform.go
@@ -57,6 +57,10 @@ func Int64ToString(val int64) string {
return strconv.Itoa(int(val))
}

func BoolToString(val bool) string {
return strconv.FormatBool(val)
}

func GenerateTableElementWithSummary(list []string, maxColumnLength int) string {
element := ""
sort.Strings(list)
30 changes: 21 additions & 9 deletions pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go
@@ -26,21 +26,29 @@ import (

// Response is the response struct of recordmetrics command.
type Response struct {
NumRecordsExported int64 `json:"numRecordsExported,omitempty"`
NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"`
NumFlows int64 `json:"numFlows,omitempty"`
NumConnToCollector int64 `json:"numConnToCollector,omitempty"`
NumRecordsExported int64 `json:"numRecordsExported,omitempty"`
NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"`
NumFlows int64 `json:"numFlows,omitempty"`
NumConnToCollector int64 `json:"numConnToCollector,omitempty"`
WithClickHouseExporter bool `json:"withClickHouseExporter,omitempty"`
WithS3Exporter bool `json:"withS3Exporter,omitempty"`
WithLogExporter bool `json:"withLogExporter,omitempty"`
WithIPFIXExporter bool `json:"withIPFIXExporter,omitempty"`
}

// HandleFunc returns the function which can handle the /recordmetrics API request.
func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
metrics := faq.GetRecordMetrics()
metricsResponse := Response{
NumRecordsExported: metrics.NumRecordsExported,
NumRecordsReceived: metrics.NumRecordsReceived,
NumFlows: metrics.NumFlows,
NumConnToCollector: metrics.NumConnToCollector,
NumRecordsExported: metrics.NumRecordsExported,
NumRecordsReceived: metrics.NumRecordsReceived,
NumFlows: metrics.NumFlows,
NumConnToCollector: metrics.NumConnToCollector,
WithClickHouseExporter: metrics.WithClickHouseExporter,
WithS3Exporter: metrics.WithS3Exporter,
WithLogExporter: metrics.WithLogExporter,
WithIPFIXExporter: metrics.WithIPFIXExporter,
}
err := json.NewEncoder(w).Encode(metricsResponse)
if err != nil {
@@ -51,7 +59,7 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc {
}

func (r Response) GetTableHeader() []string {
return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED"}
return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED", "CLICKHOUSE-EXPORTER", "S3-EXPORTER", "LOG-EXPORTER", "IPFIX-EXPORTER"}
}

func (r Response) GetTableRow(maxColumnLength int) []string {
@@ -60,6 +68,10 @@ func (r Response) GetTableRow(maxColumnLength int) []string {
common.Int64ToString(r.NumRecordsReceived),
common.Int64ToString(r.NumFlows),
common.Int64ToString(r.NumConnToCollector),
common.BoolToString(r.WithClickHouseExporter),
common.BoolToString(r.WithS3Exporter),
common.BoolToString(r.WithLogExporter),
common.BoolToString(r.WithIPFIXExporter),
}
}

@@ -31,10 +31,14 @@ func TestRecordMetricsQuery(t *testing.T) {
ctrl := gomock.NewController(t)
faq := queriertest.NewMockFlowAggregatorQuerier(ctrl)
faq.EXPECT().GetRecordMetrics().Return(querier.Metrics{
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
})

handler := HandleFunc(faq)
@@ -48,12 +52,16 @@
err = json.Unmarshal(recorder.Body.Bytes(), &received)
assert.Nil(t, err)
assert.Equal(t, Response{
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
NumRecordsExported: 20,
NumRecordsReceived: 15,
NumFlows: 30,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
}, received)

assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1"})
assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1", "true", "true", "true", "true"})

}
2 changes: 1 addition & 1 deletion pkg/flowaggregator/exporter/testing/mock_exporter.go

Some generated files are not rendered by default.

12 changes: 8 additions & 4 deletions pkg/flowaggregator/flowaggregator.go
@@ -553,10 +553,14 @@ func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []m

func (fa *flowAggregator) GetRecordMetrics() querier.Metrics {
return querier.Metrics{
NumRecordsExported: fa.numRecordsExported,
NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(),
NumFlows: fa.aggregationProcess.GetNumFlows(),
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
NumRecordsExported: fa.numRecordsExported,
NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(),
NumFlows: fa.aggregationProcess.GetNumFlows(),
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
WithClickHouseExporter: fa.clickHouseExporter != nil,
WithS3Exporter: fa.s3Exporter != nil,
WithLogExporter: fa.logExporter != nil,
WithIPFIXExporter: fa.ipfixExporter != nil,
}
}

20 changes: 16 additions & 4 deletions pkg/flowaggregator/flowaggregator_test.go
@@ -725,17 +725,29 @@ func TestFlowAggregator_GetRecordMetrics(t *testing.T) {
ctrl := gomock.NewController(t)
mockCollectingProcess := ipfixtesting.NewMockIPFIXCollectingProcess(ctrl)
mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl)
mockIPFIXExporter := exportertesting.NewMockInterface(ctrl)
mockClickHouseExporter := exportertesting.NewMockInterface(ctrl)
mockS3Exporter := exportertesting.NewMockInterface(ctrl)
mockLogExporter := exportertesting.NewMockInterface(ctrl)
want := querier.Metrics{
NumRecordsExported: 1,
NumRecordsReceived: 1,
NumFlows: 1,
NumConnToCollector: 1,
NumRecordsExported: 1,
NumRecordsReceived: 1,
NumFlows: 1,
NumConnToCollector: 1,
WithClickHouseExporter: true,
WithS3Exporter: true,
WithLogExporter: true,
WithIPFIXExporter: true,
}

fa := &flowAggregator{
collectingProcess: mockCollectingProcess,
aggregationProcess: mockAggregationProcess,
numRecordsExported: 1,
clickHouseExporter: mockClickHouseExporter,
s3Exporter: mockS3Exporter,
logExporter: mockLogExporter,
ipfixExporter: mockIPFIXExporter,
}

mockCollectingProcess.EXPECT().GetNumRecordsReceived().Return(int64(1))
12 changes: 8 additions & 4 deletions pkg/flowaggregator/querier/querier.go
@@ -19,10 +19,14 @@ import (
)

type Metrics struct {
NumRecordsExported int64
NumRecordsReceived int64
NumFlows int64
NumConnToCollector int64
NumRecordsExported int64
NumRecordsReceived int64
NumFlows int64
NumConnToCollector int64
WithClickHouseExporter bool
WithS3Exporter bool
WithLogExporter bool
WithIPFIXExporter bool
}

type FlowAggregatorQuerier interface {