From 41c114b1f71ed612c5a5706b302959983432b4d7 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 11 Jun 2019 06:42:07 -0700 Subject: [PATCH 01/24] Added nffPktgen to default build Enabled "make subdir" for intermediate targets --- .travis.yml | 6 +++--- examples/Makefile | 8 ++------ examples/nffPktgen/gtp-u/Dockerfile | 11 +++++++++++ examples/nffPktgen/testing/Dockerfile | 2 +- mk/include.mk | 2 -- mk/intermediate.mk | 7 ++++++- 6 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 examples/nffPktgen/gtp-u/Dockerfile diff --git a/.travis.yml b/.travis.yml index 10daa873..0c8b8549 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,9 +12,9 @@ services: before_script: - sudo apt-get install -y linux-headers-$(uname -r) - - docker pull ubuntu:cosmic - - docker build --build-arg https_proxy=${https_proxy} -t test-cosmic . - - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-cosmic /bin/bash + - docker pull ubuntu:disco + - docker build --build-arg https_proxy=${https_proxy} -t test-disco . + - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-disco /bin/bash script: - docker exec -i test-nff-go go mod download diff --git a/examples/Makefile b/examples/Makefile index 3aaaef21..b477ffaf 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -8,15 +8,11 @@ EXECUTABLES = dump clonablePcapDumper kni copy errorHandling timer \ createPacket sendFixedPktsNumber gtpu pingReplay \ netlink gopacketParserExample devbind generate \ OSforwarding jumbo decrementTTL +SUBDIRS = tutorial antiddos demo fileReadWrite firewall forwarding ipsec lb nffPktgen -SUBDIRS = tutorial antiddos demo fileReadWrite firewall forwarding ipsec lb - -.PHONY: dpi nffPktgen +.PHONY: dpi dpi: $(MAKE) -C dpi -nffPktgen: - $(MAKE) -C nffPktgen - include $(PATH_TO_MK)/intermediate.mk include $(PATH_TO_MK)/leaf.mk diff --git a/examples/nffPktgen/gtp-u/Dockerfile b/examples/nffPktgen/gtp-u/Dockerfile new file mode 100644 index 00000000..0e5a33c3 --- /dev/null +++ b/examples/nffPktgen/gtp-u/Dockerfile @@ -0,0 +1,11 @@ +# Copyright 2019 Intel Corporation. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +ARG USER_NAME +FROM ${USER_NAME}/nff-go-base + +LABEL RUN docker run -it --privileged -v /sys/bus/pci/drivers:/sys/bus/pci/drivers -v /sys/kernel/mm/hugepages:/sys/kernel/mm/hugepages -v /sys/devices/system/node:/sys/devices/system/node -v /dev:/dev --name NAME -e NAME=NAME -e IMAGE=IMAGE IMAGE + +WORKDIR /workdir +COPY trafficgen . diff --git a/examples/nffPktgen/testing/Dockerfile b/examples/nffPktgen/testing/Dockerfile index fe79ce59..21a1514b 100644 --- a/examples/nffPktgen/testing/Dockerfile +++ b/examples/nffPktgen/testing/Dockerfile @@ -9,5 +9,5 @@ LABEL RUN docker run -it --privileged -v /sys/bus/pci/drivers:/sys/bus/pci/drive WORKDIR /workdir COPY sendGetBack . -COPY perfTesting . +COPY perfTest . COPY dump . diff --git a/mk/include.mk b/mk/include.mk index 40d5155b..fb4aa8bf 100644 --- a/mk/include.mk +++ b/mk/include.mk @@ -7,9 +7,7 @@ PROJECT_ROOT := $(abspath $(dir $(abspath $(lastword $(MAKEFILE_LIST))))/..) # Main DPDK variables -DPDK_VERSION=18.11 DPDK_DIR=dpdk -PKTGEN_VERSION=3.5.8 PKTGEN_DIR=pktgen-dpdk DPDK_INSTALL_DIR=$(RTE_TARGET)-install export RTE_SDK=$(PROJECT_ROOT)/dpdk/$(DPDK_DIR)/$(DPDK_INSTALL_DIR)/usr/local/share/dpdk diff --git a/mk/intermediate.mk b/mk/intermediate.mk index 5e508ccb..725de8c8 100644 --- a/mk/intermediate.mk +++ b/mk/intermediate.mk @@ -7,5 +7,10 @@ TARGETS = all clean images clean-images deploy cleanall $(TARGETS): $(SUBDIRS) +included_from_dir := $(shell basename $(dir $(abspath $(word 1, $(MAKEFILE_LIST))))) +@eval $(included_from_dir): all + +targets_to_execute := $(filter-out $(included_from_dir), $(MAKECMDGOALS)) + $(SUBDIRS): - $(MAKE) -C $@ $(MAKECMDGOALS) + $(MAKE) -C $@ $(targets_to_execute) From c9c1ad02fe378fc45a322184b2a83f8943c5e0d2 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Thu, 11 Jul 2019 11:42:29 -0500 Subject: [PATCH 02/24] Added receive functionality to packet generator --- examples/nffPktgen/gtp-u/trafficgen.go | 1 + examples/nffPktgen/testing/perfTest.go | 101 +++++++++++++++++++++++- examples/nffPktgen/testing/testnat.json | 32 ++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 examples/nffPktgen/testing/testnat.json diff --git a/examples/nffPktgen/gtp-u/trafficgen.go b/examples/nffPktgen/gtp-u/trafficgen.go index 2f55de0e..a70f676e 100644 --- a/examples/nffPktgen/gtp-u/trafficgen.go +++ b/examples/nffPktgen/gtp-u/trafficgen.go @@ -192,6 +192,7 @@ func initPortFlows(port *IpPort, myIPs generator.AddrRange, addEncapsulation boo flow.CheckFatal(flow.SetSender(outFlow, uint16(port.Index))) // Input flow inFlow, err := flow.SetReceiver(port.Index) + flow.CheckFatal(err) flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) flow.CheckFatal(flow.SetStopper(inFlow)) } diff --git a/examples/nffPktgen/testing/perfTest.go b/examples/nffPktgen/testing/perfTest.go index abd7d390..2d922480 100644 --- a/examples/nffPktgen/testing/perfTest.go +++ b/examples/nffPktgen/testing/perfTest.go @@ -1,4 +1,4 @@ -// Copyright 2018 Intel Corporation. +// Copyright 2018-2019 Intel Corporation. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -7,22 +7,55 @@ package main import ( "flag" "fmt" + "os" + "os/signal" + "sync/atomic" + "time" + "github.com/intel-go/nff-go/examples/nffPktgen/generator" "github.com/intel-go/nff-go/flow" + "github.com/intel-go/nff-go/packet" ) +type IpPort struct { + Index uint16 + packetCount uint64 + bytesCount uint64 +} + +type HandlerContext struct { + port *IpPort +} + +func (hc HandlerContext) Copy() interface{} { + return HandlerContext{ + port: hc.port, + } +} + +func (hc HandlerContext) Delete() { +} + func main() { var ( speed uint64 genConfig, cores string port uint + receive bool ) flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") flag.StringVar(&cores, "cores", "0-2", "specifies cores") flag.UintVar(&port, "port", 1, "specifies output port") + flag.BoolVar(&receive, "receive", false, "Receive packets back and print statistics") + testTime := flag.Uint("t", 30, "run generator for specified period of time in seconds, use zero to run forever") + statInterval := flag.Uint("s", 2, "statistics update interval in seconds, use zero to disable it") flag.Parse() + portStats := IpPort{ + Index: uint16(port), + } + // Init NFF-GO system at 16 available cores config := flow.Config{ CPUList: cores, @@ -37,5 +70,69 @@ func main() { flow.CheckFatal(err) outFlow, _, _ := flow.SetFastGenerator(generator.Generate, speed, context) flow.CheckFatal(flow.SetSender(outFlow, uint16(port))) - flow.CheckFatal(flow.SystemStart()) + if receive { + hc := HandlerContext{ + port: &portStats, + } + inFlow, err := flow.SetReceiver(uint16(port)) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) + flow.CheckFatal(flow.SetStopper(inFlow)) + } + + go func() { + flow.CheckFatal(flow.SystemStart()) + }() + + // Set up finish channels + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, os.Interrupt) + + var finishChannel, statsChannel <-chan time.Time + if *testTime > 0 { + finishChannel = time.NewTicker(time.Duration(*testTime) * time.Second).C + } + if receive && *statInterval > 0 { + statsChannel = time.NewTicker(time.Duration(*statInterval) * time.Second).C + } + + started := time.Now() +out: + for { + select { + case sig := <-interruptChannel: + fmt.Printf("Received signal %v, finishing.\n", sig) + break out + case <-finishChannel: + fmt.Println("Test timeout reached") + break out + case <-statsChannel: + printStats(&portStats, started) + } + } + if receive { + printStats(&portStats, started) + printTotals(&portStats, started) + } +} + +func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { + hc := ctx.(HandlerContext) + + atomic.AddUint64(&hc.port.packetCount, 1) + atomic.AddUint64(&hc.port.bytesCount, uint64(pkt.GetPacketLen())) + return true +} + +func printStats(port *IpPort, started time.Time) { + fmt.Printf("%v: ", time.Since(started)) + fmt.Printf("Port %d received %vpkts, %vkB\n", port.Index, port.packetCount, port.bytesCount/1000) +} + +func printTotals(port *IpPort, started time.Time) { + runtime := time.Since(started) + runtimeint := uint64(runtime) / uint64(time.Second) + fmt.Printf("\nTest executed for %v\n", runtime) + fmt.Printf("Port %d pkts/s: %v\n", port.Index, port.packetCount/runtimeint) + fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) } diff --git a/examples/nffPktgen/testing/testnat.json b/examples/nffPktgen/testing/testnat.json new file mode 100644 index 00000000..57112289 --- /dev/null +++ b/examples/nffPktgen/testing/testnat.json @@ -0,0 +1,32 @@ +{ + "ether": { + "saddr": "00:25:96:FF:FE:12", + "daddr": "00:FF:96:FF:FE:12", + "ipv4": { + "saddr": { + "range": { + "min": "192.16.0.0", + "max": "192.16.0.255" + } + }, + "daddr": { + "range": { + "min": "172.16.0.0", + "max": "172.16.0.10" + } + }, + "udp": { + "sport": { + "range": { + "min": 1120, + "max": 1152 + } + }, + "dport": 1020, + "raw": { + "data": "1234567890" + } + } + } + } +} From 108a6fd23740225d7fc898f2f66b0e5752c5f522 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 12 Jul 2019 07:34:29 -0700 Subject: [PATCH 03/24] Fixed #629 - renamed ACLFromORIG functions to ACLFromTextTable --- README.md | 2 +- examples/errorHandling.go | 6 +++--- examples/firewall/firewall.go | 2 +- examples/forwarding/forwarding.go | 2 +- examples/tutorial/step05.go | 2 +- examples/tutorial/step06.go | 2 +- examples/tutorial/step07.go | 4 ++-- examples/tutorial/step08.go | 4 ++-- examples/tutorial/step09.go | 4 ++-- examples/tutorial/step10.go | 4 ++-- examples/tutorial/step11.go | 4 ++-- packet/acl.go | 10 +++++----- packet/acl_internal_test.go | 20 +++++++++---------- .../testSingleWorkingFF.go | 6 +++--- test/stash/forwardingTestL3.go | 2 +- 15 files changed, 37 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 15d2ee3d..4065f9bc 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get filtering rules from access control file. - L3Rules, err := packet.GetL3ACLFromORIG("Firewall.conf") + L3Rules, err := packet.GetL3ACLFromTextTable("Firewall.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. diff --git a/examples/errorHandling.go b/examples/errorHandling.go index 26d810cf..aefff719 100644 --- a/examples/errorHandling.go +++ b/examples/errorHandling.go @@ -49,11 +49,11 @@ func main() { } // Get splitting rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("wrong.conf") - fmt.Printf("error from GetL3ACLFromORIG was: %+v\n", err) + l3Rules, err = packet.GetL3ACLFromTextTable("wrong.conf") + fmt.Printf("error from GetL3ACLFromTextTable was: %+v\n", err) if common.GetNFErrorCode(err) == common.FileErr { fmt.Printf("changing file name\n") - l3Rules, err = packet.GetL3ACLFromORIG("Forwarding.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("Forwarding.conf") CheckFatal(err) } diff --git a/examples/firewall/firewall.go b/examples/firewall/firewall.go index 30bb9ecc..bf1cf990 100644 --- a/examples/firewall/firewall.go +++ b/examples/firewall/firewall.go @@ -29,7 +29,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get filtering rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("firewall.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("firewall.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. diff --git a/examples/forwarding/forwarding.go b/examples/forwarding/forwarding.go index d747d96e..cfbd18f6 100644 --- a/examples/forwarding/forwarding.go +++ b/examples/forwarding/forwarding.go @@ -37,7 +37,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get splitting rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("forwarding.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("forwarding.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. diff --git a/examples/tutorial/step05.go b/examples/tutorial/step05.go index f2044241..2e1abdf8 100644 --- a/examples/tutorial/step05.go +++ b/examples/tutorial/step05.go @@ -11,7 +11,7 @@ func main() { initCommonState() - l3Rules, err = packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) firstFlow, err := flow.SetReceiver(0) diff --git a/examples/tutorial/step06.go b/examples/tutorial/step06.go index 1fe2f99b..74fbff0a 100644 --- a/examples/tutorial/step06.go +++ b/examples/tutorial/step06.go @@ -11,7 +11,7 @@ func main() { initCommonState() - l3Rules, err = packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) firstFlow, err := flow.SetReceiver(0) diff --git a/examples/tutorial/step07.go b/examples/tutorial/step07.go index 7fd1ae89..1cd076cf 100644 --- a/examples/tutorial/step07.go +++ b/examples/tutorial/step07.go @@ -13,7 +13,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -37,7 +37,7 @@ func mySeparator(cur *packet.Packet, ctx flow.UserContext) bool { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules1.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step08.go b/examples/tutorial/step08.go index 66de6ce7..093620f2 100644 --- a/examples/tutorial/step08.go +++ b/examples/tutorial/step08.go @@ -13,7 +13,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -38,7 +38,7 @@ func mySplitter(cur *packet.Packet, ctx flow.UserContext) uint { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step09.go b/examples/tutorial/step09.go index 4442bb4c..3b4b18bf 100644 --- a/examples/tutorial/step09.go +++ b/examples/tutorial/step09.go @@ -14,7 +14,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -49,7 +49,7 @@ func myHandler(cur *packet.Packet, ctx flow.UserContext) { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step10.go b/examples/tutorial/step10.go index 88be669b..b7f6c0d2 100644 --- a/examples/tutorial/step10.go +++ b/examples/tutorial/step10.go @@ -16,7 +16,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -56,7 +56,7 @@ func myHandler(curV []*packet.Packet, mask *[vecSize]bool, ctx flow.UserContext) func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step11.go b/examples/tutorial/step11.go index a619e74f..93f8782e 100644 --- a/examples/tutorial/step11.go +++ b/examples/tutorial/step11.go @@ -16,7 +16,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -63,7 +63,7 @@ func heavyCode() { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/packet/acl.go b/packet/acl.go index 3a01dbca..bae1baec 100644 --- a/packet/acl.go +++ b/packet/acl.go @@ -14,7 +14,7 @@ // Three such functions are provided: // GetL2ACLFromJSON // GetL3ACLFromJSON -// GetL3ACLFromORIG +// GetL3ACLFromTextTable // GetL2RulesFromORIG function for L2 level is not added yet. TODO // These functions should be used before any usage of rules, however they also can be // used dynamically in parallel which make a possibility of changing rules during execution. @@ -82,9 +82,9 @@ func GetL2ACLFromJSON(filename string) (*L2Rules, error) { return &rules, rawL2Parse(&rawRules, &rules) } -// GetL2ACLFromORIG gets name of fields structed file with combined L2 rules, +// GetL2ACLFromTextTable gets name of fields structed file with combined L2 rules, // returns L2Rules -func GetL2ACLFromORIG(filename string) (*L2Rules, error) { +func GetL2ACLFromTextTable(filename string) (*L2Rules, error) { var rawRules rawL2Rules var rules L2Rules // Load Rules @@ -143,9 +143,9 @@ func readFile(filename string) ([]byte, error) { //TODO we need to construct raw structure as one after one without storing all them in memory -// GetL3ACLFromORIG gets name of fields structed file with combined L3 and L4 rules, +// GetL3ACLFromTextTable gets name of fields structed file with combined L3 and L4 rules, // returns L3Rules -func GetL3ACLFromORIG(filename string) (*L3Rules, error) { +func GetL3ACLFromTextTable(filename string) (*L3Rules, error) { var rawRules rawL3Rules var rules L3Rules // Load Rules diff --git a/packet/acl_internal_test.go b/packet/acl_internal_test.go index 86e7d0c8..f827e42d 100644 --- a/packet/acl_internal_test.go +++ b/packet/acl_internal_test.go @@ -244,10 +244,10 @@ func TestGetL2ACLFromJSON(t *testing.T) { // Function to test L2 rules parser for ORIG format // L2 rules generated, written to ORIG format, then parsed -func TestGetL2ACLFromORIG(t *testing.T) { +func TestGetL2ACLFromTextTable(t *testing.T) { testTable := generateTestL2Rules(rulesL2Ctxt, true) - tmpdir := createTmpDir("tmpTestGetL2ACLFromORIGConfigs") + tmpdir := createTmpDir("tmpTestGetL2ACLFromTextTableConfigs") for _, rule := range testTable { headerORIG := "# Source MAC, Destination MAC, L3 ID, Output port\n" @@ -259,9 +259,9 @@ func TestGetL2ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL2ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL2ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL2ACLFromORIG returned error %v\n\n", err) + t.Errorf("GetL2ACLFromTextTable returned error %v\n\n", err) t.FailNow() } @@ -432,11 +432,11 @@ func TestGetL3ACLFromJSON(t *testing.T) { // Function to test L3 rules parser for ORIG format // L3 rules generated, written to ORIG format, then parsed -func TestGetL3ACLFromORIG(t *testing.T) { +func TestGetL3ACLFromTextTable(t *testing.T) { // Generate IPv4 rules testTable4 := generateTestL3Rules(rulesL3Ctxt, true, true, false) - tmpdir := createTmpDir("tmpGetL3ACLFromORIGConfigs") + tmpdir := createTmpDir("tmpGetL3ACLFromTextTableConfigs") for _, rule := range testTable4 { // Create and parse ORIG file @@ -449,9 +449,9 @@ func TestGetL3ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL3ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL3ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL3ACLFromORIG returned error %s\n\n", err) + t.Errorf("GetL3ACLFromTextTable returned error %s\n\n", err) t.FailNow() } @@ -479,9 +479,9 @@ func TestGetL3ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL3ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL3ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL3ACLFromORIG returned error %s\n\n", err) + t.Errorf("GetL3ACLFromTextTable returned error %s\n\n", err) t.FailNow() } diff --git a/test/stability/testSingleWorkingFF/testSingleWorkingFF.go b/test/stability/testSingleWorkingFF/testSingleWorkingFF.go index 0b485d5c..0d0ff9d3 100644 --- a/test/stability/testSingleWorkingFF/testSingleWorkingFF.go +++ b/test/stability/testSingleWorkingFF/testSingleWorkingFF.go @@ -266,14 +266,14 @@ func setParameters(testScenario uint) (err error) { switch gTestType { case separate, vseparate: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-separate-l3rules.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-separate-l3rules.conf") } // Test expects to receive 33% of packets on 0 port and 66% on 1 port lessPercent = 33 shift = 1 case split, vsplit: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-split.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-split.conf") } // Test expects to receive 20% of packets on 0 port and 80% on 1 port lessPercent = 20 @@ -290,7 +290,7 @@ func setParameters(testScenario uint) (err error) { shift = 3 case dhandle, vdhandle: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-handle-l3rules.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-handle-l3rules.conf") } // Test expects to receive 100% of packets on 0 port and 0% on 1 port (33% total) lessPercent = 100 diff --git a/test/stash/forwardingTestL3.go b/test/stash/forwardingTestL3.go index 87d7937e..e2cdce7c 100644 --- a/test/stash/forwardingTestL3.go +++ b/test/stash/forwardingTestL3.go @@ -47,7 +47,7 @@ func main() { l3Rules, err = packet.GetL3ACLFromJSON("forwardingTestL3_ACL.json") flow.CheckFatal(err) case "orig": - l3Rules, err = packet.GetL3ACLFromORIG("forwardingTestL3_ACL.orig") + l3Rules, err = packet.GetL3ACLFromTextTable("forwardingTestL3_ACL.orig") flow.CheckFatal(err) } From a13233f4b57f3f0b1fcca45f666456a986982663 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 16 Jul 2019 06:03:25 -0500 Subject: [PATCH 04/24] Updated testnat file to match data size in ixia --- examples/nffPktgen/testing/testnat.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/nffPktgen/testing/testnat.json b/examples/nffPktgen/testing/testnat.json index 57112289..6933bae5 100644 --- a/examples/nffPktgen/testing/testnat.json +++ b/examples/nffPktgen/testing/testnat.json @@ -24,7 +24,7 @@ }, "dport": 1020, "raw": { - "data": "1234567890" + "data": "123456789012345678" } } } From 5f8dfa64a434b7173117cb7f7ac684534d40acc0 Mon Sep 17 00:00:00 2001 From: dhenza Date: Tue, 13 Aug 2019 22:44:04 +0200 Subject: [PATCH 05/24] Updated DecreaseTTL Function description --- packet/mpls.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packet/mpls.go b/packet/mpls.go index 3c0518d7..bdd8996a 100644 --- a/packet/mpls.go +++ b/packet/mpls.go @@ -100,7 +100,7 @@ func (packet *Packet) RemoveMPLS() bool { return true } -// SetMPLSLabel sets Label (20 first bits of MPLS header to specified value). +// DecreaseTTL decreases the MPLS header TTL by 1. func (hdr *MPLSHdr) DecreaseTTL() bool { newTime := SwapBytesUint32(hdr.mpls)&0x000000ff - 1 if newTime == 0 { From b3ae8e79c8832911b050b08951ea227f74e6a5d3 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Thu, 15 Aug 2019 10:37:25 -0700 Subject: [PATCH 06/24] Updated NFF-GO to DPDK 19.08 --- dpdk/dpdk | 2 +- dpdk/pktgen-dpdk | 2 +- internal/low/low.go | 2 +- internal/low/low.h | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dpdk/dpdk b/dpdk/dpdk index 07efd6dd..b81660d0 160000 --- a/dpdk/dpdk +++ b/dpdk/dpdk @@ -1 +1 @@ -Subproject commit 07efd6ddc0499688eb11ae4866d3532295d6db2b +Subproject commit b81660d09e9caa928ab1c683cd0fbfbe2439ac3d diff --git a/dpdk/pktgen-dpdk b/dpdk/pktgen-dpdk index ae5a88bf..7a4e0bcc 160000 --- a/dpdk/pktgen-dpdk +++ b/dpdk/pktgen-dpdk @@ -1 +1 @@ -Subproject commit ae5a88bf89e421ea84d680a7e5613af959243a4b +Subproject commit 7a4e0bcc7c6310dfd244c3a063769d9f774530c3 diff --git a/internal/low/low.go b/internal/low/low.go index 9ef31035..f5f13869 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -84,7 +84,7 @@ func CheckRSSPacketCount(p *Port, queue int16) int64 { // GetPortMACAddress gets MAC address of given port. func GetPortMACAddress(port uint16) [types.EtherAddrLen]uint8 { var mac [types.EtherAddrLen]uint8 - var cmac C.struct_ether_addr + var cmac C.struct_rte_ether_addr C.rte_eth_macaddr_get(C.uint16_t(port), &cmac) for i := range mac { diff --git a/internal/low/low.h b/internal/low/low.h index af656d0b..ac27113d 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -214,7 +214,7 @@ int create_kni(uint16_t port, uint32_t core, char *name, struct rte_mempool *mbu conf_default.id = pci_dev->id; } conf_default.force_bind = 1; // Flag to bind kernel thread - rte_eth_macaddr_get(port, (struct ether_addr *)&conf_default.mac_addr); + rte_eth_macaddr_get(port, (struct rte_ether_addr *)&conf_default.mac_addr); rte_eth_dev_get_mtu(port, &conf_default.mtu); struct rte_kni_ops ops; @@ -280,7 +280,7 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, return -1; struct rte_eth_conf port_conf_default = { - .rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN, + .rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN, .mq_mode = ETH_MQ_RX_RSS }, .txmode = { .mq_mode = ETH_MQ_TX_NONE, }, .rx_adv_conf.rss_conf.rss_key = NULL, @@ -424,12 +424,12 @@ struct rte_ip_frag_tbl* create_reassemble_table() { __attribute__((always_inline)) static inline struct rte_mbuf* reassemble(struct rte_ip_frag_tbl* tbl, struct rte_mbuf *buf, struct rte_ip_frag_death_row* death_row, uint64_t cur_tsc) { - struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *); + struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct rte_ether_hdr *); // TODO packet_type is not mandatory required for drivers. // Some drivers won't set it. However this is DPDK implementation. if (RTE_ETH_IS_IPV4_HDR(buf->packet_type)) { // if packet is IPv4 - struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *)(eth_hdr + 1); + struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1); if (rte_ipv4_frag_pkt_is_fragmented(ip_hdr)) { // try to reassemble buf->l2_len = sizeof(*eth_hdr); // prepare mbuf: setup l2_len/l3_len. @@ -440,7 +440,7 @@ static inline struct rte_mbuf* reassemble(struct rte_ip_frag_tbl* tbl, struct rt } } if (RTE_ETH_IS_IPV6_HDR(buf->packet_type)) { // if packet is IPv6 - struct ipv6_hdr *ip_hdr = (struct ipv6_hdr *)(eth_hdr + 1); + struct rte_ipv6_hdr *ip_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1); struct ipv6_extension_fragment *frag_hdr = rte_ipv6_frag_get_ipv6_fragment_header(ip_hdr); if (frag_hdr != NULL) { From 064ceded4ebf60a6ab801a353bfa73a73182ecd2 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 20 Aug 2019 09:47:15 -0500 Subject: [PATCH 07/24] Fixed #639 with proposed patch to getting device ID --- devices/misc.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/devices/misc.go b/devices/misc.go index ee6b14e0..e52afb64 100644 --- a/devices/misc.go +++ b/devices/misc.go @@ -40,12 +40,22 @@ func GetDeviceID(nicName string) (string, error) { return "", err } // raw should be like /sys/devices/pci0002:00/0000:00:08.0/virtio2/net/ens8 + // or /sys/devices/pci0000:00/0000:00:01.0/0000:03:00.2/net/ens4f2 raws := strings.Split(raw, "/") if len(raws) < 5 { return "", fmt.Errorf("path not correct") } - return raws[4], nil + // search and validate deviceID + for idx := len(raws) - 1; idx >= 0; idx-- { + v := strings.Split(raws[idx], ":") + if len(v) == 3 { + if len(v[0]) == 4 && len(v[1]) == 2 && len(v[2]) == 4 { + return raws[idx], nil + } + } + } + return "", fmt.Errorf("path not correct") } // IsModuleLoaded checks if the kernel has already loaded the driver or not. From acef03c22e18cca18ca5e7650dc432d934526849 Mon Sep 17 00:00:00 2001 From: agadiyar Date: Wed, 21 Aug 2019 12:19:40 -0500 Subject: [PATCH 08/24] Binary Search Generator example added --- examples/nffPktgen/testing/minPktLoss.go | 214 +++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 examples/nffPktgen/testing/minPktLoss.go diff --git a/examples/nffPktgen/testing/minPktLoss.go b/examples/nffPktgen/testing/minPktLoss.go new file mode 100644 index 00000000..aac95eb1 --- /dev/null +++ b/examples/nffPktgen/testing/minPktLoss.go @@ -0,0 +1,214 @@ +// Copyright 2018-2019 Intel Corporation. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "sync/atomic" + "time" + + "github.com/intel-go/nff-go/examples/nffPktgen/generator" + "github.com/intel-go/nff-go/flow" + "github.com/intel-go/nff-go/packet" +) + +// TEST PARAMETERS: +const TOP_SPEED uint64 = 120000000 /*(pkt/s)*/ +const TGT_LOSS float64 = 0.005 /*(0.5% target packet loss)*/ +const TRAFFIC_DELAY uint = 3 /*(sec)*/ + +type IpPort struct { + Index uint16 + packetCount uint64 + bytesCount uint64 +} + +type HandlerContext struct { + port *IpPort +} + +func (hc HandlerContext) Copy() interface{} { + return HandlerContext{ + port: hc.port, + } +} + +func (hc HandlerContext) Delete() { +} + +func main() { + var ( + speed uint64 + genConfig, cores string + inPort uint + outPort uint + receive bool + ) + flag.Uint64Var(&speed, "speed", TOP_SPEED, "speed of fast generator, Pkts/s") + flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") + flag.StringVar(&cores, "cores", "", "specifies cores") + flag.UintVar(&outPort, "outPort", 1, "specifies output port") + flag.UintVar(&inPort, "inPort", 1, "specifices input port") + flag.BoolVar(&receive, "receive", true, "Receive packets back and print statistics") + testTime := flag.Uint("t", 0, "run generator for specified period of time in seconds, use zero to run forever") + statInterval := flag.Uint("s", 0, "statistics update interval in seconds, use zero to disable it") + flag.Parse() + + + portStats := IpPort { + Index: uint16(inPort), + } + + // Init NFF-GO system at 16 available cores + config := flow.Config { + CPUList: cores, + } + flow.CheckFatal(flow.SystemInit(&config)) + + configuration, err := generator.ReadConfig(genConfig) + if err != nil { + panic(fmt.Sprintf("%s config reading failed: %v", genConfig, err)) + } + context, err := generator.GetContext(configuration) + flow.CheckFatal(err) + outFlow, genChan, _ := flow.SetFastGenerator(generator.Generate, speed, context) + flow.CheckFatal(flow.SetSender(outFlow, uint16(outPort))) + if receive { + hc := HandlerContext { + port: &portStats, + } + inFlow, err := flow.SetReceiver(uint16(inPort)) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) + flow.CheckFatal(flow.SetStopper(inFlow)) + } + + go func() { + flow.CheckFatal(flow.SystemStart()) + }() + + // Set up finish channels + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, os.Interrupt) + + var finishChannel, statsChannel <-chan time.Time + if *testTime > 0 { + finishChannel = time.NewTicker(time.Duration(*testTime) * time.Second).C + } + if receive && *statInterval > 0 { + statsChannel = time.NewTicker(time.Duration(*statInterval) * time.Second).C + } + + started := time.Now() + + // optimize speed based on packet loss: + maxSpeed := -1 + var totalPktRX, totalPktTX uint64 = 0, 0 + + go func() { + gen := generator.GetGenerator() + low := 0 + high := 100 + currSpeed := 100 + + // data at line rate: + portStats.packetCount = 0 + gen.Count = 0 + time.Sleep(time.Duration(TRAFFIC_DELAY) * time.Second) + pktRX := portStats.packetCount + pktTX := gen.GetGeneratedNumber() + pktLoss := calcPktLoss(pktRX, pktTX) + totalPktRX += pktRX + totalPktTX += pktTX + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, float64(TOP_SPEED)) + + // binary search: + for low <= high { + mid := (low + high) / 2 + currSpeed = mid + updatedSpeed := 0.01 * float64(uint64(currSpeed) * TOP_SPEED) + + // reset counters: + portStats.packetCount = 0 + gen.Count = 0 + + genChan <- uint64(updatedSpeed) + time.Sleep(time.Duration(TRAFFIC_DELAY) * time.Second) + + pktRX = portStats.packetCount + pktTX = gen.GetGeneratedNumber() + pktLoss = calcPktLoss(pktRX, pktTX) + + tgt := pktLoss <= TGT_LOSS + if(tgt) { + low = mid + 1 // tgt met so try higher speed + if currSpeed > maxSpeed { + maxSpeed = currSpeed + } + } else { + high = mid - 1 // tgt failed so try lower speed + } + + totalPktRX += pktRX + totalPktTX += pktTX + //fmt.Printf("\nmid: %d\nupdatedSpeed: %d%%(%f pkt/s)\npktCountTX: %d\npktCountRX: %d\npktLoss: %f\nlow: %d\nhigh: %d\n\n", mid, currSpeed, updatedSpeed, pktTX, pktRX, pktLoss, low, high) // debugging purposes + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, updatedSpeed) + } + fmt.Printf("-----------------------------------------------------------BINARY SEARCH COMPLETE!!!------------------------------------------------------------\n") + interruptChannel <- os.Interrupt + }() + + +out: + for { + select { + case sig := <-interruptChannel: + fmt.Printf("Received signal %v, finishing.\n", sig) + break out + case <-finishChannel: + fmt.Println("Test timeout reached") + break out + case <-statsChannel: + //printStats(&portStats, totalPktRX, totalPktTX, calcPktLoss(totalPktRX, totalPktTX), started, maxSpeed, /*pkt/s*/) + } + } + if receive { + printTotals(&portStats, totalPktRX, totalPktTX, started, maxSpeed) + } +} + +func calcPktLoss(pktCountRX uint64, pktCountTX uint64) float64 { + return (float64(pktCountTX - pktCountRX) / float64(pktCountTX)) +} + +func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { + hc := ctx.(HandlerContext) + + atomic.AddUint64(&hc.port.packetCount, 1) + atomic.AddUint64(&hc.port.bytesCount, uint64(pkt.GetPacketLen())) + return true +} + +func printStats(port *IpPort, pktRX uint64, pktTX uint64, pktLoss float64, started time.Time, currSpeed int, updatedSpeed float64) { + fmt.Printf("\n%v: ", time.Since(started)) + fmt.Printf("Port %d pkts TX: %d / pkts RX: %d\n", port.Index, pktTX, pktRX) + fmt.Printf("Pkt loss: %f\n", pktLoss) + fmt.Printf("Current Speed: %d%% (%f pkts/s)\n\n", currSpeed, updatedSpeed) +} + +func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started time.Time, maxSpeed int) { + runtime := time.Since(started) + runtimeint := uint64(runtime) / uint64(time.Second) + totalPktLoss := calcPktLoss(totalPktRX, totalPktTX) + fmt.Printf("\nTest executed for %v\n", runtime) + fmt.Printf("Port %d total pkts TX: %d / pkts RX: %d\n", port.Index, totalPktTX, totalPktRX) + fmt.Printf("Total pkt loss: %f\n", totalPktLoss) + fmt.Printf("Port %d pkts/s: %v\n", port.Index, totalPktRX/runtimeint) + fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) + fmt.Printf("Max Speed: %d%%\n\n", maxSpeed) +} From a807e88069c377f94a1b8a8427ec1d205697532e Mon Sep 17 00:00:00 2001 From: agadiyar Date: Thu, 22 Aug 2019 11:07:31 -0500 Subject: [PATCH 09/24] Fixed issues with first commit --- examples/nffPktgen/testing/minPktLoss.go | 58 +++++++++++------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/examples/nffPktgen/testing/minPktLoss.go b/examples/nffPktgen/testing/minPktLoss.go index aac95eb1..26bd10e4 100644 --- a/examples/nffPktgen/testing/minPktLoss.go +++ b/examples/nffPktgen/testing/minPktLoss.go @@ -17,11 +17,6 @@ import ( "github.com/intel-go/nff-go/packet" ) -// TEST PARAMETERS: -const TOP_SPEED uint64 = 120000000 /*(pkt/s)*/ -const TGT_LOSS float64 = 0.005 /*(0.5% target packet loss)*/ -const TRAFFIC_DELAY uint = 3 /*(sec)*/ - type IpPort struct { Index uint16 packetCount uint64 @@ -44,17 +39,19 @@ func (hc HandlerContext) Delete() { func main() { var ( speed uint64 + tgtLoss float64 + trafDelay uint genConfig, cores string inPort uint outPort uint - receive bool ) - flag.Uint64Var(&speed, "speed", TOP_SPEED, "speed of fast generator, Pkts/s") + flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") + flag.Float64Var(&tgtLoss, "target loss", 0.005, "target packet loss %, use 0.001 for 1%") + flag.UintVar(&trafDelay, "traffic delay", 3, "time delay when speed is updated, sec") flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") flag.StringVar(&cores, "cores", "", "specifies cores") flag.UintVar(&outPort, "outPort", 1, "specifies output port") flag.UintVar(&inPort, "inPort", 1, "specifices input port") - flag.BoolVar(&receive, "receive", true, "Receive packets back and print statistics") testTime := flag.Uint("t", 0, "run generator for specified period of time in seconds, use zero to run forever") statInterval := flag.Uint("s", 0, "statistics update interval in seconds, use zero to disable it") flag.Parse() @@ -78,15 +75,14 @@ func main() { flow.CheckFatal(err) outFlow, genChan, _ := flow.SetFastGenerator(generator.Generate, speed, context) flow.CheckFatal(flow.SetSender(outFlow, uint16(outPort))) - if receive { - hc := HandlerContext { - port: &portStats, - } - inFlow, err := flow.SetReceiver(uint16(inPort)) - flow.CheckFatal(err) - flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) - flow.CheckFatal(flow.SetStopper(inFlow)) + + hc := HandlerContext { + port: &portStats, } + inFlow, err := flow.SetReceiver(uint16(inPort)) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) + flow.CheckFatal(flow.SetStopper(inFlow)) go func() { flow.CheckFatal(flow.SystemStart()) @@ -100,7 +96,7 @@ func main() { if *testTime > 0 { finishChannel = time.NewTicker(time.Duration(*testTime) * time.Second).C } - if receive && *statInterval > 0 { + if *statInterval > 0 { statsChannel = time.NewTicker(time.Duration(*statInterval) * time.Second).C } @@ -108,6 +104,7 @@ func main() { // optimize speed based on packet loss: maxSpeed := -1 + var maxSpeedPkt float64 = -1 var totalPktRX, totalPktTX uint64 = 0, 0 go func() { @@ -119,36 +116,36 @@ func main() { // data at line rate: portStats.packetCount = 0 gen.Count = 0 - time.Sleep(time.Duration(TRAFFIC_DELAY) * time.Second) + time.Sleep(time.Duration(trafDelay) * time.Second) pktRX := portStats.packetCount pktTX := gen.GetGeneratedNumber() pktLoss := calcPktLoss(pktRX, pktTX) totalPktRX += pktRX totalPktTX += pktTX - printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, float64(TOP_SPEED)) + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, float64(speed)) // binary search: for low <= high { mid := (low + high) / 2 currSpeed = mid - updatedSpeed := 0.01 * float64(uint64(currSpeed) * TOP_SPEED) + updatedSpeed := 0.01 * float64(uint64(currSpeed) * speed) // reset counters: portStats.packetCount = 0 gen.Count = 0 genChan <- uint64(updatedSpeed) - time.Sleep(time.Duration(TRAFFIC_DELAY) * time.Second) + time.Sleep(time.Duration(trafDelay) * time.Second) pktRX = portStats.packetCount pktTX = gen.GetGeneratedNumber() pktLoss = calcPktLoss(pktRX, pktTX) - tgt := pktLoss <= TGT_LOSS - if(tgt) { + if pktLoss <= tgtLoss { low = mid + 1 // tgt met so try higher speed if currSpeed > maxSpeed { maxSpeed = currSpeed + maxSpeedPkt = updatedSpeed } } else { high = mid - 1 // tgt failed so try lower speed @@ -156,10 +153,11 @@ func main() { totalPktRX += pktRX totalPktTX += pktTX - //fmt.Printf("\nmid: %d\nupdatedSpeed: %d%%(%f pkt/s)\npktCountTX: %d\npktCountRX: %d\npktLoss: %f\nlow: %d\nhigh: %d\n\n", mid, currSpeed, updatedSpeed, pktTX, pktRX, pktLoss, low, high) // debugging purposes + // fmt.Printf("\nmid: %d\nupdatedSpeed: %d%%(%f pkt/s)\npktCountTX: %d\npktCountRX: %d\npktLoss: %f\nlow: %d\nhigh: %d\n\n", mid, currSpeed, updatedSpeed, pktTX, pktRX, pktLoss, low, high) // debugging purposes + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, updatedSpeed) } - fmt.Printf("-----------------------------------------------------------BINARY SEARCH COMPLETE!!!------------------------------------------------------------\n") + fmt.Printf("--------------------BINARY SEARCH COMPLETE!!!--------------------\n") interruptChannel <- os.Interrupt }() @@ -174,12 +172,10 @@ out: fmt.Println("Test timeout reached") break out case <-statsChannel: - //printStats(&portStats, totalPktRX, totalPktTX, calcPktLoss(totalPktRX, totalPktTX), started, maxSpeed, /*pkt/s*/) + printStats(&portStats, totalPktRX, totalPktTX, calcPktLoss(totalPktRX, totalPktTX), started, maxSpeed, maxSpeedPkt) } } - if receive { - printTotals(&portStats, totalPktRX, totalPktTX, started, maxSpeed) - } + printTotals(&portStats, totalPktRX, totalPktTX, started, maxSpeed, maxSpeedPkt) } func calcPktLoss(pktCountRX uint64, pktCountTX uint64) float64 { @@ -201,7 +197,7 @@ func printStats(port *IpPort, pktRX uint64, pktTX uint64, pktLoss float64, start fmt.Printf("Current Speed: %d%% (%f pkts/s)\n\n", currSpeed, updatedSpeed) } -func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started time.Time, maxSpeed int) { +func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started time.Time, maxSpeed int, maxSpeedPkt float64) { runtime := time.Since(started) runtimeint := uint64(runtime) / uint64(time.Second) totalPktLoss := calcPktLoss(totalPktRX, totalPktTX) @@ -210,5 +206,5 @@ func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started tim fmt.Printf("Total pkt loss: %f\n", totalPktLoss) fmt.Printf("Port %d pkts/s: %v\n", port.Index, totalPktRX/runtimeint) fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) - fmt.Printf("Max Speed: %d%%\n\n", maxSpeed) + fmt.Printf("Max Speed: %d%% (%f pkts/s)\n\n", maxSpeed, maxSpeedPkt) } From dda5cffd9471daba8631e7c29b7ec6e1ed63dc77 Mon Sep 17 00:00:00 2001 From: agadiyar Date: Thu, 22 Aug 2019 11:11:30 -0500 Subject: [PATCH 10/24] Fixed issue with percent symbol --- examples/nffPktgen/testing/minPktLoss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/nffPktgen/testing/minPktLoss.go b/examples/nffPktgen/testing/minPktLoss.go index 26bd10e4..6b9d77dd 100644 --- a/examples/nffPktgen/testing/minPktLoss.go +++ b/examples/nffPktgen/testing/minPktLoss.go @@ -46,7 +46,7 @@ func main() { outPort uint ) flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") - flag.Float64Var(&tgtLoss, "target loss", 0.005, "target packet loss %, use 0.001 for 1%") + flag.Float64Var(&tgtLoss, "target loss", 0.005, "target packet loss percentage, use 0.001 for 1 percent") flag.UintVar(&trafDelay, "traffic delay", 3, "time delay when speed is updated, sec") flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") flag.StringVar(&cores, "cores", "", "specifies cores") From 9cb0fa8940fdec8c164463dca7dcbe454ec9d52b Mon Sep 17 00:00:00 2001 From: agadiyar Date: Thu, 22 Aug 2019 14:50:21 -0500 Subject: [PATCH 11/24] Made percentages more readable --- examples/nffPktgen/testing/minPktLoss.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/nffPktgen/testing/minPktLoss.go b/examples/nffPktgen/testing/minPktLoss.go index 6b9d77dd..9bce7602 100644 --- a/examples/nffPktgen/testing/minPktLoss.go +++ b/examples/nffPktgen/testing/minPktLoss.go @@ -46,8 +46,8 @@ func main() { outPort uint ) flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") - flag.Float64Var(&tgtLoss, "target loss", 0.005, "target packet loss percentage, use 0.001 for 1 percent") - flag.UintVar(&trafDelay, "traffic delay", 3, "time delay when speed is updated, sec") + flag.Float64Var(&tgtLoss, "target loss", 0.5, "target packet loss percentage") + flag.UintVar(&trafDelay, "traffic delay", 3, "time delay after speed is updated, sec") flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") flag.StringVar(&cores, "cores", "", "specifies cores") flag.UintVar(&outPort, "outPort", 1, "specifies output port") @@ -179,7 +179,7 @@ out: } func calcPktLoss(pktCountRX uint64, pktCountTX uint64) float64 { - return (float64(pktCountTX - pktCountRX) / float64(pktCountTX)) + return (float64(pktCountTX - pktCountRX) / float64(pktCountTX)) * 100.0 } func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { @@ -193,7 +193,7 @@ func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { func printStats(port *IpPort, pktRX uint64, pktTX uint64, pktLoss float64, started time.Time, currSpeed int, updatedSpeed float64) { fmt.Printf("\n%v: ", time.Since(started)) fmt.Printf("Port %d pkts TX: %d / pkts RX: %d\n", port.Index, pktTX, pktRX) - fmt.Printf("Pkt loss: %f\n", pktLoss) + fmt.Printf("Pkt loss: %f%%\n", pktLoss) fmt.Printf("Current Speed: %d%% (%f pkts/s)\n\n", currSpeed, updatedSpeed) } @@ -203,7 +203,7 @@ func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started tim totalPktLoss := calcPktLoss(totalPktRX, totalPktTX) fmt.Printf("\nTest executed for %v\n", runtime) fmt.Printf("Port %d total pkts TX: %d / pkts RX: %d\n", port.Index, totalPktTX, totalPktRX) - fmt.Printf("Total pkt loss: %f\n", totalPktLoss) + fmt.Printf("Total pkt loss: %f%%\n", totalPktLoss) fmt.Printf("Port %d pkts/s: %v\n", port.Index, totalPktRX/runtimeint) fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) fmt.Printf("Max Speed: %d%% (%f pkts/s)\n\n", maxSpeed, maxSpeedPkt) From 90b1aa3f3af424c7f0c9e20027ee3439574bd61b Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Thu, 22 Aug 2019 14:08:21 -0500 Subject: [PATCH 12/24] Added AF_XDP input/output. Added ctrl-C handling --- .travis.yml | 1 + README.md | 20 +++ common/Makefile | 2 +- examples/OSforwarding.go | 18 ++- flow/flow.go | 116 ++++++++++++++++-- internal/low/Makefile | 2 +- internal/low/low.go | 26 +++- internal/low/low.h | 255 ++++++++++++++++++++++++++++++++++++++- internal/low/low_bpf.go | 12 ++ mk/include.mk | 13 ++ mk/leaf.mk | 6 - packet/Makefile | 2 +- vagrant/Vagrantfile | 3 + 13 files changed, 449 insertions(+), 27 deletions(-) create mode 100644 internal/low/low_bpf.go diff --git a/.travis.yml b/.travis.yml index 0c8b8549..c3d83821 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,6 +17,7 @@ before_script: - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-disco /bin/bash script: + - docker exec -i test-nff-go bash -c "git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install" - docker exec -i test-nff-go go mod download - docker exec -i test-nff-go make # Build standalone examples diff --git a/README.md b/README.md index 4065f9bc..d184fce7 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,26 @@ Use Go version 1.11.4 or higher. To check the version of Go, do: go version +### AF_XDP support + +AF_XDP support is enabled by default, and it requires you to install +`libbpf` package. At the time of writing Ubuntu doesn't have this +library among its packages, so it is necessary to build `libbpf` from +sources or disable AF_XDP socket support. + +To disable it set variable `NFF_GO_NO_BPF_SUPPORT` to some unempty +value. When NFF_GO is built with it, AF_XDP support is disaled and +using it results in errors. + +If you want to build `libbpf` from sources you can do it in two +different ways. +* If you are using stock Linux kernel from distribution, [download + `libbpf` from GitHub](https://github.com/libbpf/libbpf), then + execute `cd src; make; sudo make install`. +* If you build Linux kernel from sources, you can build `libbpf` from + Linux source tree using commands `cd tools/lib/bpf; make; sudo make + install install_headers`. + ## Building NFF-GO When Go compiler runs for the first time it downloads all dependent diff --git a/common/Makefile b/common/Makefile index 06a019f9..b28cbc7e 100644 --- a/common/Makefile +++ b/common/Makefile @@ -7,7 +7,7 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing testing: check-pktgen - go test + go test -tags "${GO_BUILD_TAGS}" .PHONY: coverage coverage: diff --git a/examples/OSforwarding.go b/examples/OSforwarding.go index a845e160..ef1a58cd 100644 --- a/examples/OSforwarding.go +++ b/examples/OSforwarding.go @@ -10,15 +10,23 @@ import ( ) func main() { + // If you use af-xdp mode you need to configure queues: + // e.g. ethtool -N my_device flow-type tcp4 dst-port 4242 action 16 + afXDP := flag.Bool("af-xdp", false, "use af-xdp. need to use ethtool to setup queues") inport := flag.String("in", "", "device for receiver") + inQueue := flag.Int("in-queue", 16, "queue for receiver") outport := flag.String("out", "", "device for sender") flag.Parse() flow.CheckFatal(flow.SystemInit(nil)) - - inputFlow, err := flow.SetReceiverOS(*inport) - flow.CheckFatal(err) - flow.CheckFatal(flow.SetSenderOS(inputFlow, *outport)) - + if *afXDP { + inputFlow, err := flow.SetReceiverXDP(*inport, *inQueue) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetSenderXDP(inputFlow, *outport)) + } else { + inputFlow, err := flow.SetReceiverOS(*inport) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetSenderOS(inputFlow, *outport)) + } flow.CheckFatal(flow.SystemStart()) } diff --git a/flow/flow.go b/flow/flow.go index ffe20cd0..bf1e7e77 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -32,6 +32,7 @@ package flow import ( "net" "os" + "os/signal" "runtime" "sync/atomic" "time" @@ -48,7 +49,7 @@ var createdPorts []port var portPair map[types.IPv4Address](*port) var schedState *scheduler var vEach [10][vBurstSize]uint8 -var devices map[string]int +var ioDevices map[string]interface{} type Timer struct { t *time.Ticker @@ -176,6 +177,19 @@ func addOSReceiver(socket int, out low.Rings) { schedState.addFF("OS receiver", nil, recvOS, nil, par, nil, sendReceiveKNI, 0, &par.stats) } +type receiveXDPParameters struct { + out low.Rings + socket low.XDPSocket + stats common.RXTXStats +} + +func addXDPReceiver(socket low.XDPSocket, out low.Rings) { + par := new(receiveXDPParameters) + par.socket = socket + par.out = out + schedState.addFF("AF_XDP receiver", nil, recvXDP, nil, par, nil, sendReceiveKNI, 0, &par.stats) +} + type KNIParameters struct { in low.Rings out low.Rings @@ -269,6 +283,19 @@ func addSenderOS(socket int, in low.Rings, inIndexNumber int32) { schedState.addFF("sender OS", nil, sendOS, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) } +type sendXDPParameters struct { + in low.Rings + socket low.XDPSocket + stats common.RXTXStats +} + +func addSenderXDP(socket low.XDPSocket, in low.Rings, inIndexNumber int32) { + par := new(sendXDPParameters) + par.socket = socket + par.in = in + schedState.addFF("AF_XDP sender", nil, sendXDP, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) +} + type copyParameters struct { in low.Rings out low.Rings @@ -439,7 +466,7 @@ const reportMbits = false var sizeMultiplier uint var schedTime uint -var hwtxchecksum, hwrxpacketstimestamp bool +var hwtxchecksum, hwrxpacketstimestamp, setSIGINTHandler bool var maxRecv int type port struct { @@ -538,6 +565,11 @@ type Config struct { // Enables hardware assisted timestamps in packet mbufs. These // timestamps can be accessed with GetPacketTimestamp function. HWRXPacketsTimestamp bool + // Disable setting custom handler for SIGINT in + // SystemStartScheduler. When handler is enabled + // SystemStartScheduler waits for SIGINT notification and calls + // SystemStop after it. It is enabled by default. + NoSetSIGINTHandler bool } // SystemInit is initialization of system. This function should be always called before graph construction. @@ -665,7 +697,7 @@ func SystemInit(args *Config) error { } } portPair = make(map[types.IPv4Address](*port)) - devices = make(map[string]int) + ioDevices = make(map[string]interface{}) // Init scheduler common.LogTitle(common.Initialization, "------------***------ Initializing scheduler -----***------------") StopRing := low.CreateRings(burstSize*sizeMultiplier, maxInIndex /* Maximum possible rings */) @@ -690,6 +722,8 @@ func SystemInit(args *Config) error { vEach[i][j] = uint8(i) } } + + setSIGINTHandler = !args.NoSetSIGINTHandler return nil } @@ -723,7 +757,19 @@ func SystemStartScheduler() error { return common.WrapWithNFError(err, "scheduler start failed", common.Fail) } common.LogTitle(common.Initialization, "------------***---------- NFF-GO Started ---------***------------") - schedState.schedule(schedTime) + + if setSIGINTHandler { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + go func() { + schedState.schedule(schedTime) + }() + <-signalChan + common.LogTitle(common.Debug, "Received an interrupt, stopping everything") + SystemStop() + } else { + schedState.schedule(schedTime) + } return nil } @@ -819,13 +865,14 @@ func SetReceiver(portId uint16) (OUT *Flow, err error) { // Gets name of device, will return error if can't initialize socket. // Creates RAW socket, returns new opened flow with received packets. func SetReceiverOS(device string) (*Flow, error) { - socketID, ok := devices[device] + v, ok := ioDevices[device] + socketID := v.(int) if !ok { socketID = low.InitDevice(device) if socketID == -1 { return nil, common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket) } - devices[device] = socketID + ioDevices[device] = socketID } rings := low.CreateRings(burstSize*sizeMultiplier, 1) addOSReceiver(socketID, rings) @@ -839,18 +886,57 @@ func SetSenderOS(IN *Flow, device string) error { if err := checkFlow(IN); err != nil { return err } - socketID, ok := devices[device] + v, ok := ioDevices[device] + socketID := v.(int) if !ok { socketID = low.InitDevice(device) if socketID == -1 { return common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket) } - devices[device] = socketID + ioDevices[device] = socketID } addSenderOS(socketID, finishFlow(IN), IN.inIndexNumber) return nil } +// SetReceiverXDP adds function receive from Linux AF_XDP to flow graph. +// Gets name of device and queue number, will return error if can't initialize socket. +// Creates AF_XDP socket, returns new opened flow with received packets. +func SetReceiverXDP(device string, queue int) (*Flow, error) { + _, ok := ioDevices[device] + if ok { + return nil, common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now", common.BadSocket) + } + socketID := low.InitXDP(device, queue) + if socketID == nil { + return nil, common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket) + } + ioDevices[device] = socketID + rings := low.CreateRings(burstSize*sizeMultiplier, 1) + addXDPReceiver(socketID, rings) + return newFlow(rings, 1), nil +} + +// SetSenderXDP adds function send from flow graph to Linux AF_XDP interface. +// Gets name of device, will return error if can't initialize socket. +// Creates RAW socket, sends packets, closes input flow. +func SetSenderXDP(IN *Flow, device string) error { + if err := checkFlow(IN); err != nil { + return err + } + _, ok := ioDevices[device] + if ok { + return common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now", common.BadSocket) + } + socketID := low.InitXDP(device, 0) + if socketID == nil { + return common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket) + } + ioDevices[device] = socketID + addSenderXDP(socketID, finishFlow(IN), IN.inIndexNumber) + return nil +} + // SetReceiverKNI adds function receive from KNI to flow graph. // Gets KNI device from which packets will be received. // Receive queue will be added to port automatically. @@ -1418,6 +1504,11 @@ func recvOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { low.ReceiveOS(srp.socket, srp.out[0], flag, coreID, &srp.stats) } +func recvXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) { + srp := parameters.(*receiveXDPParameters) + low.ReceiveXDP(srp.socket, srp.out[0], flag, coreID, &srp.stats) +} + func processKNI(parameters interface{}, inIndex []int32, flag *int32, coreID int) { srk := parameters.(*KNIParameters) if srk.linuxCore == true { @@ -1607,6 +1698,11 @@ func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { low.SendOS(srp.socket, srp.in, flag, coreID, &srp.stats) } +func sendXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) { + srp := parameters.(*sendXDPParameters) + low.SendXDP(srp.socket, srp.in, flag, coreID, &srp.stats) +} + func merge(from low.Rings, to low.Rings) { // We should change out rings in all flow functions which we added before // and change them to one "after merge" ring. @@ -1620,6 +1716,10 @@ func merge(from low.Rings, to low.Rings) { if parameters.out[0] == from[0] { parameters.out = to } + case *receiveOSParameters: + if parameters.out[0] == from[0] { + parameters.out = to + } case *generateParameters: if parameters.out[0] == from[0] { parameters.out = to diff --git a/internal/low/Makefile b/internal/low/Makefile index d310b737..e69099ce 100644 --- a/internal/low/Makefile +++ b/internal/low/Makefile @@ -7,4 +7,4 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing testing: check-pktgen - go test -v + go test -v -tags "${GO_BUILD_TAGS}" diff --git a/internal/low/low.go b/internal/low/low.go index f5f13869..b1764428 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -760,6 +760,10 @@ func CheckHWRXPacketsTimestamp(port uint16) bool { return bool(C.check_hwrxpackets_timestamp_capability(C.uint16_t(port))) } +func InitDevice(device string) int { + return int(C.initDevice(C.CString(device))) +} + func ReceiveOS(socket int, OUT *Ring, flag *int32, coreID int, stats *common.RXTXStats) { m := CreateMempool("receiveOS") C.receiveOS(C.int(socket), OUT.DPDK_ring, (*C.struct_rte_mempool)(unsafe.Pointer(m)), @@ -772,10 +776,6 @@ func SendOS(socket int, IN Rings, flag *int32, coreID int, stats *common.RXTXSta (*C.RXTXStats)(unsafe.Pointer(stats))) } -func InitDevice(device string) int { - return int(C.initDevice(C.CString(device))) -} - func SetCountersEnabledInApplication(enabled bool) { C.counters_enabled_in_application = C.bool(true) } @@ -791,3 +791,21 @@ func GetPacketOffloadFlags(mb *Mbuf) uint64 { func GetPacketTimestamp(mb *Mbuf) uint64 { return uint64(mb.timestamp) } + +type XDPSocket *C.struct_xsk_socket_info + +func InitXDP(device string, queue int) XDPSocket { + return C.initXDP(C.CString(device), C.int(queue)) +} + +func ReceiveXDP(socket XDPSocket, OUT *Ring, flag *int32, coreID int, stats *common.RXTXStats) { + m := CreateMempool("receiveXDP") + C.receiveXDP(socket, OUT.DPDK_ring, (*C.struct_rte_mempool)(unsafe.Pointer(m)), + (*C.int)(unsafe.Pointer(flag)), C.int(coreID), (*C.RXTXStats)(unsafe.Pointer(stats))) +} + +func SendXDP(socket XDPSocket, IN Rings, flag *int32, coreID int, stats *common.RXTXStats) { + C.sendXDP(socket, C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), + C.int32_t(len(IN))), C.int32_t(len(IN)), (*C.int)(unsafe.Pointer(flag)), C.int(coreID), + (*C.RXTXStats)(unsafe.Pointer(stats))) +} diff --git a/internal/low/low.h b/internal/low/low.h index ac27113d..46d22ecc 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -191,6 +191,8 @@ void setAffinity(int coreId) { sched_setaffinity(0, sizeof(cpuset), &cpuset); } +// ---------- DPDK section ---------- + int create_kni(uint16_t port, uint32_t core, char *name, struct rte_mempool *mbuf_pool) { struct rte_eth_dev_info dev_info; const struct rte_pci_device *pci_dev; @@ -828,6 +830,8 @@ bool check_hwrxpackets_timestamp_capability(uint16_t port_id) { return (dev_info.rx_offload_capa & flags) == flags; } +// ---------- OS raw socket section ---------- + int initDevice(char *name) { int s = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); if (s < 1) { @@ -888,6 +892,10 @@ void receiveOS(int socket, struct rte_ring *out_ring, struct rte_mempool *m, vol // Free any packets which can't be pushed to the ring. The ring is probably full. handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); +#ifdef DEBUG + receive_received += rx_pkts_number; + receive_pushed += pushed_pkts_number; +#endif // DEBUG } *flag = wasStopped; } @@ -897,7 +905,6 @@ void sendOS(int socket, struct rte_ring **in_rings, int32_t inIndexNumber, volat struct rte_mbuf *bufs[BURST_SIZE]; uint16_t buf; - uint16_t tx_pkts_number; while (*flag == process) { for (int q = 0; q < inIndexNumber; q++) { // Get packets for TX from ring @@ -911,8 +918,254 @@ void sendOS(int socket, struct rte_ring **in_rings, int32_t inIndexNumber, volat // Free all packets handleUnpushed(bufs, 0, pkts_for_tx_number); +#ifdef DEBUG + send_required += pkts_for_tx_number; + send_sent += pkts_for_tx_number; +#endif // DEBUG } } free(in_rings); *flag = wasStopped; } + +// ---------- XDP socket section ---------- + +#ifdef NFF_GO_SUPPORT_XDP + +#include +#include +#include +#include "bpf/libbpf.h" +#include "bpf/xsk.h" + +struct xsk_umem_info { + struct xsk_ring_prod fq; + struct xsk_ring_cons cq; + struct xsk_umem *umem; + const struct rte_memzone *mz; +}; + +struct xsk_socket_info { + struct xsk_ring_cons rx; + struct xsk_ring_prod tx; + struct xsk_umem_info *umem; + struct xsk_socket *xsk; + __u32 outstanding_tx; + __u32 prog_id; + int nameindex; +}; + +struct xsk_socket_info *initXDP(char *name, int queue) { + __u32 idx; + __u32 opt_xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST; // TODO get from user + __u32 opt_xdp_bind_flags = 0; // TODO get from user + __u32 NUM_FRAMES = 4 * 1024; // TODO from num_mbufs + int opt_xsk_frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE; // TODO from num_mbufs + int memory_size = NUM_FRAMES * opt_xsk_frame_size; + + struct xsk_socket_info *xdp_socket = calloc(1, sizeof(*xdp_socket)); + if (!xdp_socket) { + fprintf(stderr, "ERROR: Can't allocate memory for socket structure for AF_XDP: %s\n", name); + return NULL; + } + + xdp_socket->umem = calloc(1, sizeof(*xdp_socket->umem)); + if (!xdp_socket->umem) { + fprintf(stderr, "ERROR: Can't allocate memory for umem structure for AF_XDP: %s\n", name); + return NULL; + } + + const struct rte_memzone *mz = rte_memzone_reserve_aligned(name, memory_size, + rte_socket_id(), RTE_MEMZONE_IOVA_CONTIG, getpagesize()); + if (!mz) { + fprintf(stderr, "ERROR: Can't reserve memzone for AF_XDP %s\n", name); + return NULL; + } + + struct xsk_umem_config umem_cfg = { + .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, + .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .frame_size = opt_xsk_frame_size, + .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM, + }; + int ret = xsk_umem__create(&xdp_socket->umem->umem, mz->addr, memory_size, + &xdp_socket->umem->fq, &xdp_socket->umem->cq, &umem_cfg); + if (ret) { + fprintf(stderr, "ERROR: Can't create umem for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + xdp_socket->umem->mz = mz; + + struct xsk_socket_config xsk_cfg = { + .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, + .libbpf_flags = 0, + .xdp_flags = opt_xdp_flags, + .bind_flags = opt_xdp_bind_flags, + }; + ret = xsk_socket__create(&xdp_socket->xsk, name, queue, xdp_socket->umem->umem, + &xdp_socket->rx, &xdp_socket->tx, &xsk_cfg); + if (ret) { + fprintf(stderr, "ERROR: Can't create AF_XDP socket %s, error code %d\n", name, ret); + return NULL; + } + xdp_socket->nameindex = if_nametoindex(name); + if (!xdp_socket->nameindex) { + fprintf(stderr, "ERROR: interface %s for AF_XDP does not exist\n", name); + return NULL; + } + + ret = bpf_get_link_xdp_id(xdp_socket->nameindex, &xdp_socket->prog_id, opt_xdp_flags); + if (ret) { + fprintf(stderr, "ERROR: Can't link BPF for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + ret = xsk_ring_prod__reserve(&xdp_socket->umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS, &idx); + if (ret != XSK_RING_PROD__DEFAULT_NUM_DESCS) { + fprintf(stderr, "ERROR: reserve prod buffer for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + for (int i = 0; i < XSK_RING_PROD__DEFAULT_NUM_DESCS * XSK_UMEM__DEFAULT_FRAME_SIZE; i += XSK_UMEM__DEFAULT_FRAME_SIZE) { + *xsk_ring_prod__fill_addr(&xdp_socket->umem->fq, idx++) = i; + } + xsk_ring_prod__submit(&xdp_socket->umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS); + return xdp_socket; +} + +void removeXDP(struct xsk_socket_info *xsk) { + xsk_socket__delete(xsk->xsk); + xsk_umem__delete(xsk->umem->umem); + __u32 curr_prog_id = 0; + + if (bpf_get_link_xdp_id(xsk->nameindex, &curr_prog_id, XDP_FLAGS_UPDATE_IF_NOEXIST)) { + return; + } + if (xsk->prog_id == curr_prog_id) { + bpf_set_link_xdp_fd(xsk->nameindex, -1, XDP_FLAGS_UPDATE_IF_NOEXIST); + } + free(xsk->umem); + free(xsk); +} +// TODO checkfatal should remove AF_XDP socket +// rings are single consumer, producer +// additional implementation for rx tx at one device + +void receiveXDP(struct xsk_socket_info *xsk, struct rte_ring *out_ring, struct rte_mempool *m, volatile int *flag, int coreId, RXTXStats *stats) { + setAffinity(coreId); + struct rte_mbuf *bufs[BURST_SIZE]; + REASSEMBLY_INIT + while (*flag == process) { + __u32 idx_rx = 0, idx_fq = 0; + // Get packets from AF_XDP + uint16_t rx_pkts_number = xsk_ring_cons__peek(&xsk->rx, BURST_SIZE, &idx_rx); + if (unlikely(rx_pkts_number == 0)) { + continue; + } + if (allocateMbufs(m, bufs, rx_pkts_number) != 0) { + printf("Can't allocate\n"); + } + int ret = xsk_ring_prod__reserve(&xsk->umem->fq, rx_pkts_number, &idx_fq); + while (ret != rx_pkts_number) { + if (ret < 0) { + printf("ERROR reserve\n"); + //exit_with_error(-ret); + } + ret = xsk_ring_prod__reserve(&xsk->umem->fq, rx_pkts_number, &idx_fq); + } + for (int i = 0; i < rx_pkts_number; i++) { + const struct xdp_desc *desc = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++); + uint64_t addr = desc->addr; + uint32_t len = desc->len; + void *pkt = xsk_umem__get_data(xsk->umem->mz->addr, addr); + rte_memcpy(rte_pktmbuf_mtod(bufs[i], void *), pkt, len); + rte_pktmbuf_pkt_len(bufs[i]) = len; + rte_pktmbuf_data_len(bufs[i]) = len; + *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = addr; + } + xsk_ring_prod__submit(&xsk->umem->fq, rx_pkts_number); + xsk_ring_cons__release(&xsk->rx, rx_pkts_number); + rx_pkts_number = handleReceived(bufs, rx_pkts_number, tbl, pdeath_row); + uint16_t pushed_pkts_number = rte_ring_enqueue_burst(out_ring, (void*)bufs, rx_pkts_number, NULL); + + // Free any packets which can't be pushed to the ring. The ring is probably full. + handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); +#ifdef DEBUG + receive_received += rx_pkts_number; + receive_pushed += pushed_pkts_number; +#endif // DEBUG + } + removeXDP(xsk); + *flag = wasStopped; +} + +void sendXDP(struct xsk_socket_info *xsk, struct rte_ring **in_rings, int32_t inIndexNumber, volatile int *flag, int coreId, RXTXStats *stats) { + setAffinity(coreId); + struct rte_mbuf *bufs[BURST_SIZE]; + uint16_t buf; + uint16_t tx_pkts_number; + struct xdp_desc *desc; + __u32 idx, frame_nb = 0; + int required = 0; + while (*flag == process) { + for (int q = 0; q < inIndexNumber; q++) { + // Get packets for TX from ring + uint16_t pkts_for_tx_number = rte_ring_mc_dequeue_burst(in_rings[q], (void*)bufs, BURST_SIZE, NULL); + if (pkts_for_tx_number == 0) { + continue; + } + if (xsk_ring_prod__reserve(&xsk->tx, pkts_for_tx_number, &idx) == pkts_for_tx_number) { + for (int i = 0; i < pkts_for_tx_number; i++) { + desc = xsk_ring_prod__tx_desc(&xsk->tx, idx + i); + desc->addr = (frame_nb + i) << XSK_UMEM__DEFAULT_FRAME_SHIFT; + desc->len = bufs[i]->pkt_len; + void *pkt = xsk_umem__get_data(xsk->umem->mz->addr, desc->addr); + rte_memcpy(pkt, rte_pktmbuf_mtod(bufs[i], void *), desc->len); + } + xsk_ring_prod__submit(&xsk->tx, pkts_for_tx_number); + required += pkts_for_tx_number; + frame_nb += pkts_for_tx_number; + frame_nb %= 4 * 1024; //NUM_FRAMES; + } + // Free all packets + handleUnpushed(bufs, 0, pkts_for_tx_number); + if (required > 0) { + int ret = sendto(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0); + if (ret < 0 && errno != ENOBUFS && errno != EAGAIN && errno != EBUSY) { + printf("ERROR!!!\n"); + } + int tx_pkts_number = xsk_ring_cons__peek(&xsk->umem->cq, pkts_for_tx_number, &idx); + if (tx_pkts_number > 0) { + xsk_ring_cons__release(&xsk->umem->cq, tx_pkts_number); + required -= tx_pkts_number; + } +#ifdef DEBUG + send_required += pkts_for_tx_number; + send_sent += tx_pkts_number; +#endif // DEBUG + } + } + } + free(in_rings); + removeXDP(xsk); + *flag = wasStopped; +} + +#else // NFF_GO_SUPPORT_XDP + +struct xsk_socket_info { +}; + +struct xsk_socket_info * initXDP(char *name, int queue) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); + return NULL; +} + +void receiveXDP(struct xsk_socket_info *socket, struct rte_ring *out_ring, struct rte_mempool *m, volatile int *flag, int coreId, RXTXStats *stats) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); +} + +void sendXDP(struct xsk_socket_info *socket, struct rte_ring **in_rings, int32_t inIndexNumber, volatile int *flag, int coreId, RXTXStats *stats) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); +} + +#endif // NFF_GO_SUPPORT_XDP diff --git a/internal/low/low_bpf.go b/internal/low/low_bpf.go new file mode 100644 index 00000000..4b4a704f --- /dev/null +++ b/internal/low/low_bpf.go @@ -0,0 +1,12 @@ +// Copyright 2019 Intel Corporation. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build bpf + +package low + +/* +#cgo LDFLAGS: -lbpf -lelf +*/ +import "C" diff --git a/mk/include.mk b/mk/include.mk index fb4aa8bf..c6001682 100644 --- a/mk/include.mk +++ b/mk/include.mk @@ -53,6 +53,19 @@ $(info Checking for AVX support... no) endif endif +ifndef NFF_GO_NO_MLX_DRIVERS +ifeq (,$(findstring mlx,$(GO_BUILD_TAGS))) +export GO_BUILD_TAGS += mlx +endif +endif + +ifndef NFF_GO_NO_BPF_SUPPORT +ifeq (,$(findstring bpf,$(GO_BUILD_TAGS))) +export GO_BUILD_TAGS += bpf +endif +CFLAGS += -DNFF_GO_SUPPORT_XDP +endif + export CGO_CFLAGS = $(CFLAGS) export CGO_LDFLAGS = \ diff --git a/mk/leaf.mk b/mk/leaf.mk index 1e98c6b8..ee35457c 100644 --- a/mk/leaf.mk +++ b/mk/leaf.mk @@ -10,12 +10,6 @@ include $(PATH_TO_MK)/include.mk # Build all .PHONY: clean -ifndef NFF_GO_NO_MLX_DRIVERS -ifeq (,$(findstring mlx,$(GO_BUILD_TAGS))) -export GO_BUILD_TAGS += mlx -endif -endif - ifdef NFF_GO_DEBUG # Flags to build Go files without optimizations export GO_COMPILE_FLAGS += -gcflags=all='-N -l' diff --git a/packet/Makefile b/packet/Makefile index d4d8019c..dfc337d5 100644 --- a/packet/Makefile +++ b/packet/Makefile @@ -7,7 +7,7 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing testing: check-pktgen - go test + go test -tags "${GO_BUILD_TAGS}" .PHONY: coverage coverage: diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index de9d615f..f3703941 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -133,6 +133,9 @@ chmod +x ~/scripts.sh echo -e ". ~/scripts.sh\n$(cat .bashrc)" > .bashrc setuptesthost +echo Installing libbpf +git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install + echo Downloading and building NFF-GO framework go get -v golang.org/x/tools/cmd/stringer git clone -b master --recurse-submodules http://github.com/intel-go/nff-go From 1b31697bf769af6ebd65fcba9a281e5cf5082d86 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 27 Aug 2019 15:49:59 -0500 Subject: [PATCH 13/24] Fixed ldconfig path in Dockerfiles, Vagrantfile and README --- .travis.yml | 1 - Dockerfile | 6 +++++- README.md | 5 +++-- nff-go-base/Makefile | 2 ++ vagrant/Vagrantfile | 7 +++++-- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index c3d83821..0c8b8549 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,7 +17,6 @@ before_script: - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-disco /bin/bash script: - - docker exec -i test-nff-go bash -c "git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install" - docker exec -i test-nff-go go mod download - docker exec -i test-nff-go make # Build standalone examples diff --git a/Dockerfile b/Dockerfile index 43267906..2c957b1f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,11 @@ RUN apt-get -q update && apt-get -q -y install \ libmnl-dev \ libibverbs-dev -RUN cd /opt && curl -L -s https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz | tar zx +RUN cd /opt && curl -L -s https://dl.google.com/go/go1.12.9.linux-amd64.tar.gz | tar zx +RUN git clone https://github.com/libbpf/libbpf +RUN make -C libbpf/src all install +RUN echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf +RUN ldconfig RUN mkdir -p ${NFF_GO} COPY . ${NFF_GO} diff --git a/README.md b/README.md index d184fce7..08d25802 100644 --- a/README.md +++ b/README.md @@ -133,10 +133,11 @@ If you want to build `libbpf` from sources you can do it in two different ways. * If you are using stock Linux kernel from distribution, [download `libbpf` from GitHub](https://github.com/libbpf/libbpf), then - execute `cd src; make; sudo make install`. + execute `cd src; make; sudo make install`. Add /usr/lib64 to your + ldconfig path. * If you build Linux kernel from sources, you can build `libbpf` from Linux source tree using commands `cd tools/lib/bpf; make; sudo make - install install_headers`. + install install_headers`. Add /usr/local/lib64 to your ldconfig path. ## Building NFF-GO diff --git a/nff-go-base/Makefile b/nff-go-base/Makefile index bfaebf29..4e0499ce 100644 --- a/nff-go-base/Makefile +++ b/nff-go-base/Makefile @@ -18,6 +18,7 @@ Fedora: Makefile echo 'RUN echo proxy=${http_proxy} >> /etc/dnf/dnf.conf' >> Dockerfile; \ fi echo 'RUN dnf -y install numactl-libs.x86_64 lua-devel libmnl-devel rdma-core-devel libibverbs; dnf clean all' >> Dockerfile + echo 'RUN git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile Dockerfile: Makefile @@ -30,6 +31,7 @@ Dockerfile: Makefile echo 'RUN echo Acquire::https::Proxy \"${https_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ fi echo 'RUN apt-get update; apt-get install -y pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile + echo 'RUN git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile .PHONY: dpdk diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index f3703941..e1c5ff0f 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -126,7 +126,7 @@ echo Reassigning "${syscon}" interface to system name sudo nmcli c mod "${syscon}" connection.id 'System connection' echo Unpacking Go language into /opt -(cd /opt; sudo sh -c 'curl -L -s https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz | tar zx') +(cd /opt; sudo sh -c 'curl -L -s https://dl.google.com/go/go1.12.9.linux-amd64.tar.gz | tar zx') mkdir go chmod +x ~/scripts.sh . ~/scripts.sh @@ -134,7 +134,10 @@ echo -e ". ~/scripts.sh\n$(cat .bashrc)" > .bashrc setuptesthost echo Installing libbpf -git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install +git clone https://github.com/libbpf/libbpf +make -C libbpf/src all +sudo make -C libbpf/src install +sudo sh -c "echo /usr/lib64 > /etc/ld.so.conf.d/usrlib64.conf" echo Downloading and building NFF-GO framework go get -v golang.org/x/tools/cmd/stringer From 9a6131ff985f1981aaba920dcaa1d3d7c0a3118e Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 28 Aug 2019 09:07:57 -0500 Subject: [PATCH 14/24] Fixed libbpf checkout to a specific tag v0.0.4 because newer commits break our build and are not yet merged into Linux kernel. --- Dockerfile | 2 +- nff-go-base/Makefile | 2 +- vagrant/Vagrantfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2c957b1f..757a6841 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ RUN apt-get -q update && apt-get -q -y install \ libibverbs-dev RUN cd /opt && curl -L -s https://dl.google.com/go/go1.12.9.linux-amd64.tar.gz | tar zx -RUN git clone https://github.com/libbpf/libbpf +RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf RUN make -C libbpf/src all install RUN echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf RUN ldconfig diff --git a/nff-go-base/Makefile b/nff-go-base/Makefile index 4e0499ce..b6760700 100644 --- a/nff-go-base/Makefile +++ b/nff-go-base/Makefile @@ -31,7 +31,7 @@ Dockerfile: Makefile echo 'RUN echo Acquire::https::Proxy \"${https_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ fi echo 'RUN apt-get update; apt-get install -y pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile - echo 'RUN git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile + echo 'RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile .PHONY: dpdk diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index e1c5ff0f..64fbfcd8 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -134,7 +134,7 @@ echo -e ". ~/scripts.sh\n$(cat .bashrc)" > .bashrc setuptesthost echo Installing libbpf -git clone https://github.com/libbpf/libbpf +git clone -b v0.0.4 https://github.com/libbpf/libbpf make -C libbpf/src all sudo make -C libbpf/src install sudo sh -c "echo /usr/lib64 > /etc/ld.so.conf.d/usrlib64.conf" From 774fc1dec25316ac629b454a211c3d0a416c6c70 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 28 Aug 2019 09:57:07 -0500 Subject: [PATCH 15/24] Added git to installation of basic image because it is needed to clone libbpf repository --- nff-go-base/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nff-go-base/Makefile b/nff-go-base/Makefile index b6760700..f1340a2a 100644 --- a/nff-go-base/Makefile +++ b/nff-go-base/Makefile @@ -17,7 +17,7 @@ Fedora: Makefile echo 'ENV https_proxy ${https_proxy}' >> Dockerfile; \ echo 'RUN echo proxy=${http_proxy} >> /etc/dnf/dnf.conf' >> Dockerfile; \ fi - echo 'RUN dnf -y install numactl-libs.x86_64 lua-devel libmnl-devel rdma-core-devel libibverbs; dnf clean all' >> Dockerfile + echo 'RUN dnf -y install git numactl-libs.x86_64 lua-devel libmnl-devel rdma-core-devel libibverbs; dnf clean all' >> Dockerfile echo 'RUN git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile @@ -30,7 +30,7 @@ Dockerfile: Makefile echo 'RUN echo Acquire::http::Proxy \"${http_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ echo 'RUN echo Acquire::https::Proxy \"${https_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ fi - echo 'RUN apt-get update; apt-get install -y pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile + echo 'RUN apt-get update; apt-get install -y git pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile echo 'RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile From a4a0c8b8da7d00f4fce2e08e7d443b6b1e9e03f5 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 28 Aug 2019 10:11:17 -0500 Subject: [PATCH 16/24] Added libelf-dev to build libbpf --- nff-go-base/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nff-go-base/Makefile b/nff-go-base/Makefile index f1340a2a..d1a2b9ae 100644 --- a/nff-go-base/Makefile +++ b/nff-go-base/Makefile @@ -30,7 +30,7 @@ Dockerfile: Makefile echo 'RUN echo Acquire::http::Proxy \"${http_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ echo 'RUN echo Acquire::https::Proxy \"${https_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ fi - echo 'RUN apt-get update; apt-get install -y git pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile + echo 'RUN apt-get update; apt-get install -y git pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev libelf-dev; apt-get clean all' >> Dockerfile echo 'RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile From 59d52b3175bce2b45835d46cf335e4d020b39549 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 28 Aug 2019 15:34:24 -0500 Subject: [PATCH 17/24] Added debug message to debug docker no space left problem --- mk/leaf.mk | 1 + 1 file changed, 1 insertion(+) diff --git a/mk/leaf.mk b/mk/leaf.mk index ee35457c..50dab007 100644 --- a/mk/leaf.mk +++ b/mk/leaf.mk @@ -57,6 +57,7 @@ deploy: .check-deploy-env images $(eval TMPNAME=tmp-$(IMAGENAME).tar) docker save $(WORKIMAGENAME) > $(TMPNAME) for host in `echo $(NFF_GO_HOSTS) | tr ',' ' '`; do \ + echo Uploading $(WORKIMAGENAME) to $$host \ if ! docker -H tcp://$$host load < $(TMPNAME); then break; fi; \ done rm $(TMPNAME) From 17c6dcb5ae00faeccc96855f5976af88fcab83b1 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 4 Sep 2019 14:46:00 -0500 Subject: [PATCH 18/24] Fixed build bug that RTE_TARGET is defined after it is used --- mk/include.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mk/include.mk b/mk/include.mk index c6001682..4516944b 100644 --- a/mk/include.mk +++ b/mk/include.mk @@ -9,9 +9,9 @@ PROJECT_ROOT := $(abspath $(dir $(abspath $(lastword $(MAKEFILE_LIST))))/..) # Main DPDK variables DPDK_DIR=dpdk PKTGEN_DIR=pktgen-dpdk +export RTE_TARGET=x86_64-native-linuxapp-gcc DPDK_INSTALL_DIR=$(RTE_TARGET)-install export RTE_SDK=$(PROJECT_ROOT)/dpdk/$(DPDK_DIR)/$(DPDK_INSTALL_DIR)/usr/local/share/dpdk -export RTE_TARGET=x86_64-native-linuxapp-gcc # Configure flags for native code. Disable FSGSBASE and F16C to run in # VMs and Docker containers. From 8bc50e70bd008b0efa04b706873914e4fff12ec0 Mon Sep 17 00:00:00 2001 From: dannypsnl Date: Mon, 9 Sep 2019 14:00:08 +0800 Subject: [PATCH 19/24] fix get PCI device ID For some NIC the readlink `/sys/class/net/` would get several PCI address and the correct one would be the second one, e.g. $ readlink -f /sys/class/net/ens224 /sys/devices/pci0000:00/0000:00:17.0/0000:13:00.0/net/ens224 To get the correct device ID, we have to check if the second one `0000:13:00.0` is a PCI address, return second one, else return the first one `0000:00:17.0` --- devices/misc.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/devices/misc.go b/devices/misc.go index e52afb64..b2dd37a1 100644 --- a/devices/misc.go +++ b/devices/misc.go @@ -42,20 +42,16 @@ func GetDeviceID(nicName string) (string, error) { // raw should be like /sys/devices/pci0002:00/0000:00:08.0/virtio2/net/ens8 // or /sys/devices/pci0000:00/0000:00:01.0/0000:03:00.2/net/ens4f2 raws := strings.Split(raw, "/") - if len(raws) < 5 { + if len(raws) < 6 { return "", fmt.Errorf("path not correct") } - - // search and validate deviceID - for idx := len(raws) - 1; idx >= 0; idx-- { - v := strings.Split(raws[idx], ":") - if len(v) == 3 { - if len(v[0]) == 4 && len(v[1]) == 2 && len(v[2]) == 4 { - return raws[idx], nil - } - } + if IsPciID.Match([]byte(raws[5])) { + return raws[5], nil + } else if IsPciID.Match([]byte(raws[4])) { + return raws[4], nil + } else { + return "", fmt.Errorf("can't get device ID from path: %s", raw) } - return "", fmt.Errorf("path not correct") } // IsModuleLoaded checks if the kernel has already loaded the driver or not. From 9f2140d501d4a3ad5b2eedf3bf9c7e5c8f278d6b Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 4 Sep 2019 14:52:43 -0500 Subject: [PATCH 20/24] Added global tags to ignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index d29de412..035f6cc1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ # emacs *~ doc +GPATH +GTAGS +GRTAGS From 3331a26b6896f5975ed41a7bbdc4da0651e0170b Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 13 Sep 2019 14:41:27 -0500 Subject: [PATCH 21/24] Implemented optimization for faster packet sending with less packet loss - Implemented multiple Send threads - Increased size of TX ring to 2048 - Implemented user controlled number of TX queues - Added xstats and other debug messages - Implemented multiple attempts to send packets - Fixed dropped packet statistics (need to use interlocked operation there) --- flow/flow.go | 66 +++++++++++++---- flow/scheduler.go | 54 ++++++++------ internal/low/low.go | 20 ++++-- internal/low/low.h | 168 +++++++++++++++++++++++++++++++++++++------- 4 files changed, 241 insertions(+), 67 deletions(-) diff --git a/flow/flow.go b/flow/flow.go index bf1e7e77..7a2da6bc 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -161,7 +161,7 @@ func addReceiver(portId uint16, out low.Rings, inIndexNumber int32) { par.port = low.GetPort(portId) par.out = out par.status = make([]int32, maxRecv, maxRecv) - schedState.addFF("receiver", nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats) + schedState.addFF("receiverPort"+string(portId), nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats) } type receiveOSParameters struct { @@ -256,18 +256,23 @@ func addFastGenerator(out low.Rings, generateFunction GenerateFunction, } type sendParameters struct { - in low.Rings - port uint16 - anyway bool - stats common.RXTXStats + in low.Rings + port uint16 + unrestrictedClones bool + stats common.RXTXStats + sendThreadIndex int } func addSender(port uint16, in low.Rings, inIndexNumber int32) { - par := new(sendParameters) - par.port = port - par.in = in - par.anyway = schedState.anyway - schedState.addFF("sender", nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) + for iii := 0; iii < sendCPUCoresPerPort; iii++ { + par := new(sendParameters) + par.port = port + par.in = in + par.unrestrictedClones = schedState.unrestrictedClones + par.sendThreadIndex = iii + schedState.addFF("senderPort"+string(port)+"Thread"+string(iii), + nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) + } } type sendOSParameters struct { @@ -468,6 +473,7 @@ var sizeMultiplier uint var schedTime uint var hwtxchecksum, hwrxpacketstimestamp, setSIGINTHandler bool var maxRecv int +var sendCPUCoresPerPort, tXQueuesNumberPerPort int type port struct { wasRequested bool // has user requested any send/receive operations at this port @@ -531,7 +537,7 @@ type Config struct { // Limits parallel instances. 1 for one instance, 1000 for RSS count determine instances MaxInIndex int32 // Scheduler should clone functions even if it can lead to reordering. - // This option should be switch off for all high level reassembling like TCP or HTTP + // This option should be switched off for all high level reassembling like TCP or HTTP RestrictedCloning bool // If application uses EncapsulateHead or DecapsulateHead functions L2 pointers // should be reinit every receving or generating a packet. This can be removed if @@ -570,6 +576,19 @@ type Config struct { // SystemStartScheduler waits for SIGINT notification and calls // SystemStop after it. It is enabled by default. NoSetSIGINTHandler bool + // Number of CPU cores to be occupied by Send routines. It is + // necessary to set TXQueueNumber to a reasonably big number which + // can be divided by SendCPUCoresPerPort. + SendCPUCoresPerPort int + // Number of transmit queues to use on network card. By default it + // is minimum of NIC supported TX queues number and 2. If this + // value is specified and NIC doesn't support this number of TX + // queues, initialization fails. + TXQueuesNumberPerPort int + // Controls scheduler interval in milliseconds. Default value is + // 500. Lower values allow faster reaction to changing traffic but + // increase scheduling overhead. + SchedulerInterval uint } // SystemInit is initialization of system. This function should be always called before graph construction. @@ -588,12 +607,23 @@ func SystemInit(args *Config) error { cpus = common.GetDefaultCPUs(CPUCoresNumber) } + tXQueuesNumberPerPort = args.TXQueuesNumberPerPort + + sendCPUCoresPerPort = args.SendCPUCoresPerPort + if sendCPUCoresPerPort == 0 { + sendCPUCoresPerPort = 1 + if tXQueuesNumberPerPort != 0 && tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 { + return common.WrapWithNFError(nil, "TXQueuesNumberPerPort should be divisible by SendCPUCoresPerPort", + common.BadArgument) + } + } + schedulerOff := args.DisableScheduler schedulerOffRemove := args.PersistentClones stopDedicatedCore := args.StopOnDedicatedCore hwtxchecksum = args.HWTXChecksum hwrxpacketstimestamp = args.HWRXPacketsTimestamp - anyway := !args.RestrictedCloning + unrestrictedClones := !args.RestrictedCloning mbufNumber := uint(8191) if args.MbufNumber != 0 { @@ -610,7 +640,12 @@ func SystemInit(args *Config) error { sizeMultiplier = args.RingSize } - schedTime = 500 + if args.SchedulerInterval != 0 { + schedTime = args.SchedulerInterval + } else { + schedTime = 500 + } + if args.ScaleTime != 0 { schedTime = args.ScaleTime } @@ -702,7 +737,7 @@ func SystemInit(args *Config) error { common.LogTitle(common.Initialization, "------------***------ Initializing scheduler -----***------------") StopRing := low.CreateRings(burstSize*sizeMultiplier, maxInIndex /* Maximum possible rings */) common.LogDebug(common.Initialization, "Scheduler can use cores:", cpus) - schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, anyway) + schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, unrestrictedClones) // Set HW offloading flag in packet package packet.SetHWTXChecksumFlag(hwtxchecksum) @@ -1690,7 +1725,8 @@ func pcopy(parameters interface{}, inIndex []int32, stopper [2]chan int, report func send(parameters interface{}, inIndex []int32, flag *int32, coreID int) { srp := parameters.(*sendParameters) - low.Send(srp.port, srp.in, srp.anyway, flag, coreID, &srp.stats) + low.Send(srp.port, srp.in, srp.unrestrictedClones, flag, coreID, &srp.stats, + srp.sendThreadIndex, sendCPUCoresPerPort) } func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { diff --git a/flow/scheduler.go b/flow/scheduler.go index 6390ea42..3c1d6f87 100644 --- a/flow/scheduler.go +++ b/flow/scheduler.go @@ -36,6 +36,7 @@ const generatePauseStep = 0.1 const process = 1 const stopRequest = 2 const wasStopped = 9 +const printPortStatistics = false // TODO "5" and "39" constants derived empirically. Need to investigate more elegant thresholds. const RSSCloneMin = 5 @@ -156,26 +157,26 @@ func (scheduler *scheduler) addFF(name string, ucfn uncloneFlowFunction, Cfn cFl } type scheduler struct { - ff []*flowFunction - cores []core - off bool - offRemove bool - anyway bool - stopDedicatedCore bool - StopRing low.Rings - usedCores uint8 - checkTime uint - debugTime uint - Dropped uint - maxPacketsToClone uint32 - stopFlag int32 - maxRecv int - Timers []*Timer - nAttempts []uint64 - pAttempts []uint64 - maxInIndex int32 - measureRings low.Rings - coreIndex int + ff []*flowFunction + cores []core + off bool + offRemove bool + unrestrictedClones bool + stopDedicatedCore bool + StopRing low.Rings + usedCores uint8 + checkTime uint + debugTime uint + Dropped uint + maxPacketsToClone uint32 + stopFlag int32 + maxRecv int + Timers []*Timer + nAttempts []uint64 + pAttempts []uint64 + maxInIndex int32 + measureRings low.Rings + coreIndex int } type core struct { @@ -184,7 +185,7 @@ type core struct { } func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDedicatedCore bool, - stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, anyway bool) *scheduler { + stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, unrestrictedClones bool) *scheduler { coresNumber := len(cpus) // Init scheduler scheduler := new(scheduler) @@ -200,7 +201,7 @@ func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDe scheduler.debugTime = debugTime scheduler.maxPacketsToClone = maxPacketsToClone scheduler.maxRecv = maxRecv - scheduler.anyway = anyway + scheduler.unrestrictedClones = unrestrictedClones scheduler.pAttempts = make([]uint64, len(scheduler.cores), len(scheduler.cores)) return scheduler @@ -405,6 +406,13 @@ func (scheduler *scheduler) schedule(schedTime uint) { common.LogDebug(common.Debug, "---------------") common.LogDebug(common.Debug, "System is using", scheduler.usedCores, "cores now.", uint8(len(scheduler.cores))-scheduler.usedCores, "cores are left available.") low.Statistics(float32(scheduler.debugTime) / 1000) + if printPortStatistics { + for i := range createdPorts { + if createdPorts[i].wasRequested { + low.PortStatistics(createdPorts[i].port) + } + } + } for i := range scheduler.ff { scheduler.ff[i].printDebug(schedTime) } @@ -549,7 +557,7 @@ func (scheduler *scheduler) schedule(schedTime uint) { ffi.removed = false continue } - if ffi.inIndex[0] == 1 && scheduler.anyway && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) && + if ffi.inIndex[0] == 1 && scheduler.unrestrictedClones && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) && ffi.checkOutputRingClonable(scheduler.maxPacketsToClone) && (ffi.increasedSpeed == 0 || ffi.increasedSpeed > ffi.reportedState.V.Packets) { if scheduler.pAttempts[ffi.cloneNumber+1] == 0 { diff --git a/internal/low/low.go b/internal/low/low.go index b1764428..3e7bec29 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -517,12 +517,19 @@ func SrKNI(port uint16, flag *int32, coreID int, recv bool, OUT Rings, send bool } // Send - dequeue packets and send. -func Send(port uint16, IN Rings, anyway bool, flag *int32, coreID int, stats *common.RXTXStats) { +func Send(port uint16, IN Rings, unrestrictedClones bool, flag *int32, coreID int, stats *common.RXTXStats, + sendThreadIndex, totalSendTreads int) { if C.rte_eth_dev_socket_id(C.uint16_t(port)) != C.int(C.rte_lcore_to_socket_id(C.uint(coreID))) { common.LogWarning(common.Initialization, "Send port", port, "is on remote NUMA node to polling thread - not optimal performance.") } - C.nff_go_send(C.uint16_t(port), C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))), C.int32_t(len(IN)), - C.bool(anyway), (*C.int)(unsafe.Pointer(flag)), C.int(coreID), (*C.RXTXStats)(unsafe.Pointer(stats))) + C.nff_go_send(C.uint16_t(port), + C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))), + C.int32_t(len(IN)), + C.bool(unrestrictedClones), + (*C.int)(unsafe.Pointer(flag)), C.int(coreID), + (*C.RXTXStats)(unsafe.Pointer(stats)), + C.int32_t(sendThreadIndex), + C.int32_t(totalSendTreads)) } // Stop - dequeue and free packets. @@ -590,7 +597,7 @@ func GetPortsNumber() int { } func CheckPortRSS(port uint16) int32 { - return int32(C.check_port_rss(C.uint16_t(port))) + return int32(C.check_max_port_rx_queues(C.uint16_t(port))) } // CreatePort initializes a new port using global settings and parameters. @@ -697,6 +704,11 @@ func Statistics(N float32) { C.statistics(C.float(N)) } +// PortStatistics print statistics about NIC port. +func PortStatistics(port uint16) { + C.portStatistics(C.uint16_t(port)) +} + // ReportMempoolsState prints used and free space of mempools. func ReportMempoolsState() { for _, m := range usedMempools { diff --git a/internal/low/low.h b/internal/low/low.h index 46d22ecc..e9e2aedf 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -6,6 +6,7 @@ // Do not use signals in this C code without much need. // They can and probably will crash go runtime in complex errors. +#include #include #include #include @@ -35,17 +36,21 @@ // These constants are get from DPDK and checked for performance #define RX_RING_SIZE 128 -#define TX_RING_SIZE 512 +#define TX_RING_SIZE 2048 // 2 queues are enough for handling 40GBits. Should be checked for other NICs. // TODO This macro should be a function that will dynamically return the needed number of cores. -#define TX_QUEUE_NUMBER 2 +#define TX_QUEUE_NUMBER 16 +#define TX_QUEUE_CORES 2 +#define TX_ATTEMPTS 3 #define APP_RETA_SIZE_MAX (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE) #define MAX_JUMBO_PKT_LEN 9600 // from most DPDK examples. Only for MEMORY_JUMBO. // #define DEBUG +// #define PORT_XSTATS_ENABLED +// #define DEBUG_PACKET_LOSS #define COUNTERS_ENABLED #define USE_INTERLOCKED_COUNTERS #define ANALYZE_PACKETS_SIZES @@ -138,9 +143,9 @@ bool analyze_packet_sizes = true; bool analyze_packet_sizes = false; #endif -long receive_received = 0, receive_pushed = 0; -long send_required = 0, send_sent = 0; -long stop_freed = 0; +volatile long long receive_received = 0, receive_pushed = 0; +volatile long long send_required = 0, send_sent = 0; +volatile long long stop_freed = 0; typedef struct { uint64_t PacketsProcessed, PacketsDropped, BytesProcessed; @@ -245,20 +250,27 @@ int checkRSSPacketCount(struct cPort *port, int16_t queue) { return rte_eth_rx_queue_count(port->PortId, queue); } -int check_port_rss(uint16_t port) { +uint16_t check_max_port_rx_queues(uint16_t port) { struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); return dev_info.max_rx_queues; } -int check_port_tx(uint16_t port) { +uint16_t check_max_port_tx_queues(uint16_t port) { struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); return dev_info.max_tx_queues; } +uint16_t check_current_port_tx_queues(uint16_t port) { + struct rte_eth_dev_info dev_info; + memset(&dev_info, 0, sizeof(dev_info)); + rte_eth_dev_info_get(port, &dev_info); + return dev_info.nb_tx_queues; +} + // Initializes a given port using global settings and with the RX buffers // coming from the mbuf_pool passed as a parameter. int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex) { @@ -269,7 +281,7 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, rte_eth_dev_info_get(port, &dev_info); if (tx_rings > dev_info.max_tx_queues) { - tx_rings = check_port_tx(port); + tx_rings = check_max_port_tx_queues(port); } if (willReceive) { @@ -345,6 +357,22 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, .QueuesNumber = rx_rings }; +#ifdef DEBUG +#ifdef PORT_XSTATS_ENABLED + int stats_len = rte_eth_xstats_get(port, NULL, 0); + struct rte_eth_xstat_name *xstats_names; + assert(stats_len >= 0); + xstats_names = calloc(stats_len, sizeof(*xstats_names)); + assert(xstats_names != NULL); + int ret = rte_eth_xstats_get_names(port, xstats_names, stats_len); + assert(ret >= 0 && ret <= stats_len); + printf("Port %u exposes the following extended stats\n", port); + for (int i = 0; i < stats_len; i++) { + printf("\t%d: %s\n", i, xstats_names[i].name); + } + free(xstats_names); +#endif // PORT_XSTATS_ENABLED +#endif // DEBUG return 0; } @@ -477,8 +505,8 @@ void receiveRSS(uint16_t port, volatile int32_t *inIndex, struct rte_ring **out_ // Free any packets which can't be pushed to the ring. The ring is probably full. handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); #ifdef DEBUG - receive_received += rx_pkts_number; - receive_pushed += pushed_pkts_number; + __sync_fetch_and_add(&receive_received, rx_pkts_number); + __sync_fetch_and_add(&receive_pushed, pushed_pkts_number); #endif // DEBUG } } @@ -529,36 +557,54 @@ void nff_go_KNI(uint16_t port, volatile int *flag, int coreId, *flag = wasStopped; } -void nff_go_send(uint16_t port, struct rte_ring **in_rings, int32_t inIndexNumber, bool anyway, volatile int *flag, int coreId, RXTXStats *stats) { +void nff_go_send(uint16_t port, struct rte_ring **in_rings, int32_t inIndexNumber, bool anyway, volatile int *flag, int coreId, RXTXStats *stats, int32_t sendThreadIndex, int32_t totalSendTreads) { setAffinity(coreId); struct rte_mbuf *bufs[BURST_SIZE]; uint16_t buf; uint16_t tx_pkts_number; - int16_t queue = 0; - bool switchQueue = (check_port_tx(port) > 1) && (anyway || inIndexNumber > 1); + int16_t port_tx_queues = check_current_port_tx_queues(port); + int16_t tx_qstart = port_tx_queues / totalSendTreads * sendThreadIndex; + int16_t tx_qend = port_tx_queues / totalSendTreads * (sendThreadIndex + 1); + int16_t tx_queue_counter = tx_qstart; + int rx_qstart = inIndexNumber / totalSendTreads * sendThreadIndex; + int rx_qend = inIndexNumber / totalSendTreads * (sendThreadIndex + 1); + printf("Starting send with %d to %d RX queues and %d to %d TX on core %d\n", + rx_qstart, rx_qend, tx_qstart, tx_qend, coreId); while (*flag == process) { - for (int q = 0; q < inIndexNumber; q++) { + for (int q = rx_qstart; q < rx_qend; q++) { // Get packets for TX from ring uint16_t pkts_for_tx_number = rte_ring_mc_dequeue_burst(in_rings[q], (void*)bufs, BURST_SIZE, NULL); if (unlikely(pkts_for_tx_number == 0)) continue; - tx_pkts_number = rte_eth_tx_burst(port, queue, bufs, pkts_for_tx_number); - // inIndexNumber must be "1" or even. This prevents any reordering. - // anyway allows reordering explicitly - if (switchQueue) { - queue = !queue; - } + tx_pkts_number = 0; + int tx_attempts_counter = 0; + do { + uint16_t iteration_tx_pkts = rte_eth_tx_burst(port, tx_queue_counter, bufs + tx_pkts_number, pkts_for_tx_number - tx_pkts_number); + tx_pkts_number += iteration_tx_pkts; + tx_attempts_counter++; + } while (tx_pkts_number < pkts_for_tx_number && tx_attempts_counter <= TX_ATTEMPTS); UPDATE_COUNTERS(tx_pkts_number, calculateSize(bufs, tx_pkts_number), pkts_for_tx_number - tx_pkts_number); +#ifdef DEBUG_PACKET_LOSS + if (unlikely(tx_pkts_number < pkts_for_tx_number)) { + printf("**** Port %d, queue %d tried to transmit %d, transmitted only %d\n", + port, tx_queue_counter + tx_queue_offset, pkts_for_tx_number, tx_pkts_number); + } +#endif // Free any unsent packets handleUnpushed(bufs, tx_pkts_number, pkts_for_tx_number); + + tx_queue_counter++; + if (tx_queue_counter >= tx_qend) { + tx_queue_counter = tx_qstart; + } #ifdef DEBUG - send_required += pkts_for_tx_number; - send_sent += tx_pkts_number; + __sync_fetch_and_add(&send_required, pkts_for_tx_number); + __sync_fetch_and_add(&send_sent, tx_pkts_number); #endif } } @@ -587,7 +633,7 @@ void nff_go_stop(struct rte_ring **in_rings, int len, volatile int *flag, int co rte_pktmbuf_free(bufs[buf]); } #ifdef DEBUG - stop_freed += pkts_for_free_number; + __sync_fetch_and_add(&stop_freed, pkts_for_free_number); #endif } } @@ -626,12 +672,12 @@ void statistics(float N) { fprintf(stderr, "DEBUG: Current speed of all receives: received %.0f Mbits/s, pushed %.0f Mbits/s\n", (receive_received/N) * multiplier, (receive_pushed/N) * multiplier); if (receive_pushed < receive_received) { - fprintf(stderr, "DROP: Receive dropped %ld packets\n", receive_received - receive_pushed); + fprintf(stderr, "DROP: Receive dropped %lld packets\n", receive_received - receive_pushed); } fprintf(stderr, "DEBUG: Current speed of all sends: required %.0f Mbits/s, sent %.0f Mbits/s\n", (send_required/N) * multiplier, (send_sent/N) * multiplier); if (send_sent < send_required) { - fprintf(stderr, "DROP: Send dropped %ld packets\n", send_required - send_sent); + fprintf(stderr, "DROP: Send dropped %lld packets\n", send_required - send_sent); } fprintf(stderr, "DEBUG: Current speed of stop ring: freed %.0f Mbits/s\n", (stop_freed/N) * multiplier); // Yes, there can be race conditions here. However in practise they are rare and it is more @@ -645,6 +691,78 @@ void statistics(float N) { #endif } +#define RX_Q0PACKETS 8 +#define RX_Q_PACKETS_STEP 3 +#define TX_Q0PACKETS 56 +#define TX_Q_PACKETS_STEP 2 +#define RX_Q0BURST 88 +#define RX_Q_BURST_STEP 1 +#define TX_Q0BURST 104 +#define TX_Q_BURST_STEP 1 + +void portStatistics(uint16_t port_id) { +#ifdef DEBUG + struct rte_eth_dev_info dev_info; + rte_eth_dev_info_get(port_id, &dev_info); + + struct rte_eth_stats eth_stats; + rte_eth_stats_get(port_id, ð_stats); + rte_eth_stats_reset(port_id); + + if (dev_info.nb_rx_queues > 0 || dev_info.nb_tx_queues > 0) { + fprintf(stderr, "Port %u (RX_Q %u/TX_Q %u): IP %lu, OP %lu, IB %lu, OB %lu, IMISS %lu, IERR %lu, OERR %lu, RX_NOMBUF %lu\n", + port_id, + dev_info.nb_rx_queues, dev_info.nb_tx_queues, + eth_stats.ipackets, + eth_stats.opackets, + eth_stats.ibytes, + eth_stats.obytes, + eth_stats.imissed, + eth_stats.ierrors, + eth_stats.oerrors, + eth_stats.rx_nombuf); + +#ifdef PORT_XSTATS_ENABLED + int len = rte_eth_xstats_get(port_id, NULL, 0); + assert(len >= 0); + struct rte_eth_xstat *xstats = calloc(len, sizeof(*xstats)); + int ret = rte_eth_xstats_get(port_id, xstats, len); + assert(ret >= 0 && ret <= len); + rte_eth_xstats_reset(port_id); +/* + fprintf(stderr, "RX_Q_PACKETS: "); + for (int i = 0; i < dev_info.nb_rx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[RX_Q0PACKETS + i * RX_Q_PACKETS_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_rx_queues - 1, xstats[RX_Q0PACKETS + (dev_info.nb_rx_queues - 1) * RX_Q_PACKETS_STEP].value); + + fprintf(stderr, "TX_Q_PACKETS: "); + for (int i = 0; i < dev_info.nb_tx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[TX_Q0PACKETS + i * TX_Q_PACKETS_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_tx_queues - 1, xstats[TX_Q0PACKETS + (dev_info.nb_tx_queues - 1) * TX_Q_PACKETS_STEP].value); +*/ + if (dev_info.nb_rx_queues > 0) { + fprintf(stderr, "RX_Q_BURST: "); + for (int i = 0; i < dev_info.nb_rx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[RX_Q0BURST + i * RX_Q_BURST_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_rx_queues - 1, xstats[RX_Q0BURST + (dev_info.nb_rx_queues - 1) * RX_Q_BURST_STEP].value); + } + + if (dev_info.nb_tx_queues > 0) { + fprintf(stderr, "TX_Q_BURST: "); + for (int i = 0; i < dev_info.nb_tx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[TX_Q0BURST + i * TX_Q_BURST_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_tx_queues - 1, xstats[TX_Q0BURST + (dev_info.nb_tx_queues - 1) * TX_Q_BURST_STEP].value); + } + free(xstats); +#endif // PORT_XSTATS_ENABLED + } +#endif +} + // Initialize the Environment Abstraction Layer (EAL) in DPDK. int eal_init(int argc, char *argv[], uint32_t burstSize, int32_t needKNI, bool noPacketHeadChange, bool needChainedReassembly, bool needChainedJumbo, bool needMemoryJumbo) { From 6d9cce59ee2266190edca0767f1e95166ffbbf92 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 13 Sep 2019 14:02:10 -0700 Subject: [PATCH 22/24] Fixed number of tx queues propagation into port creation --- flow/flow.go | 11 +++++++---- internal/low/low.go | 5 +++-- internal/low/low.h | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/flow/flow.go b/flow/flow.go index 7a2da6bc..9087bdac 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -577,8 +577,8 @@ type Config struct { // SystemStop after it. It is enabled by default. NoSetSIGINTHandler bool // Number of CPU cores to be occupied by Send routines. It is - // necessary to set TXQueueNumber to a reasonably big number which - // can be divided by SendCPUCoresPerPort. + // necessary to set TXQueuesNumberPerPort to a reasonably big + // number which can be divided by SendCPUCoresPerPort. SendCPUCoresPerPort int // Number of transmit queues to use on network card. By default it // is minimum of NIC supported TX queues number and 2. If this @@ -608,11 +608,14 @@ func SystemInit(args *Config) error { } tXQueuesNumberPerPort = args.TXQueuesNumberPerPort + if tXQueuesNumberPerPort == 0 { + tXQueuesNumberPerPort = 2 + } sendCPUCoresPerPort = args.SendCPUCoresPerPort if sendCPUCoresPerPort == 0 { sendCPUCoresPerPort = 1 - if tXQueuesNumberPerPort != 0 && tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 { + if tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 { return common.WrapWithNFError(nil, "TXQueuesNumberPerPort should be divisible by SendCPUCoresPerPort", common.BadArgument) } @@ -772,7 +775,7 @@ func SystemInitPortsAndMemory() error { for i := range createdPorts { if createdPorts[i].wasRequested { if err := low.CreatePort(createdPorts[i].port, createdPorts[i].willReceive, - true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex); err != nil { + true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex, tXQueuesNumberPerPort); err != nil { return err } } diff --git a/internal/low/low.go b/internal/low/low.go index 3e7bec29..2e8a09b1 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -601,7 +601,8 @@ func CheckPortRSS(port uint16) int32 { } // CreatePort initializes a new port using global settings and parameters. -func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, hwrxpacketstimestamp bool, inIndex int32) error { +func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, + hwrxpacketstimestamp bool, inIndex int32, tXQueuesNumberPerPort int) error { var mempools **C.struct_rte_mempool if willReceive { m := CreateMempools("receive", inIndex) @@ -610,7 +611,7 @@ func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, h mempools = nil } if C.port_init(C.uint16_t(port), C.bool(willReceive), mempools, - C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex)) != 0 { + C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex), C.int32_t (tXQueuesNumberPerPort)) != 0 { msg := common.LogError(common.Initialization, "Cannot init port ", port, "!") return common.WrapWithNFError(nil, msg, common.FailToInitPort) } diff --git a/internal/low/low.h b/internal/low/low.h index e9e2aedf..673f8215 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -273,7 +273,7 @@ uint16_t check_current_port_tx_queues(uint16_t port) { // Initializes a given port using global settings and with the RX buffers // coming from the mbuf_pool passed as a parameter. -int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex) { +int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex, int32_t tx_queues) { uint16_t rx_rings, tx_rings = TX_QUEUE_NUMBER; struct rte_eth_dev_info dev_info; From 3d0edd972637a56770c27e8ee34f410be30131d8 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Mon, 16 Sep 2019 11:45:28 -0700 Subject: [PATCH 23/24] Propagate number of TX queues from app to port init --- internal/low/low.go | 4 ++++ internal/low/low.h | 12 ++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/low/low.go b/internal/low/low.go index 2e8a09b1..6ebdc31b 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -600,6 +600,10 @@ func CheckPortRSS(port uint16) int32 { return int32(C.check_max_port_rx_queues(C.uint16_t(port))) } +func CheckPortMaxTXQueues(port uint16) int32 { + return int32(C.check_max_port_tx_queues(C.uint16_t(port))) +} + // CreatePort initializes a new port using global settings and parameters. func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, hwrxpacketstimestamp bool, inIndex int32, tXQueuesNumberPerPort int) error { diff --git a/internal/low/low.h b/internal/low/low.h index 673f8215..de591d39 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -40,7 +40,6 @@ // 2 queues are enough for handling 40GBits. Should be checked for other NICs. // TODO This macro should be a function that will dynamically return the needed number of cores. -#define TX_QUEUE_NUMBER 16 #define TX_QUEUE_CORES 2 #define TX_ATTEMPTS 3 @@ -274,14 +273,15 @@ uint16_t check_current_port_tx_queues(uint16_t port) { // Initializes a given port using global settings and with the RX buffers // coming from the mbuf_pool passed as a parameter. int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex, int32_t tx_queues) { - uint16_t rx_rings, tx_rings = TX_QUEUE_NUMBER; + uint16_t rx_rings, tx_rings = tx_queues; struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); if (tx_rings > dev_info.max_tx_queues) { - tx_rings = check_max_port_tx_queues(port); + printf("Warning! Port %d does not support requested number of TX queues %d. Setting number of TX queues to %d\n", port, tx_rings, dev_info.max_tx_queues); + tx_rings = dev_info.max_tx_queues; } if (willReceive) { @@ -563,9 +563,9 @@ void nff_go_send(uint16_t port, struct rte_ring **in_rings, int32_t inIndexNumbe struct rte_mbuf *bufs[BURST_SIZE]; uint16_t buf; uint16_t tx_pkts_number; - int16_t port_tx_queues = check_current_port_tx_queues(port); - int16_t tx_qstart = port_tx_queues / totalSendTreads * sendThreadIndex; - int16_t tx_qend = port_tx_queues / totalSendTreads * (sendThreadIndex + 1); + int16_t port_tx_queues = check_current_port_tx_queues(port); + int16_t tx_qstart = port_tx_queues / totalSendTreads * sendThreadIndex; + int16_t tx_qend = sendThreadIndex + 1 == totalSendTreads ? port_tx_queues : port_tx_queues / totalSendTreads * (sendThreadIndex + 1); int16_t tx_queue_counter = tx_qstart; int rx_qstart = inIndexNumber / totalSendTreads * sendThreadIndex; int rx_qend = inIndexNumber / totalSendTreads * (sendThreadIndex + 1); From 6a17849d6fd2fd84497f316f65929c6afe6d251f Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Thu, 19 Sep 2019 12:56:28 -0500 Subject: [PATCH 24/24] Added missing semicolon --- mk/leaf.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mk/leaf.mk b/mk/leaf.mk index 50dab007..b3cfaac2 100644 --- a/mk/leaf.mk +++ b/mk/leaf.mk @@ -57,7 +57,7 @@ deploy: .check-deploy-env images $(eval TMPNAME=tmp-$(IMAGENAME).tar) docker save $(WORKIMAGENAME) > $(TMPNAME) for host in `echo $(NFF_GO_HOSTS) | tr ',' ' '`; do \ - echo Uploading $(WORKIMAGENAME) to $$host \ + echo Uploading $(WORKIMAGENAME) to $$host; \ if ! docker -H tcp://$$host load < $(TMPNAME); then break; fi; \ done rm $(TMPNAME)