diff --git a/.gitignore b/.gitignore index d29de412..035f6cc1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ # emacs *~ doc +GPATH +GTAGS +GRTAGS diff --git a/.travis.yml b/.travis.yml index 10daa873..0c8b8549 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,9 +12,9 @@ services: before_script: - sudo apt-get install -y linux-headers-$(uname -r) - - docker pull ubuntu:cosmic - - docker build --build-arg https_proxy=${https_proxy} -t test-cosmic . - - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-cosmic /bin/bash + - docker pull ubuntu:disco + - docker build --build-arg https_proxy=${https_proxy} -t test-disco . + - docker run -it -d --privileged -v /usr/src:/usr/src -v /lib/modules:/lib/modules -v /sys/devices/system/node:/sys/devices/system/node --name test-nff-go test-disco /bin/bash script: - docker exec -i test-nff-go go mod download diff --git a/Dockerfile b/Dockerfile index 43267906..757a6841 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,11 @@ RUN apt-get -q update && apt-get -q -y install \ libmnl-dev \ libibverbs-dev -RUN cd /opt && curl -L -s https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz | tar zx +RUN cd /opt && curl -L -s https://dl.google.com/go/go1.12.9.linux-amd64.tar.gz | tar zx +RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf +RUN make -C libbpf/src all install +RUN echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf +RUN ldconfig RUN mkdir -p ${NFF_GO} COPY . ${NFF_GO} diff --git a/README.md b/README.md index 67afd418..362e4442 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get filtering rules from access control file. - L3Rules, err := packet.GetL3ACLFromORIG("Firewall.conf") + L3Rules, err := packet.GetL3ACLFromTextTable("Firewall.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. 
@@ -118,6 +118,27 @@ Use Go version 1.11.4 or higher. To check the version of Go, do: go version +### AF_XDP support + +AF_XDP support is enabled by default, and it requires you to install +`libbpf` package. At the time of writing Ubuntu doesn't have this +library among its packages, so it is necessary to build `libbpf` from +sources or disable AF_XDP socket support. + +To disable it, set the variable `NFF_GO_NO_BPF_SUPPORT` to some non-empty +value. When NFF-GO is built with this variable set, AF_XDP support is disabled and +attempting to use it results in errors. + +If you want to build `libbpf` from sources you can do it in two +different ways. +* If you are using stock Linux kernel from distribution, [download + `libbpf` from GitHub](https://github.com/libbpf/libbpf), then + execute `cd src; make; sudo make install`. Add /usr/lib64 to your + ldconfig path. +* If you build Linux kernel from sources, you can build `libbpf` from + Linux source tree using commands `cd tools/lib/bpf; make; sudo make + install install_headers`. Add /usr/local/lib64 to your ldconfig path. 
+ ## Building NFF-GO When Go compiler runs for the first time it downloads all dependent diff --git a/common/Makefile b/common/Makefile index 06a019f9..b28cbc7e 100644 --- a/common/Makefile +++ b/common/Makefile @@ -7,7 +7,7 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing testing: check-pktgen - go test + go test -tags "${GO_BUILD_TAGS}" .PHONY: coverage coverage: diff --git a/devices/misc.go b/devices/misc.go index ee6b14e0..b2dd37a1 100644 --- a/devices/misc.go +++ b/devices/misc.go @@ -40,12 +40,18 @@ func GetDeviceID(nicName string) (string, error) { return "", err } // raw should be like /sys/devices/pci0002:00/0000:00:08.0/virtio2/net/ens8 + // or /sys/devices/pci0000:00/0000:00:01.0/0000:03:00.2/net/ens4f2 raws := strings.Split(raw, "/") - if len(raws) < 5 { + if len(raws) < 6 { return "", fmt.Errorf("path not correct") } - return raws[4], nil - + if IsPciID.Match([]byte(raws[5])) { + return raws[5], nil + } else if IsPciID.Match([]byte(raws[4])) { + return raws[4], nil + } else { + return "", fmt.Errorf("can't get device ID from path: %s", raw) + } } // IsModuleLoaded checks if the kernel has already loaded the driver or not. 
diff --git a/dpdk/dpdk b/dpdk/dpdk index 07efd6dd..b81660d0 160000 --- a/dpdk/dpdk +++ b/dpdk/dpdk @@ -1 +1 @@ -Subproject commit 07efd6ddc0499688eb11ae4866d3532295d6db2b +Subproject commit b81660d09e9caa928ab1c683cd0fbfbe2439ac3d diff --git a/dpdk/pktgen-dpdk b/dpdk/pktgen-dpdk index ae5a88bf..7a4e0bcc 160000 --- a/dpdk/pktgen-dpdk +++ b/dpdk/pktgen-dpdk @@ -1 +1 @@ -Subproject commit ae5a88bf89e421ea84d680a7e5613af959243a4b +Subproject commit 7a4e0bcc7c6310dfd244c3a063769d9f774530c3 diff --git a/examples/Makefile b/examples/Makefile index 3aaaef21..b477ffaf 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -8,15 +8,11 @@ EXECUTABLES = dump clonablePcapDumper kni copy errorHandling timer \ createPacket sendFixedPktsNumber gtpu pingReplay \ netlink gopacketParserExample devbind generate \ OSforwarding jumbo decrementTTL +SUBDIRS = tutorial antiddos demo fileReadWrite firewall forwarding ipsec lb nffPktgen -SUBDIRS = tutorial antiddos demo fileReadWrite firewall forwarding ipsec lb - -.PHONY: dpi nffPktgen +.PHONY: dpi dpi: $(MAKE) -C dpi -nffPktgen: - $(MAKE) -C nffPktgen - include $(PATH_TO_MK)/intermediate.mk include $(PATH_TO_MK)/leaf.mk diff --git a/examples/OSforwarding.go b/examples/OSforwarding.go index a845e160..ef1a58cd 100644 --- a/examples/OSforwarding.go +++ b/examples/OSforwarding.go @@ -10,15 +10,23 @@ import ( ) func main() { + // If you use af-xdp mode you need to configure queues: + // e.g. ethtool -N my_device flow-type tcp4 dst-port 4242 action 16 + afXDP := flag.Bool("af-xdp", false, "use af-xdp. 
need to use ethtool to setup queues") inport := flag.String("in", "", "device for receiver") + inQueue := flag.Int("in-queue", 16, "queue for receiver") outport := flag.String("out", "", "device for sender") flag.Parse() flow.CheckFatal(flow.SystemInit(nil)) - - inputFlow, err := flow.SetReceiverOS(*inport) - flow.CheckFatal(err) - flow.CheckFatal(flow.SetSenderOS(inputFlow, *outport)) - + if *afXDP { + inputFlow, err := flow.SetReceiverXDP(*inport, *inQueue) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetSenderXDP(inputFlow, *outport)) + } else { + inputFlow, err := flow.SetReceiverOS(*inport) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetSenderOS(inputFlow, *outport)) + } flow.CheckFatal(flow.SystemStart()) } diff --git a/examples/errorHandling.go b/examples/errorHandling.go index 26d810cf..aefff719 100644 --- a/examples/errorHandling.go +++ b/examples/errorHandling.go @@ -49,11 +49,11 @@ func main() { } // Get splitting rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("wrong.conf") - fmt.Printf("error from GetL3ACLFromORIG was: %+v\n", err) + l3Rules, err = packet.GetL3ACLFromTextTable("wrong.conf") + fmt.Printf("error from GetL3ACLFromTextTable was: %+v\n", err) if common.GetNFErrorCode(err) == common.FileErr { fmt.Printf("changing file name\n") - l3Rules, err = packet.GetL3ACLFromORIG("Forwarding.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("Forwarding.conf") CheckFatal(err) } diff --git a/examples/firewall/firewall.go b/examples/firewall/firewall.go index 30bb9ecc..bf1cf990 100644 --- a/examples/firewall/firewall.go +++ b/examples/firewall/firewall.go @@ -29,7 +29,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get filtering rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("firewall.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("firewall.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. 
diff --git a/examples/forwarding/forwarding.go b/examples/forwarding/forwarding.go index d747d96e..cfbd18f6 100644 --- a/examples/forwarding/forwarding.go +++ b/examples/forwarding/forwarding.go @@ -37,7 +37,7 @@ func main() { flow.CheckFatal(flow.SystemInit(&config)) // Get splitting rules from access control file. - l3Rules, err = packet.GetL3ACLFromORIG("forwarding.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("forwarding.conf") flow.CheckFatal(err) // Receive packets from zero port. Receive queue will be added automatically. diff --git a/examples/nffPktgen/gtp-u/Dockerfile b/examples/nffPktgen/gtp-u/Dockerfile new file mode 100644 index 00000000..0e5a33c3 --- /dev/null +++ b/examples/nffPktgen/gtp-u/Dockerfile @@ -0,0 +1,11 @@ +# Copyright 2019 Intel Corporation. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +ARG USER_NAME +FROM ${USER_NAME}/nff-go-base + +LABEL RUN docker run -it --privileged -v /sys/bus/pci/drivers:/sys/bus/pci/drivers -v /sys/kernel/mm/hugepages:/sys/kernel/mm/hugepages -v /sys/devices/system/node:/sys/devices/system/node -v /dev:/dev --name NAME -e NAME=NAME -e IMAGE=IMAGE IMAGE + +WORKDIR /workdir +COPY trafficgen . 
diff --git a/examples/nffPktgen/gtp-u/trafficgen.go b/examples/nffPktgen/gtp-u/trafficgen.go index 2f55de0e..a70f676e 100644 --- a/examples/nffPktgen/gtp-u/trafficgen.go +++ b/examples/nffPktgen/gtp-u/trafficgen.go @@ -192,6 +192,7 @@ func initPortFlows(port *IpPort, myIPs generator.AddrRange, addEncapsulation boo flow.CheckFatal(flow.SetSender(outFlow, uint16(port.Index))) // Input flow inFlow, err := flow.SetReceiver(port.Index) + flow.CheckFatal(err) flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) flow.CheckFatal(flow.SetStopper(inFlow)) } diff --git a/examples/nffPktgen/testing/Dockerfile b/examples/nffPktgen/testing/Dockerfile index fe79ce59..21a1514b 100644 --- a/examples/nffPktgen/testing/Dockerfile +++ b/examples/nffPktgen/testing/Dockerfile @@ -9,5 +9,5 @@ LABEL RUN docker run -it --privileged -v /sys/bus/pci/drivers:/sys/bus/pci/drive WORKDIR /workdir COPY sendGetBack . -COPY perfTesting . +COPY perfTest . COPY dump . diff --git a/examples/nffPktgen/testing/minPktLoss.go b/examples/nffPktgen/testing/minPktLoss.go new file mode 100644 index 00000000..9bce7602 --- /dev/null +++ b/examples/nffPktgen/testing/minPktLoss.go @@ -0,0 +1,210 @@ +// Copyright 2018-2019 Intel Corporation. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "sync/atomic" + "time" + + "github.com/intel-go/nff-go/examples/nffPktgen/generator" + "github.com/intel-go/nff-go/flow" + "github.com/intel-go/nff-go/packet" +) + +type IpPort struct { + Index uint16 + packetCount uint64 + bytesCount uint64 +} + +type HandlerContext struct { + port *IpPort +} + +func (hc HandlerContext) Copy() interface{} { + return HandlerContext{ + port: hc.port, + } +} + +func (hc HandlerContext) Delete() { +} + +func main() { + var ( + speed uint64 + tgtLoss float64 + trafDelay uint + genConfig, cores string + inPort uint + outPort uint + ) + flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") + flag.Float64Var(&tgtLoss, "target loss", 0.5, "target packet loss percentage") + flag.UintVar(&trafDelay, "traffic delay", 3, "time delay after speed is updated, sec") + flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") + flag.StringVar(&cores, "cores", "", "specifies cores") + flag.UintVar(&outPort, "outPort", 1, "specifies output port") + flag.UintVar(&inPort, "inPort", 1, "specifices input port") + testTime := flag.Uint("t", 0, "run generator for specified period of time in seconds, use zero to run forever") + statInterval := flag.Uint("s", 0, "statistics update interval in seconds, use zero to disable it") + flag.Parse() + + + portStats := IpPort { + Index: uint16(inPort), + } + + // Init NFF-GO system at 16 available cores + config := flow.Config { + CPUList: cores, + } + flow.CheckFatal(flow.SystemInit(&config)) + + configuration, err := generator.ReadConfig(genConfig) + if err != nil { + panic(fmt.Sprintf("%s config reading failed: %v", genConfig, err)) + } + context, err := generator.GetContext(configuration) + flow.CheckFatal(err) + outFlow, genChan, _ := flow.SetFastGenerator(generator.Generate, speed, context) + flow.CheckFatal(flow.SetSender(outFlow, uint16(outPort))) + + hc := HandlerContext { + port: &portStats, + 
} + inFlow, err := flow.SetReceiver(uint16(inPort)) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) + flow.CheckFatal(flow.SetStopper(inFlow)) + + go func() { + flow.CheckFatal(flow.SystemStart()) + }() + + // Set up finish channels + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, os.Interrupt) + + var finishChannel, statsChannel <-chan time.Time + if *testTime > 0 { + finishChannel = time.NewTicker(time.Duration(*testTime) * time.Second).C + } + if *statInterval > 0 { + statsChannel = time.NewTicker(time.Duration(*statInterval) * time.Second).C + } + + started := time.Now() + + // optimize speed based on packet loss: + maxSpeed := -1 + var maxSpeedPkt float64 = -1 + var totalPktRX, totalPktTX uint64 = 0, 0 + + go func() { + gen := generator.GetGenerator() + low := 0 + high := 100 + currSpeed := 100 + + // data at line rate: + portStats.packetCount = 0 + gen.Count = 0 + time.Sleep(time.Duration(trafDelay) * time.Second) + pktRX := portStats.packetCount + pktTX := gen.GetGeneratedNumber() + pktLoss := calcPktLoss(pktRX, pktTX) + totalPktRX += pktRX + totalPktTX += pktTX + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, float64(speed)) + + // binary search: + for low <= high { + mid := (low + high) / 2 + currSpeed = mid + updatedSpeed := 0.01 * float64(uint64(currSpeed) * speed) + + // reset counters: + portStats.packetCount = 0 + gen.Count = 0 + + genChan <- uint64(updatedSpeed) + time.Sleep(time.Duration(trafDelay) * time.Second) + + pktRX = portStats.packetCount + pktTX = gen.GetGeneratedNumber() + pktLoss = calcPktLoss(pktRX, pktTX) + + if pktLoss <= tgtLoss { + low = mid + 1 // tgt met so try higher speed + if currSpeed > maxSpeed { + maxSpeed = currSpeed + maxSpeedPkt = updatedSpeed + } + } else { + high = mid - 1 // tgt failed so try lower speed + } + + totalPktRX += pktRX + totalPktTX += pktTX + // fmt.Printf("\nmid: %d\nupdatedSpeed: %d%%(%f pkt/s)\npktCountTX: 
%d\npktCountRX: %d\npktLoss: %f\nlow: %d\nhigh: %d\n\n", mid, currSpeed, updatedSpeed, pktTX, pktRX, pktLoss, low, high) // debugging purposes + + printStats(&portStats, pktRX, pktTX, pktLoss, started, currSpeed, updatedSpeed) + } + fmt.Printf("--------------------BINARY SEARCH COMPLETE!!!--------------------\n") + interruptChannel <- os.Interrupt + }() + + +out: + for { + select { + case sig := <-interruptChannel: + fmt.Printf("Received signal %v, finishing.\n", sig) + break out + case <-finishChannel: + fmt.Println("Test timeout reached") + break out + case <-statsChannel: + printStats(&portStats, totalPktRX, totalPktTX, calcPktLoss(totalPktRX, totalPktTX), started, maxSpeed, maxSpeedPkt) + } + } + printTotals(&portStats, totalPktRX, totalPktTX, started, maxSpeed, maxSpeedPkt) +} + +func calcPktLoss(pktCountRX uint64, pktCountTX uint64) float64 { + return (float64(pktCountTX - pktCountRX) / float64(pktCountTX)) * 100.0 +} + +func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { + hc := ctx.(HandlerContext) + + atomic.AddUint64(&hc.port.packetCount, 1) + atomic.AddUint64(&hc.port.bytesCount, uint64(pkt.GetPacketLen())) + return true +} + +func printStats(port *IpPort, pktRX uint64, pktTX uint64, pktLoss float64, started time.Time, currSpeed int, updatedSpeed float64) { + fmt.Printf("\n%v: ", time.Since(started)) + fmt.Printf("Port %d pkts TX: %d / pkts RX: %d\n", port.Index, pktTX, pktRX) + fmt.Printf("Pkt loss: %f%%\n", pktLoss) + fmt.Printf("Current Speed: %d%% (%f pkts/s)\n\n", currSpeed, updatedSpeed) +} + +func printTotals(port *IpPort, totalPktRX uint64, totalPktTX uint64, started time.Time, maxSpeed int, maxSpeedPkt float64) { + runtime := time.Since(started) + runtimeint := uint64(runtime) / uint64(time.Second) + totalPktLoss := calcPktLoss(totalPktRX, totalPktTX) + fmt.Printf("\nTest executed for %v\n", runtime) + fmt.Printf("Port %d total pkts TX: %d / pkts RX: %d\n", port.Index, totalPktTX, totalPktRX) + fmt.Printf("Total pkt loss: 
%f%%\n", totalPktLoss) + fmt.Printf("Port %d pkts/s: %v\n", port.Index, totalPktRX/runtimeint) + fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) + fmt.Printf("Max Speed: %d%% (%f pkts/s)\n\n", maxSpeed, maxSpeedPkt) +} diff --git a/examples/nffPktgen/testing/perfTest.go b/examples/nffPktgen/testing/perfTest.go index abd7d390..2d922480 100644 --- a/examples/nffPktgen/testing/perfTest.go +++ b/examples/nffPktgen/testing/perfTest.go @@ -1,4 +1,4 @@ -// Copyright 2018 Intel Corporation. +// Copyright 2018-2019 Intel Corporation. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -7,22 +7,55 @@ package main import ( "flag" "fmt" + "os" + "os/signal" + "sync/atomic" + "time" + "github.com/intel-go/nff-go/examples/nffPktgen/generator" "github.com/intel-go/nff-go/flow" + "github.com/intel-go/nff-go/packet" ) +type IpPort struct { + Index uint16 + packetCount uint64 + bytesCount uint64 +} + +type HandlerContext struct { + port *IpPort +} + +func (hc HandlerContext) Copy() interface{} { + return HandlerContext{ + port: hc.port, + } +} + +func (hc HandlerContext) Delete() { +} + func main() { var ( speed uint64 genConfig, cores string port uint + receive bool ) flag.Uint64Var(&speed, "speed", 120000000, "speed of fast generator, Pkts/s") flag.StringVar(&genConfig, "config", "ip4.json", "specifies config for generator") flag.StringVar(&cores, "cores", "0-2", "specifies cores") flag.UintVar(&port, "port", 1, "specifies output port") + flag.BoolVar(&receive, "receive", false, "Receive packets back and print statistics") + testTime := flag.Uint("t", 30, "run generator for specified period of time in seconds, use zero to run forever") + statInterval := flag.Uint("s", 2, "statistics update interval in seconds, use zero to disable it") flag.Parse() + portStats := IpPort{ + Index: uint16(port), + } + // Init NFF-GO system at 16 available cores config := flow.Config{ CPUList: cores, @@ -37,5 +70,69 
@@ func main() { flow.CheckFatal(err) outFlow, _, _ := flow.SetFastGenerator(generator.Generate, speed, context) flow.CheckFatal(flow.SetSender(outFlow, uint16(port))) - flow.CheckFatal(flow.SystemStart()) + if receive { + hc := HandlerContext{ + port: &portStats, + } + inFlow, err := flow.SetReceiver(uint16(port)) + flow.CheckFatal(err) + flow.CheckFatal(flow.SetHandlerDrop(inFlow, receiveHandler, hc)) + flow.CheckFatal(flow.SetStopper(inFlow)) + } + + go func() { + flow.CheckFatal(flow.SystemStart()) + }() + + // Set up finish channels + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, os.Interrupt) + + var finishChannel, statsChannel <-chan time.Time + if *testTime > 0 { + finishChannel = time.NewTicker(time.Duration(*testTime) * time.Second).C + } + if receive && *statInterval > 0 { + statsChannel = time.NewTicker(time.Duration(*statInterval) * time.Second).C + } + + started := time.Now() +out: + for { + select { + case sig := <-interruptChannel: + fmt.Printf("Received signal %v, finishing.\n", sig) + break out + case <-finishChannel: + fmt.Println("Test timeout reached") + break out + case <-statsChannel: + printStats(&portStats, started) + } + } + if receive { + printStats(&portStats, started) + printTotals(&portStats, started) + } +} + +func receiveHandler(pkt *packet.Packet, ctx flow.UserContext) bool { + hc := ctx.(HandlerContext) + + atomic.AddUint64(&hc.port.packetCount, 1) + atomic.AddUint64(&hc.port.bytesCount, uint64(pkt.GetPacketLen())) + return true +} + +func printStats(port *IpPort, started time.Time) { + fmt.Printf("%v: ", time.Since(started)) + fmt.Printf("Port %d received %vpkts, %vkB\n", port.Index, port.packetCount, port.bytesCount/1000) +} + +func printTotals(port *IpPort, started time.Time) { + runtime := time.Since(started) + runtimeint := uint64(runtime) / uint64(time.Second) + fmt.Printf("\nTest executed for %v\n", runtime) + fmt.Printf("Port %d pkts/s: %v\n", port.Index, port.packetCount/runtimeint) + 
fmt.Printf("Port %d kB/s: %v\n", port.Index, port.bytesCount/runtimeint/1000) } diff --git a/examples/nffPktgen/testing/testnat.json b/examples/nffPktgen/testing/testnat.json new file mode 100644 index 00000000..6933bae5 --- /dev/null +++ b/examples/nffPktgen/testing/testnat.json @@ -0,0 +1,32 @@ +{ + "ether": { + "saddr": "00:25:96:FF:FE:12", + "daddr": "00:FF:96:FF:FE:12", + "ipv4": { + "saddr": { + "range": { + "min": "192.16.0.0", + "max": "192.16.0.255" + } + }, + "daddr": { + "range": { + "min": "172.16.0.0", + "max": "172.16.0.10" + } + }, + "udp": { + "sport": { + "range": { + "min": 1120, + "max": 1152 + } + }, + "dport": 1020, + "raw": { + "data": "123456789012345678" + } + } + } + } +} diff --git a/examples/tutorial/step05.go b/examples/tutorial/step05.go index f2044241..2e1abdf8 100644 --- a/examples/tutorial/step05.go +++ b/examples/tutorial/step05.go @@ -11,7 +11,7 @@ func main() { initCommonState() - l3Rules, err = packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) firstFlow, err := flow.SetReceiver(0) diff --git a/examples/tutorial/step06.go b/examples/tutorial/step06.go index 1fe2f99b..74fbff0a 100644 --- a/examples/tutorial/step06.go +++ b/examples/tutorial/step06.go @@ -11,7 +11,7 @@ func main() { initCommonState() - l3Rules, err = packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) firstFlow, err := flow.SetReceiver(0) diff --git a/examples/tutorial/step07.go b/examples/tutorial/step07.go index 7fd1ae89..1cd076cf 100644 --- a/examples/tutorial/step07.go +++ b/examples/tutorial/step07.go @@ -13,7 +13,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules1.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -37,7 +37,7 @@ func mySeparator(cur *packet.Packet, ctx flow.UserContext) 
bool { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules1.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules1.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step08.go b/examples/tutorial/step08.go index 66de6ce7..093620f2 100644 --- a/examples/tutorial/step08.go +++ b/examples/tutorial/step08.go @@ -13,7 +13,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -38,7 +38,7 @@ func mySplitter(cur *packet.Packet, ctx flow.UserContext) uint { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step09.go b/examples/tutorial/step09.go index 4442bb4c..3b4b18bf 100644 --- a/examples/tutorial/step09.go +++ b/examples/tutorial/step09.go @@ -14,7 +14,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -49,7 +49,7 @@ func myHandler(cur *packet.Packet, ctx flow.UserContext) { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step10.go b/examples/tutorial/step10.go index 88be669b..b7f6c0d2 100644 --- a/examples/tutorial/step10.go +++ b/examples/tutorial/step10.go 
@@ -16,7 +16,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -56,7 +56,7 @@ func myHandler(curV []*packet.Packet, mask *[vecSize]bool, ctx flow.UserContext) func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/examples/tutorial/step11.go b/examples/tutorial/step11.go index a619e74f..93f8782e 100644 --- a/examples/tutorial/step11.go +++ b/examples/tutorial/step11.go @@ -16,7 +16,7 @@ func main() { initCommonState() - l3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + l3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) rulesp = unsafe.Pointer(&l3Rules) go updateSeparateRules() @@ -63,7 +63,7 @@ func heavyCode() { func updateSeparateRules() { for { time.Sleep(time.Second * 5) - locall3Rules, err := packet.GetL3ACLFromORIG("rules2.conf") + locall3Rules, err := packet.GetL3ACLFromTextTable("rules2.conf") flow.CheckFatal(err) atomic.StorePointer(&rulesp, unsafe.Pointer(locall3Rules)) } diff --git a/flow/flow.go b/flow/flow.go index ffe20cd0..9087bdac 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -32,6 +32,7 @@ package flow import ( "net" "os" + "os/signal" "runtime" "sync/atomic" "time" @@ -48,7 +49,7 @@ var createdPorts []port var portPair map[types.IPv4Address](*port) var schedState *scheduler var vEach [10][vBurstSize]uint8 -var devices map[string]int +var ioDevices map[string]interface{} type Timer struct { t *time.Ticker @@ -160,7 +161,7 @@ func addReceiver(portId uint16, out low.Rings, inIndexNumber int32) { par.port = low.GetPort(portId) par.out = out par.status = make([]int32, maxRecv, maxRecv) - 
schedState.addFF("receiver", nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats) + schedState.addFF("receiverPort"+string(portId), nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats) } type receiveOSParameters struct { @@ -176,6 +177,19 @@ func addOSReceiver(socket int, out low.Rings) { schedState.addFF("OS receiver", nil, recvOS, nil, par, nil, sendReceiveKNI, 0, &par.stats) } +type receiveXDPParameters struct { + out low.Rings + socket low.XDPSocket + stats common.RXTXStats +} + +func addXDPReceiver(socket low.XDPSocket, out low.Rings) { + par := new(receiveXDPParameters) + par.socket = socket + par.out = out + schedState.addFF("AF_XDP receiver", nil, recvXDP, nil, par, nil, sendReceiveKNI, 0, &par.stats) +} + type KNIParameters struct { in low.Rings out low.Rings @@ -242,18 +256,23 @@ func addFastGenerator(out low.Rings, generateFunction GenerateFunction, } type sendParameters struct { - in low.Rings - port uint16 - anyway bool - stats common.RXTXStats + in low.Rings + port uint16 + unrestrictedClones bool + stats common.RXTXStats + sendThreadIndex int } func addSender(port uint16, in low.Rings, inIndexNumber int32) { - par := new(sendParameters) - par.port = port - par.in = in - par.anyway = schedState.anyway - schedState.addFF("sender", nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) + for iii := 0; iii < sendCPUCoresPerPort; iii++ { + par := new(sendParameters) + par.port = port + par.in = in + par.unrestrictedClones = schedState.unrestrictedClones + par.sendThreadIndex = iii + schedState.addFF("senderPort"+string(port)+"Thread"+string(iii), + nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) + } } type sendOSParameters struct { @@ -269,6 +288,19 @@ func addSenderOS(socket int, in low.Rings, inIndexNumber int32) { schedState.addFF("sender OS", nil, sendOS, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) } +type sendXDPParameters struct { + in low.Rings + socket low.XDPSocket + 
stats common.RXTXStats +} + +func addSenderXDP(socket low.XDPSocket, in low.Rings, inIndexNumber int32) { + par := new(sendXDPParameters) + par.socket = socket + par.in = in + schedState.addFF("AF_XDP sender", nil, sendXDP, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats) +} + type copyParameters struct { in low.Rings out low.Rings @@ -439,8 +471,9 @@ const reportMbits = false var sizeMultiplier uint var schedTime uint -var hwtxchecksum, hwrxpacketstimestamp bool +var hwtxchecksum, hwrxpacketstimestamp, setSIGINTHandler bool var maxRecv int +var sendCPUCoresPerPort, tXQueuesNumberPerPort int type port struct { wasRequested bool // has user requested any send/receive operations at this port @@ -504,7 +537,7 @@ type Config struct { // Limits parallel instances. 1 for one instance, 1000 for RSS count determine instances MaxInIndex int32 // Scheduler should clone functions even if it can lead to reordering. - // This option should be switch off for all high level reassembling like TCP or HTTP + // This option should be switched off for all high level reassembling like TCP or HTTP RestrictedCloning bool // If application uses EncapsulateHead or DecapsulateHead functions L2 pointers // should be reinit every receving or generating a packet. This can be removed if @@ -538,6 +571,24 @@ type Config struct { // Enables hardware assisted timestamps in packet mbufs. These // timestamps can be accessed with GetPacketTimestamp function. HWRXPacketsTimestamp bool + // Disable setting custom handler for SIGINT in + // SystemStartScheduler. When handler is enabled + // SystemStartScheduler waits for SIGINT notification and calls + // SystemStop after it. It is enabled by default. + NoSetSIGINTHandler bool + // Number of CPU cores to be occupied by Send routines. It is + // necessary to set TXQueuesNumberPerPort to a reasonably big + // number which can be divided by SendCPUCoresPerPort. + SendCPUCoresPerPort int + // Number of transmit queues to use on network card. 
By default it + // is minimum of NIC supported TX queues number and 2. If this + // value is specified and NIC doesn't support this number of TX + // queues, initialization fails. + TXQueuesNumberPerPort int + // Controls scheduler interval in milliseconds. Default value is + // 500. Lower values allow faster reaction to changing traffic but + // increase scheduling overhead. + SchedulerInterval uint } // SystemInit is initialization of system. This function should be always called before graph construction. @@ -556,12 +607,26 @@ func SystemInit(args *Config) error { cpus = common.GetDefaultCPUs(CPUCoresNumber) } + tXQueuesNumberPerPort = args.TXQueuesNumberPerPort + if tXQueuesNumberPerPort == 0 { + tXQueuesNumberPerPort = 2 + } + + sendCPUCoresPerPort = args.SendCPUCoresPerPort + if sendCPUCoresPerPort == 0 { + sendCPUCoresPerPort = 1 + if tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 { + return common.WrapWithNFError(nil, "TXQueuesNumberPerPort should be divisible by SendCPUCoresPerPort", + common.BadArgument) + } + } + schedulerOff := args.DisableScheduler schedulerOffRemove := args.PersistentClones stopDedicatedCore := args.StopOnDedicatedCore hwtxchecksum = args.HWTXChecksum hwrxpacketstimestamp = args.HWRXPacketsTimestamp - anyway := !args.RestrictedCloning + unrestrictedClones := !args.RestrictedCloning mbufNumber := uint(8191) if args.MbufNumber != 0 { @@ -578,7 +643,12 @@ func SystemInit(args *Config) error { sizeMultiplier = args.RingSize } - schedTime = 500 + if args.SchedulerInterval != 0 { + schedTime = args.SchedulerInterval + } else { + schedTime = 500 + } + if args.ScaleTime != 0 { schedTime = args.ScaleTime } @@ -665,12 +735,12 @@ func SystemInit(args *Config) error { } } portPair = make(map[types.IPv4Address](*port)) - devices = make(map[string]int) + ioDevices = make(map[string]interface{}) // Init scheduler common.LogTitle(common.Initialization, "------------***------ Initializing scheduler -----***------------") StopRing := 
low.CreateRings(burstSize*sizeMultiplier, maxInIndex /* Maximum possible rings */) common.LogDebug(common.Initialization, "Scheduler can use cores:", cpus) - schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, anyway) + schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, unrestrictedClones) // Set HW offloading flag in packet package packet.SetHWTXChecksumFlag(hwtxchecksum) @@ -690,6 +760,8 @@ func SystemInit(args *Config) error { vEach[i][j] = uint8(i) } } + + setSIGINTHandler = !args.NoSetSIGINTHandler return nil } @@ -703,7 +775,7 @@ func SystemInitPortsAndMemory() error { for i := range createdPorts { if createdPorts[i].wasRequested { if err := low.CreatePort(createdPorts[i].port, createdPorts[i].willReceive, - true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex); err != nil { + true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex, tXQueuesNumberPerPort); err != nil { return err } } @@ -723,7 +795,19 @@ func SystemStartScheduler() error { return common.WrapWithNFError(err, "scheduler start failed", common.Fail) } common.LogTitle(common.Initialization, "------------***---------- NFF-GO Started ---------***------------") - schedState.schedule(schedTime) + + if setSIGINTHandler { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + go func() { + schedState.schedule(schedTime) + }() + <-signalChan + common.LogTitle(common.Debug, "Received an interrupt, stopping everything") + SystemStop() + } else { + schedState.schedule(schedTime) + } return nil } @@ -819,13 +903,14 @@ func SetReceiver(portId uint16) (OUT *Flow, err error) { // Gets name of device, will return error if can't initialize socket. // Creates RAW socket, returns new opened flow with received packets. 
func SetReceiverOS(device string) (*Flow, error) { - socketID, ok := devices[device] + v, ok := ioDevices[device] + socketID := v.(int) if !ok { socketID = low.InitDevice(device) if socketID == -1 { return nil, common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket) } - devices[device] = socketID + ioDevices[device] = socketID } rings := low.CreateRings(burstSize*sizeMultiplier, 1) addOSReceiver(socketID, rings) @@ -839,18 +924,57 @@ func SetSenderOS(IN *Flow, device string) error { if err := checkFlow(IN); err != nil { return err } - socketID, ok := devices[device] + v, ok := ioDevices[device] + socketID := v.(int) if !ok { socketID = low.InitDevice(device) if socketID == -1 { return common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket) } - devices[device] = socketID + ioDevices[device] = socketID } addSenderOS(socketID, finishFlow(IN), IN.inIndexNumber) return nil } +// SetReceiverXDP adds function receive from Linux AF_XDP to flow graph. +// Gets name of device and queue number, will return error if can't initialize socket. +// Creates AF_XDP socket, returns new opened flow with received packets. +func SetReceiverXDP(device string, queue int) (*Flow, error) { + _, ok := ioDevices[device] + if ok { + return nil, common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now", common.BadSocket) + } + socketID := low.InitXDP(device, queue) + if socketID == nil { + return nil, common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket) + } + ioDevices[device] = socketID + rings := low.CreateRings(burstSize*sizeMultiplier, 1) + addXDPReceiver(socketID, rings) + return newFlow(rings, 1), nil +} + +// SetSenderXDP adds function send from flow graph to Linux AF_XDP interface. +// Gets name of device, will return error if can't initialize socket. +// Creates RAW socket, sends packets, closes input flow. 
+func SetSenderXDP(IN *Flow, device string) error { + if err := checkFlow(IN); err != nil { + return err + } + _, ok := ioDevices[device] + if ok { + return common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device in forbidden now", common.BadSocket) + } + socketID := low.InitXDP(device, 0) + if socketID == nil { + return common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket) + } + ioDevices[device] = socketID + addSenderXDP(socketID, finishFlow(IN), IN.inIndexNumber) + return nil +} + // SetReceiverKNI adds function receive from KNI to flow graph. // Gets KNI device from which packets will be received. // Receive queue will be added to port automatically. @@ -1418,6 +1542,11 @@ func recvOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { low.ReceiveOS(srp.socket, srp.out[0], flag, coreID, &srp.stats) } +func recvXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) { + srp := parameters.(*receiveXDPParameters) + low.ReceiveXDP(srp.socket, srp.out[0], flag, coreID, &srp.stats) +} + func processKNI(parameters interface{}, inIndex []int32, flag *int32, coreID int) { srk := parameters.(*KNIParameters) if srk.linuxCore == true { @@ -1599,7 +1728,8 @@ func pcopy(parameters interface{}, inIndex []int32, stopper [2]chan int, report func send(parameters interface{}, inIndex []int32, flag *int32, coreID int) { srp := parameters.(*sendParameters) - low.Send(srp.port, srp.in, srp.anyway, flag, coreID, &srp.stats) + low.Send(srp.port, srp.in, srp.unrestrictedClones, flag, coreID, &srp.stats, + srp.sendThreadIndex, sendCPUCoresPerPort) } func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { @@ -1607,6 +1737,11 @@ func sendOS(parameters interface{}, inIndex []int32, flag *int32, coreID int) { low.SendOS(srp.socket, srp.in, flag, coreID, &srp.stats) } +func sendXDP(parameters interface{}, inIndex []int32, flag *int32, coreID int) { + 
srp := parameters.(*sendXDPParameters) + low.SendXDP(srp.socket, srp.in, flag, coreID, &srp.stats) +} + func merge(from low.Rings, to low.Rings) { // We should change out rings in all flow functions which we added before // and change them to one "after merge" ring. @@ -1620,6 +1755,10 @@ func merge(from low.Rings, to low.Rings) { if parameters.out[0] == from[0] { parameters.out = to } + case *receiveOSParameters: + if parameters.out[0] == from[0] { + parameters.out = to + } case *generateParameters: if parameters.out[0] == from[0] { parameters.out = to diff --git a/flow/scheduler.go b/flow/scheduler.go index 6390ea42..3c1d6f87 100644 --- a/flow/scheduler.go +++ b/flow/scheduler.go @@ -36,6 +36,7 @@ const generatePauseStep = 0.1 const process = 1 const stopRequest = 2 const wasStopped = 9 +const printPortStatistics = false // TODO "5" and "39" constants derived empirically. Need to investigate more elegant thresholds. const RSSCloneMin = 5 @@ -156,26 +157,26 @@ func (scheduler *scheduler) addFF(name string, ucfn uncloneFlowFunction, Cfn cFl } type scheduler struct { - ff []*flowFunction - cores []core - off bool - offRemove bool - anyway bool - stopDedicatedCore bool - StopRing low.Rings - usedCores uint8 - checkTime uint - debugTime uint - Dropped uint - maxPacketsToClone uint32 - stopFlag int32 - maxRecv int - Timers []*Timer - nAttempts []uint64 - pAttempts []uint64 - maxInIndex int32 - measureRings low.Rings - coreIndex int + ff []*flowFunction + cores []core + off bool + offRemove bool + unrestrictedClones bool + stopDedicatedCore bool + StopRing low.Rings + usedCores uint8 + checkTime uint + debugTime uint + Dropped uint + maxPacketsToClone uint32 + stopFlag int32 + maxRecv int + Timers []*Timer + nAttempts []uint64 + pAttempts []uint64 + maxInIndex int32 + measureRings low.Rings + coreIndex int } type core struct { @@ -184,7 +185,7 @@ type core struct { } func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDedicatedCore bool, - 
stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, anyway bool) *scheduler { + stopRing low.Rings, checkTime uint, debugTime uint, maxPacketsToClone uint32, maxRecv int, unrestrictedClones bool) *scheduler { coresNumber := len(cpus) // Init scheduler scheduler := new(scheduler) @@ -200,7 +201,7 @@ func newScheduler(cpus []int, schedulerOff bool, schedulerOffRemove bool, stopDe scheduler.debugTime = debugTime scheduler.maxPacketsToClone = maxPacketsToClone scheduler.maxRecv = maxRecv - scheduler.anyway = anyway + scheduler.unrestrictedClones = unrestrictedClones scheduler.pAttempts = make([]uint64, len(scheduler.cores), len(scheduler.cores)) return scheduler @@ -405,6 +406,13 @@ func (scheduler *scheduler) schedule(schedTime uint) { common.LogDebug(common.Debug, "---------------") common.LogDebug(common.Debug, "System is using", scheduler.usedCores, "cores now.", uint8(len(scheduler.cores))-scheduler.usedCores, "cores are left available.") low.Statistics(float32(scheduler.debugTime) / 1000) + if printPortStatistics { + for i := range createdPorts { + if createdPorts[i].wasRequested { + low.PortStatistics(createdPorts[i].port) + } + } + } for i := range scheduler.ff { scheduler.ff[i].printDebug(schedTime) } @@ -549,7 +557,7 @@ func (scheduler *scheduler) schedule(schedTime uint) { ffi.removed = false continue } - if ffi.inIndex[0] == 1 && scheduler.anyway && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) && + if ffi.inIndex[0] == 1 && scheduler.unrestrictedClones && ffi.checkInputRingClonable(scheduler.maxPacketsToClone) && ffi.checkOutputRingClonable(scheduler.maxPacketsToClone) && (ffi.increasedSpeed == 0 || ffi.increasedSpeed > ffi.reportedState.V.Packets) { if scheduler.pAttempts[ffi.cloneNumber+1] == 0 { diff --git a/internal/low/Makefile b/internal/low/Makefile index d310b737..e69099ce 100644 --- a/internal/low/Makefile +++ b/internal/low/Makefile @@ -7,4 +7,4 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing 
testing: check-pktgen - go test -v + go test -v -tags "${GO_BUILD_TAGS}" diff --git a/internal/low/low.go b/internal/low/low.go index 9ef31035..6ebdc31b 100644 --- a/internal/low/low.go +++ b/internal/low/low.go @@ -84,7 +84,7 @@ func CheckRSSPacketCount(p *Port, queue int16) int64 { // GetPortMACAddress gets MAC address of given port. func GetPortMACAddress(port uint16) [types.EtherAddrLen]uint8 { var mac [types.EtherAddrLen]uint8 - var cmac C.struct_ether_addr + var cmac C.struct_rte_ether_addr C.rte_eth_macaddr_get(C.uint16_t(port), &cmac) for i := range mac { @@ -517,12 +517,19 @@ func SrKNI(port uint16, flag *int32, coreID int, recv bool, OUT Rings, send bool } // Send - dequeue packets and send. -func Send(port uint16, IN Rings, anyway bool, flag *int32, coreID int, stats *common.RXTXStats) { +func Send(port uint16, IN Rings, unrestrictedClones bool, flag *int32, coreID int, stats *common.RXTXStats, + sendThreadIndex, totalSendTreads int) { if C.rte_eth_dev_socket_id(C.uint16_t(port)) != C.int(C.rte_lcore_to_socket_id(C.uint(coreID))) { common.LogWarning(common.Initialization, "Send port", port, "is on remote NUMA node to polling thread - not optimal performance.") } - C.nff_go_send(C.uint16_t(port), C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))), C.int32_t(len(IN)), - C.bool(anyway), (*C.int)(unsafe.Pointer(flag)), C.int(coreID), (*C.RXTXStats)(unsafe.Pointer(stats))) + C.nff_go_send(C.uint16_t(port), + C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), C.int32_t(len(IN))), + C.int32_t(len(IN)), + C.bool(unrestrictedClones), + (*C.int)(unsafe.Pointer(flag)), C.int(coreID), + (*C.RXTXStats)(unsafe.Pointer(stats)), + C.int32_t(sendThreadIndex), + C.int32_t(totalSendTreads)) } // Stop - dequeue and free packets. 
@@ -590,11 +597,16 @@ func GetPortsNumber() int { } func CheckPortRSS(port uint16) int32 { - return int32(C.check_port_rss(C.uint16_t(port))) + return int32(C.check_max_port_rx_queues(C.uint16_t(port))) +} + +func CheckPortMaxTXQueues(port uint16) int32 { + return int32(C.check_max_port_tx_queues(C.uint16_t(port))) } // CreatePort initializes a new port using global settings and parameters. -func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, hwrxpacketstimestamp bool, inIndex int32) error { +func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, + hwrxpacketstimestamp bool, inIndex int32, tXQueuesNumberPerPort int) error { var mempools **C.struct_rte_mempool if willReceive { m := CreateMempools("receive", inIndex) @@ -603,7 +615,7 @@ func CreatePort(port uint16, willReceive bool, promiscuous bool, hwtxchecksum, h mempools = nil } if C.port_init(C.uint16_t(port), C.bool(willReceive), mempools, - C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex)) != 0 { + C._Bool(promiscuous), C._Bool(hwtxchecksum), C._Bool(hwrxpacketstimestamp), C.int32_t(inIndex), C.int32_t (tXQueuesNumberPerPort)) != 0 { msg := common.LogError(common.Initialization, "Cannot init port ", port, "!") return common.WrapWithNFError(nil, msg, common.FailToInitPort) } @@ -697,6 +709,11 @@ func Statistics(N float32) { C.statistics(C.float(N)) } +// PortStatistics print statistics about NIC port. +func PortStatistics(port uint16) { + C.portStatistics(C.uint16_t(port)) +} + // ReportMempoolsState prints used and free space of mempools. 
func ReportMempoolsState() { for _, m := range usedMempools { @@ -760,6 +777,10 @@ func CheckHWRXPacketsTimestamp(port uint16) bool { return bool(C.check_hwrxpackets_timestamp_capability(C.uint16_t(port))) } +func InitDevice(device string) int { + return int(C.initDevice(C.CString(device))) +} + func ReceiveOS(socket int, OUT *Ring, flag *int32, coreID int, stats *common.RXTXStats) { m := CreateMempool("receiveOS") C.receiveOS(C.int(socket), OUT.DPDK_ring, (*C.struct_rte_mempool)(unsafe.Pointer(m)), @@ -772,10 +793,6 @@ func SendOS(socket int, IN Rings, flag *int32, coreID int, stats *common.RXTXSta (*C.RXTXStats)(unsafe.Pointer(stats))) } -func InitDevice(device string) int { - return int(C.initDevice(C.CString(device))) -} - func SetCountersEnabledInApplication(enabled bool) { C.counters_enabled_in_application = C.bool(true) } @@ -791,3 +808,21 @@ func GetPacketOffloadFlags(mb *Mbuf) uint64 { func GetPacketTimestamp(mb *Mbuf) uint64 { return uint64(mb.timestamp) } + +type XDPSocket *C.struct_xsk_socket_info + +func InitXDP(device string, queue int) XDPSocket { + return C.initXDP(C.CString(device), C.int(queue)) +} + +func ReceiveXDP(socket XDPSocket, OUT *Ring, flag *int32, coreID int, stats *common.RXTXStats) { + m := CreateMempool("receiveXDP") + C.receiveXDP(socket, OUT.DPDK_ring, (*C.struct_rte_mempool)(unsafe.Pointer(m)), + (*C.int)(unsafe.Pointer(flag)), C.int(coreID), (*C.RXTXStats)(unsafe.Pointer(stats))) +} + +func SendXDP(socket XDPSocket, IN Rings, flag *int32, coreID int, stats *common.RXTXStats) { + C.sendXDP(socket, C.extractDPDKRings((**C.struct_nff_go_ring)(unsafe.Pointer(&(IN[0]))), + C.int32_t(len(IN))), C.int32_t(len(IN)), (*C.int)(unsafe.Pointer(flag)), C.int(coreID), + (*C.RXTXStats)(unsafe.Pointer(stats))) +} diff --git a/internal/low/low.h b/internal/low/low.h index af656d0b..de591d39 100644 --- a/internal/low/low.h +++ b/internal/low/low.h @@ -6,6 +6,7 @@ // Do not use signals in this C code without much need. 
// They can and probably will crash go runtime in complex errors. +#include #include #include #include @@ -35,17 +36,20 @@ // These constants are get from DPDK and checked for performance #define RX_RING_SIZE 128 -#define TX_RING_SIZE 512 +#define TX_RING_SIZE 2048 // 2 queues are enough for handling 40GBits. Should be checked for other NICs. // TODO This macro should be a function that will dynamically return the needed number of cores. -#define TX_QUEUE_NUMBER 2 +#define TX_QUEUE_CORES 2 +#define TX_ATTEMPTS 3 #define APP_RETA_SIZE_MAX (ETH_RSS_RETA_SIZE_512 / RTE_RETA_GROUP_SIZE) #define MAX_JUMBO_PKT_LEN 9600 // from most DPDK examples. Only for MEMORY_JUMBO. // #define DEBUG +// #define PORT_XSTATS_ENABLED +// #define DEBUG_PACKET_LOSS #define COUNTERS_ENABLED #define USE_INTERLOCKED_COUNTERS #define ANALYZE_PACKETS_SIZES @@ -138,9 +142,9 @@ bool analyze_packet_sizes = true; bool analyze_packet_sizes = false; #endif -long receive_received = 0, receive_pushed = 0; -long send_required = 0, send_sent = 0; -long stop_freed = 0; +volatile long long receive_received = 0, receive_pushed = 0; +volatile long long send_required = 0, send_sent = 0; +volatile long long stop_freed = 0; typedef struct { uint64_t PacketsProcessed, PacketsDropped, BytesProcessed; @@ -191,6 +195,8 @@ void setAffinity(int coreId) { sched_setaffinity(0, sizeof(cpuset), &cpuset); } +// ---------- DPDK section ---------- + int create_kni(uint16_t port, uint32_t core, char *name, struct rte_mempool *mbuf_pool) { struct rte_eth_dev_info dev_info; const struct rte_pci_device *pci_dev; @@ -214,7 +220,7 @@ int create_kni(uint16_t port, uint32_t core, char *name, struct rte_mempool *mbu conf_default.id = pci_dev->id; } conf_default.force_bind = 1; // Flag to bind kernel thread - rte_eth_macaddr_get(port, (struct ether_addr *)&conf_default.mac_addr); + rte_eth_macaddr_get(port, (struct rte_ether_addr *)&conf_default.mac_addr); rte_eth_dev_get_mtu(port, &conf_default.mtu); struct rte_kni_ops ops; @@ 
-243,31 +249,39 @@ int checkRSSPacketCount(struct cPort *port, int16_t queue) { return rte_eth_rx_queue_count(port->PortId, queue); } -int check_port_rss(uint16_t port) { +uint16_t check_max_port_rx_queues(uint16_t port) { struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); return dev_info.max_rx_queues; } -int check_port_tx(uint16_t port) { +uint16_t check_max_port_tx_queues(uint16_t port) { struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); return dev_info.max_tx_queues; } +uint16_t check_current_port_tx_queues(uint16_t port) { + struct rte_eth_dev_info dev_info; + memset(&dev_info, 0, sizeof(dev_info)); + rte_eth_dev_info_get(port, &dev_info); + return dev_info.nb_tx_queues; +} + // Initializes a given port using global settings and with the RX buffers // coming from the mbuf_pool passed as a parameter. -int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex) { - uint16_t rx_rings, tx_rings = TX_QUEUE_NUMBER; +int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, bool promiscuous, bool hwtxchecksum, bool hwrxpacketstimestamp, int32_t inIndex, int32_t tx_queues) { + uint16_t rx_rings, tx_rings = tx_queues; struct rte_eth_dev_info dev_info; memset(&dev_info, 0, sizeof(dev_info)); rte_eth_dev_info_get(port, &dev_info); if (tx_rings > dev_info.max_tx_queues) { - tx_rings = check_port_tx(port); + printf("Warning! Port %d does not support requested number of TX queues %d. 
Setting number of TX queues to %d\n", port, tx_rings, dev_info.max_tx_queues); + tx_rings = dev_info.max_tx_queues; } if (willReceive) { @@ -280,7 +294,7 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, return -1; struct rte_eth_conf port_conf_default = { - .rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN, + .rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN, .mq_mode = ETH_MQ_RX_RSS }, .txmode = { .mq_mode = ETH_MQ_TX_NONE, }, .rx_adv_conf.rss_conf.rss_key = NULL, @@ -343,6 +357,22 @@ int port_init(uint16_t port, bool willReceive, struct rte_mempool **mbuf_pools, .QueuesNumber = rx_rings }; +#ifdef DEBUG +#ifdef PORT_XSTATS_ENABLED + int stats_len = rte_eth_xstats_get(port, NULL, 0); + struct rte_eth_xstat_name *xstats_names; + assert(stats_len >= 0); + xstats_names = calloc(stats_len, sizeof(*xstats_names)); + assert(xstats_names != NULL); + int ret = rte_eth_xstats_get_names(port, xstats_names, stats_len); + assert(ret >= 0 && ret <= stats_len); + printf("Port %u exposes the following extended stats\n", port); + for (int i = 0; i < stats_len; i++) { + printf("\t%d: %s\n", i, xstats_names[i].name); + } + free(xstats_names); +#endif // PORT_XSTATS_ENABLED +#endif // DEBUG return 0; } @@ -424,12 +454,12 @@ struct rte_ip_frag_tbl* create_reassemble_table() { __attribute__((always_inline)) static inline struct rte_mbuf* reassemble(struct rte_ip_frag_tbl* tbl, struct rte_mbuf *buf, struct rte_ip_frag_death_row* death_row, uint64_t cur_tsc) { - struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct ether_hdr *); + struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(buf, struct rte_ether_hdr *); // TODO packet_type is not mandatory required for drivers. // Some drivers won't set it. However this is DPDK implementation. 
if (RTE_ETH_IS_IPV4_HDR(buf->packet_type)) { // if packet is IPv4 - struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *)(eth_hdr + 1); + struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1); if (rte_ipv4_frag_pkt_is_fragmented(ip_hdr)) { // try to reassemble buf->l2_len = sizeof(*eth_hdr); // prepare mbuf: setup l2_len/l3_len. @@ -440,7 +470,7 @@ static inline struct rte_mbuf* reassemble(struct rte_ip_frag_tbl* tbl, struct rt } } if (RTE_ETH_IS_IPV6_HDR(buf->packet_type)) { // if packet is IPv6 - struct ipv6_hdr *ip_hdr = (struct ipv6_hdr *)(eth_hdr + 1); + struct rte_ipv6_hdr *ip_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1); struct ipv6_extension_fragment *frag_hdr = rte_ipv6_frag_get_ipv6_fragment_header(ip_hdr); if (frag_hdr != NULL) { @@ -475,8 +505,8 @@ void receiveRSS(uint16_t port, volatile int32_t *inIndex, struct rte_ring **out_ // Free any packets which can't be pushed to the ring. The ring is probably full. handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); #ifdef DEBUG - receive_received += rx_pkts_number; - receive_pushed += pushed_pkts_number; + __sync_fetch_and_add(&receive_received, rx_pkts_number); + __sync_fetch_and_add(&receive_pushed, pushed_pkts_number); #endif // DEBUG } } @@ -527,36 +557,54 @@ void nff_go_KNI(uint16_t port, volatile int *flag, int coreId, *flag = wasStopped; } -void nff_go_send(uint16_t port, struct rte_ring **in_rings, int32_t inIndexNumber, bool anyway, volatile int *flag, int coreId, RXTXStats *stats) { +void nff_go_send(uint16_t port, struct rte_ring **in_rings, int32_t inIndexNumber, bool anyway, volatile int *flag, int coreId, RXTXStats *stats, int32_t sendThreadIndex, int32_t totalSendTreads) { setAffinity(coreId); struct rte_mbuf *bufs[BURST_SIZE]; uint16_t buf; uint16_t tx_pkts_number; - int16_t queue = 0; - bool switchQueue = (check_port_tx(port) > 1) && (anyway || inIndexNumber > 1); + int16_t port_tx_queues = check_current_port_tx_queues(port); + int16_t tx_qstart = port_tx_queues / 
totalSendTreads * sendThreadIndex; + int16_t tx_qend = sendThreadIndex + 1 == totalSendTreads ? port_tx_queues : port_tx_queues / totalSendTreads * (sendThreadIndex + 1); + int16_t tx_queue_counter = tx_qstart; + int rx_qstart = inIndexNumber / totalSendTreads * sendThreadIndex; + int rx_qend = inIndexNumber / totalSendTreads * (sendThreadIndex + 1); + printf("Starting send with %d to %d RX queues and %d to %d TX on core %d\n", + rx_qstart, rx_qend, tx_qstart, tx_qend, coreId); while (*flag == process) { - for (int q = 0; q < inIndexNumber; q++) { + for (int q = rx_qstart; q < rx_qend; q++) { // Get packets for TX from ring uint16_t pkts_for_tx_number = rte_ring_mc_dequeue_burst(in_rings[q], (void*)bufs, BURST_SIZE, NULL); if (unlikely(pkts_for_tx_number == 0)) continue; - tx_pkts_number = rte_eth_tx_burst(port, queue, bufs, pkts_for_tx_number); - // inIndexNumber must be "1" or even. This prevents any reordering. - // anyway allows reordering explicitly - if (switchQueue) { - queue = !queue; - } + tx_pkts_number = 0; + int tx_attempts_counter = 0; + do { + uint16_t iteration_tx_pkts = rte_eth_tx_burst(port, tx_queue_counter, bufs + tx_pkts_number, pkts_for_tx_number - tx_pkts_number); + tx_pkts_number += iteration_tx_pkts; + tx_attempts_counter++; + } while (tx_pkts_number < pkts_for_tx_number && tx_attempts_counter <= TX_ATTEMPTS); UPDATE_COUNTERS(tx_pkts_number, calculateSize(bufs, tx_pkts_number), pkts_for_tx_number - tx_pkts_number); +#ifdef DEBUG_PACKET_LOSS + if (unlikely(tx_pkts_number < pkts_for_tx_number)) { + printf("**** Port %d, queue %d tried to transmit %d, transmitted only %d\n", + port, tx_queue_counter + tx_queue_offset, pkts_for_tx_number, tx_pkts_number); + } +#endif // Free any unsent packets handleUnpushed(bufs, tx_pkts_number, pkts_for_tx_number); + + tx_queue_counter++; + if (tx_queue_counter >= tx_qend) { + tx_queue_counter = tx_qstart; + } #ifdef DEBUG - send_required += pkts_for_tx_number; - send_sent += tx_pkts_number; + 
__sync_fetch_and_add(&send_required, pkts_for_tx_number); + __sync_fetch_and_add(&send_sent, tx_pkts_number); #endif } } @@ -585,7 +633,7 @@ void nff_go_stop(struct rte_ring **in_rings, int len, volatile int *flag, int co rte_pktmbuf_free(bufs[buf]); } #ifdef DEBUG - stop_freed += pkts_for_free_number; + __sync_fetch_and_add(&stop_freed, pkts_for_free_number); #endif } } @@ -624,12 +672,12 @@ void statistics(float N) { fprintf(stderr, "DEBUG: Current speed of all receives: received %.0f Mbits/s, pushed %.0f Mbits/s\n", (receive_received/N) * multiplier, (receive_pushed/N) * multiplier); if (receive_pushed < receive_received) { - fprintf(stderr, "DROP: Receive dropped %ld packets\n", receive_received - receive_pushed); + fprintf(stderr, "DROP: Receive dropped %lld packets\n", receive_received - receive_pushed); } fprintf(stderr, "DEBUG: Current speed of all sends: required %.0f Mbits/s, sent %.0f Mbits/s\n", (send_required/N) * multiplier, (send_sent/N) * multiplier); if (send_sent < send_required) { - fprintf(stderr, "DROP: Send dropped %ld packets\n", send_required - send_sent); + fprintf(stderr, "DROP: Send dropped %lld packets\n", send_required - send_sent); } fprintf(stderr, "DEBUG: Current speed of stop ring: freed %.0f Mbits/s\n", (stop_freed/N) * multiplier); // Yes, there can be race conditions here. 
However in practise they are rare and it is more @@ -643,6 +691,78 @@ void statistics(float N) { #endif } +#define RX_Q0PACKETS 8 +#define RX_Q_PACKETS_STEP 3 +#define TX_Q0PACKETS 56 +#define TX_Q_PACKETS_STEP 2 +#define RX_Q0BURST 88 +#define RX_Q_BURST_STEP 1 +#define TX_Q0BURST 104 +#define TX_Q_BURST_STEP 1 + +void portStatistics(uint16_t port_id) { +#ifdef DEBUG + struct rte_eth_dev_info dev_info; + rte_eth_dev_info_get(port_id, &dev_info); + + struct rte_eth_stats eth_stats; + rte_eth_stats_get(port_id, ð_stats); + rte_eth_stats_reset(port_id); + + if (dev_info.nb_rx_queues > 0 || dev_info.nb_tx_queues > 0) { + fprintf(stderr, "Port %u (RX_Q %u/TX_Q %u): IP %lu, OP %lu, IB %lu, OB %lu, IMISS %lu, IERR %lu, OERR %lu, RX_NOMBUF %lu\n", + port_id, + dev_info.nb_rx_queues, dev_info.nb_tx_queues, + eth_stats.ipackets, + eth_stats.opackets, + eth_stats.ibytes, + eth_stats.obytes, + eth_stats.imissed, + eth_stats.ierrors, + eth_stats.oerrors, + eth_stats.rx_nombuf); + +#ifdef PORT_XSTATS_ENABLED + int len = rte_eth_xstats_get(port_id, NULL, 0); + assert(len >= 0); + struct rte_eth_xstat *xstats = calloc(len, sizeof(*xstats)); + int ret = rte_eth_xstats_get(port_id, xstats, len); + assert(ret >= 0 && ret <= len); + rte_eth_xstats_reset(port_id); +/* + fprintf(stderr, "RX_Q_PACKETS: "); + for (int i = 0; i < dev_info.nb_rx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[RX_Q0PACKETS + i * RX_Q_PACKETS_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_rx_queues - 1, xstats[RX_Q0PACKETS + (dev_info.nb_rx_queues - 1) * RX_Q_PACKETS_STEP].value); + + fprintf(stderr, "TX_Q_PACKETS: "); + for (int i = 0; i < dev_info.nb_tx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[TX_Q0PACKETS + i * TX_Q_PACKETS_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_tx_queues - 1, xstats[TX_Q0PACKETS + (dev_info.nb_tx_queues - 1) * TX_Q_PACKETS_STEP].value); +*/ + if (dev_info.nb_rx_queues > 0) { + fprintf(stderr, "RX_Q_BURST: "); + for (int i 
= 0; i < dev_info.nb_rx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[RX_Q0BURST + i * RX_Q_BURST_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_rx_queues - 1, xstats[RX_Q0BURST + (dev_info.nb_rx_queues - 1) * RX_Q_BURST_STEP].value); + } + + if (dev_info.nb_tx_queues > 0) { + fprintf(stderr, "TX_Q_BURST: "); + for (int i = 0; i < dev_info.nb_tx_queues - 1; i++) { + fprintf(stderr, "%d: %lu, ", i, xstats[TX_Q0BURST + i * TX_Q_BURST_STEP].value); + } + fprintf(stderr, "%d: %lu\n", dev_info.nb_tx_queues - 1, xstats[TX_Q0BURST + (dev_info.nb_tx_queues - 1) * TX_Q_BURST_STEP].value); + } + free(xstats); +#endif // PORT_XSTATS_ENABLED + } +#endif +} + // Initialize the Environment Abstraction Layer (EAL) in DPDK. int eal_init(int argc, char *argv[], uint32_t burstSize, int32_t needKNI, bool noPacketHeadChange, bool needChainedReassembly, bool needChainedJumbo, bool needMemoryJumbo) { @@ -828,6 +948,8 @@ bool check_hwrxpackets_timestamp_capability(uint16_t port_id) { return (dev_info.rx_offload_capa & flags) == flags; } +// ---------- OS raw socket section ---------- + int initDevice(char *name) { int s = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); if (s < 1) { @@ -888,6 +1010,10 @@ void receiveOS(int socket, struct rte_ring *out_ring, struct rte_mempool *m, vol // Free any packets which can't be pushed to the ring. The ring is probably full. 
handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); +#ifdef DEBUG + receive_received += rx_pkts_number; + receive_pushed += pushed_pkts_number; +#endif // DEBUG } *flag = wasStopped; } @@ -897,7 +1023,6 @@ void sendOS(int socket, struct rte_ring **in_rings, int32_t inIndexNumber, volat struct rte_mbuf *bufs[BURST_SIZE]; uint16_t buf; - uint16_t tx_pkts_number; while (*flag == process) { for (int q = 0; q < inIndexNumber; q++) { // Get packets for TX from ring @@ -911,8 +1036,254 @@ void sendOS(int socket, struct rte_ring **in_rings, int32_t inIndexNumber, volat // Free all packets handleUnpushed(bufs, 0, pkts_for_tx_number); +#ifdef DEBUG + send_required += pkts_for_tx_number; + send_sent += pkts_for_tx_number; +#endif // DEBUG } } free(in_rings); *flag = wasStopped; } + +// ---------- XDP socket section ---------- + +#ifdef NFF_GO_SUPPORT_XDP + +#include +#include +#include +#include "bpf/libbpf.h" +#include "bpf/xsk.h" + +struct xsk_umem_info { + struct xsk_ring_prod fq; + struct xsk_ring_cons cq; + struct xsk_umem *umem; + const struct rte_memzone *mz; +}; + +struct xsk_socket_info { + struct xsk_ring_cons rx; + struct xsk_ring_prod tx; + struct xsk_umem_info *umem; + struct xsk_socket *xsk; + __u32 outstanding_tx; + __u32 prog_id; + int nameindex; +}; + +struct xsk_socket_info *initXDP(char *name, int queue) { + __u32 idx; + __u32 opt_xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST; // TODO get from user + __u32 opt_xdp_bind_flags = 0; // TODO get from user + __u32 NUM_FRAMES = 4 * 1024; // TODO from num_mbufs + int opt_xsk_frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE; // TODO from num_mbufs + int memory_size = NUM_FRAMES * opt_xsk_frame_size; + + struct xsk_socket_info *xdp_socket = calloc(1, sizeof(*xdp_socket)); + if (!xdp_socket) { + fprintf(stderr, "ERROR: Can't allocate memory for socket structure for AF_XDP: %s\n", name); + return NULL; + } + + xdp_socket->umem = calloc(1, sizeof(*xdp_socket->umem)); + if (!xdp_socket->umem) { + fprintf(stderr, "ERROR: 
Can't allocate memory for umem structure for AF_XDP: %s\n", name); + return NULL; + } + + const struct rte_memzone *mz = rte_memzone_reserve_aligned(name, memory_size, + rte_socket_id(), RTE_MEMZONE_IOVA_CONTIG, getpagesize()); + if (!mz) { + fprintf(stderr, "ERROR: Can't reserve memzone for AF_XDP %s\n", name); + return NULL; + } + + struct xsk_umem_config umem_cfg = { + .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, + .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .frame_size = opt_xsk_frame_size, + .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM, + }; + int ret = xsk_umem__create(&xdp_socket->umem->umem, mz->addr, memory_size, + &xdp_socket->umem->fq, &xdp_socket->umem->cq, &umem_cfg); + if (ret) { + fprintf(stderr, "ERROR: Can't create umem for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + xdp_socket->umem->mz = mz; + + struct xsk_socket_config xsk_cfg = { + .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS, + .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS, + .libbpf_flags = 0, + .xdp_flags = opt_xdp_flags, + .bind_flags = opt_xdp_bind_flags, + }; + ret = xsk_socket__create(&xdp_socket->xsk, name, queue, xdp_socket->umem->umem, + &xdp_socket->rx, &xdp_socket->tx, &xsk_cfg); + if (ret) { + fprintf(stderr, "ERROR: Can't create AF_XDP socket %s, error code %d\n", name, ret); + return NULL; + } + xdp_socket->nameindex = if_nametoindex(name); + if (!xdp_socket->nameindex) { + fprintf(stderr, "ERROR: interface %s for AF_XDP does not exist\n", name); + return NULL; + } + + ret = bpf_get_link_xdp_id(xdp_socket->nameindex, &xdp_socket->prog_id, opt_xdp_flags); + if (ret) { + fprintf(stderr, "ERROR: Can't link BPF for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + ret = xsk_ring_prod__reserve(&xdp_socket->umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS, &idx); + if (ret != XSK_RING_PROD__DEFAULT_NUM_DESCS) { + fprintf(stderr, "ERROR: reserve prod buffer for AF_XDP %s, error code %d\n", name, ret); + return NULL; + } + for (int i = 0; i < 
XSK_RING_PROD__DEFAULT_NUM_DESCS * XSK_UMEM__DEFAULT_FRAME_SIZE; i += XSK_UMEM__DEFAULT_FRAME_SIZE) { + *xsk_ring_prod__fill_addr(&xdp_socket->umem->fq, idx++) = i; + } + xsk_ring_prod__submit(&xdp_socket->umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS); + return xdp_socket; +} + +void removeXDP(struct xsk_socket_info *xsk) { + xsk_socket__delete(xsk->xsk); + xsk_umem__delete(xsk->umem->umem); + __u32 curr_prog_id = 0; + + if (bpf_get_link_xdp_id(xsk->nameindex, &curr_prog_id, XDP_FLAGS_UPDATE_IF_NOEXIST)) { + return; + } + if (xsk->prog_id == curr_prog_id) { + bpf_set_link_xdp_fd(xsk->nameindex, -1, XDP_FLAGS_UPDATE_IF_NOEXIST); + } + free(xsk->umem); + free(xsk); +} +// TODO checkfatal should remove AF_XDP socket +// rings are single consumer, producer +// additional implementation for rx tx at one device + +void receiveXDP(struct xsk_socket_info *xsk, struct rte_ring *out_ring, struct rte_mempool *m, volatile int *flag, int coreId, RXTXStats *stats) { + setAffinity(coreId); + struct rte_mbuf *bufs[BURST_SIZE]; + REASSEMBLY_INIT + while (*flag == process) { + __u32 idx_rx = 0, idx_fq = 0; + // Get packets from AF_XDP + uint16_t rx_pkts_number = xsk_ring_cons__peek(&xsk->rx, BURST_SIZE, &idx_rx); + if (unlikely(rx_pkts_number == 0)) { + continue; + } + if (allocateMbufs(m, bufs, rx_pkts_number) != 0) { + printf("Can't allocate\n"); + } + int ret = xsk_ring_prod__reserve(&xsk->umem->fq, rx_pkts_number, &idx_fq); + while (ret != rx_pkts_number) { + if (ret < 0) { + printf("ERROR reserve\n"); + //exit_with_error(-ret); + } + ret = xsk_ring_prod__reserve(&xsk->umem->fq, rx_pkts_number, &idx_fq); + } + for (int i = 0; i < rx_pkts_number; i++) { + const struct xdp_desc *desc = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++); + uint64_t addr = desc->addr; + uint32_t len = desc->len; + void *pkt = xsk_umem__get_data(xsk->umem->mz->addr, addr); + rte_memcpy(rte_pktmbuf_mtod(bufs[i], void *), pkt, len); + rte_pktmbuf_pkt_len(bufs[i]) = len; + rte_pktmbuf_data_len(bufs[i]) = len; + 
*xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = addr; + } + xsk_ring_prod__submit(&xsk->umem->fq, rx_pkts_number); + xsk_ring_cons__release(&xsk->rx, rx_pkts_number); + rx_pkts_number = handleReceived(bufs, rx_pkts_number, tbl, pdeath_row); + uint16_t pushed_pkts_number = rte_ring_enqueue_burst(out_ring, (void*)bufs, rx_pkts_number, NULL); + + // Free any packets which can't be pushed to the ring. The ring is probably full. + handleUnpushed((void*)bufs, pushed_pkts_number, rx_pkts_number); +#ifdef DEBUG + receive_received += rx_pkts_number; + receive_pushed += pushed_pkts_number; +#endif // DEBUG + } + removeXDP(xsk); + *flag = wasStopped; +} + +void sendXDP(struct xsk_socket_info *xsk, struct rte_ring **in_rings, int32_t inIndexNumber, volatile int *flag, int coreId, RXTXStats *stats) { + setAffinity(coreId); + struct rte_mbuf *bufs[BURST_SIZE]; + uint16_t buf; + uint16_t tx_pkts_number; + struct xdp_desc *desc; + __u32 idx, frame_nb = 0; + int required = 0; + while (*flag == process) { + for (int q = 0; q < inIndexNumber; q++) { + // Get packets for TX from ring + uint16_t pkts_for_tx_number = rte_ring_mc_dequeue_burst(in_rings[q], (void*)bufs, BURST_SIZE, NULL); + if (pkts_for_tx_number == 0) { + continue; + } + if (xsk_ring_prod__reserve(&xsk->tx, pkts_for_tx_number, &idx) == pkts_for_tx_number) { + for (int i = 0; i < pkts_for_tx_number; i++) { + desc = xsk_ring_prod__tx_desc(&xsk->tx, idx + i); + desc->addr = (frame_nb + i) << XSK_UMEM__DEFAULT_FRAME_SHIFT; + desc->len = bufs[i]->pkt_len; + void *pkt = xsk_umem__get_data(xsk->umem->mz->addr, desc->addr); + rte_memcpy(pkt, rte_pktmbuf_mtod(bufs[i], void *), desc->len); + } + xsk_ring_prod__submit(&xsk->tx, pkts_for_tx_number); + required += pkts_for_tx_number; + frame_nb += pkts_for_tx_number; + frame_nb %= 4 * 1024; //NUM_FRAMES; + } + // Free all packets + handleUnpushed(bufs, 0, pkts_for_tx_number); + if (required > 0) { + int ret = sendto(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0); + 
if (ret < 0 && errno != ENOBUFS && errno != EAGAIN && errno != EBUSY) { + printf("ERROR!!!\n"); + } + int tx_pkts_number = xsk_ring_cons__peek(&xsk->umem->cq, pkts_for_tx_number, &idx); + if (tx_pkts_number > 0) { + xsk_ring_cons__release(&xsk->umem->cq, tx_pkts_number); + required -= tx_pkts_number; + } +#ifdef DEBUG + send_required += pkts_for_tx_number; + send_sent += tx_pkts_number; +#endif // DEBUG + } + } + } + free(in_rings); + removeXDP(xsk); + *flag = wasStopped; +} + +#else // NFF_GO_SUPPORT_XDP + +struct xsk_socket_info { +}; + +struct xsk_socket_info * initXDP(char *name, int queue) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); + return NULL; +} + +void receiveXDP(struct xsk_socket_info *socket, struct rte_ring *out_ring, struct rte_mempool *m, volatile int *flag, int coreId, RXTXStats *stats) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); +} + +void sendXDP(struct xsk_socket_info *socket, struct rte_ring **in_rings, int32_t inIndexNumber, volatile int *flag, int coreId, RXTXStats *stats) { + fprintf(stderr, "AF_XDP support is disabled by build configuration\n"); +} + +#endif // NFF_GO_SUPPORT_XDP diff --git a/internal/low/low_bpf.go b/internal/low/low_bpf.go new file mode 100644 index 00000000..4b4a704f --- /dev/null +++ b/internal/low/low_bpf.go @@ -0,0 +1,12 @@ +// Copyright 2019 Intel Corporation. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build bpf + +package low + +/* +#cgo LDFLAGS: -lbpf -lelf +*/ +import "C" diff --git a/mk/include.mk b/mk/include.mk index 40d5155b..4516944b 100644 --- a/mk/include.mk +++ b/mk/include.mk @@ -7,13 +7,11 @@ PROJECT_ROOT := $(abspath $(dir $(abspath $(lastword $(MAKEFILE_LIST))))/..) 
# Main DPDK variables -DPDK_VERSION=18.11 DPDK_DIR=dpdk -PKTGEN_VERSION=3.5.8 PKTGEN_DIR=pktgen-dpdk +export RTE_TARGET=x86_64-native-linuxapp-gcc DPDK_INSTALL_DIR=$(RTE_TARGET)-install export RTE_SDK=$(PROJECT_ROOT)/dpdk/$(DPDK_DIR)/$(DPDK_INSTALL_DIR)/usr/local/share/dpdk -export RTE_TARGET=x86_64-native-linuxapp-gcc # Configure flags for native code. Disable FSGSBASE and F16C to run in # VMs and Docker containers. @@ -55,6 +53,19 @@ $(info Checking for AVX support... no) endif endif +ifndef NFF_GO_NO_MLX_DRIVERS +ifeq (,$(findstring mlx,$(GO_BUILD_TAGS))) +export GO_BUILD_TAGS += mlx +endif +endif + +ifndef NFF_GO_NO_BPF_SUPPORT +ifeq (,$(findstring bpf,$(GO_BUILD_TAGS))) +export GO_BUILD_TAGS += bpf +endif +CFLAGS += -DNFF_GO_SUPPORT_XDP +endif + export CGO_CFLAGS = $(CFLAGS) export CGO_LDFLAGS = \ diff --git a/mk/intermediate.mk b/mk/intermediate.mk index 5e508ccb..725de8c8 100644 --- a/mk/intermediate.mk +++ b/mk/intermediate.mk @@ -7,5 +7,10 @@ TARGETS = all clean images clean-images deploy cleanall $(TARGETS): $(SUBDIRS) +included_from_dir := $(shell basename $(dir $(abspath $(word 1, $(MAKEFILE_LIST))))) +@eval $(included_from_dir): all + +targets_to_execute := $(filter-out $(included_from_dir), $(MAKECMDGOALS)) + $(SUBDIRS): - $(MAKE) -C $@ $(MAKECMDGOALS) + $(MAKE) -C $@ $(targets_to_execute) diff --git a/mk/leaf.mk b/mk/leaf.mk index 1e98c6b8..b3cfaac2 100644 --- a/mk/leaf.mk +++ b/mk/leaf.mk @@ -10,12 +10,6 @@ include $(PATH_TO_MK)/include.mk # Build all .PHONY: clean -ifndef NFF_GO_NO_MLX_DRIVERS -ifeq (,$(findstring mlx,$(GO_BUILD_TAGS))) -export GO_BUILD_TAGS += mlx -endif -endif - ifdef NFF_GO_DEBUG # Flags to build Go files without optimizations export GO_COMPILE_FLAGS += -gcflags=all='-N -l' @@ -63,6 +57,7 @@ deploy: .check-deploy-env images $(eval TMPNAME=tmp-$(IMAGENAME).tar) docker save $(WORKIMAGENAME) > $(TMPNAME) for host in `echo $(NFF_GO_HOSTS) | tr ',' ' '`; do \ + echo Uploading $(WORKIMAGENAME) to $$host; \ if ! 
docker -H tcp://$$host load < $(TMPNAME); then break; fi; \ done rm $(TMPNAME) diff --git a/nff-go-base/Makefile b/nff-go-base/Makefile index bfaebf29..d1a2b9ae 100644 --- a/nff-go-base/Makefile +++ b/nff-go-base/Makefile @@ -17,7 +17,8 @@ Fedora: Makefile echo 'ENV https_proxy ${https_proxy}' >> Dockerfile; \ echo 'RUN echo proxy=${http_proxy} >> /etc/dnf/dnf.conf' >> Dockerfile; \ fi - echo 'RUN dnf -y install numactl-libs.x86_64 lua-devel libmnl-devel rdma-core-devel libibverbs; dnf clean all' >> Dockerfile + echo 'RUN dnf -y install git numactl-libs.x86_64 lua-devel libmnl-devel rdma-core-devel libibverbs; dnf clean all' >> Dockerfile + echo 'RUN git clone https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile Dockerfile: Makefile @@ -29,7 +30,8 @@ Dockerfile: Makefile echo 'RUN echo Acquire::http::Proxy \"${http_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ echo 'RUN echo Acquire::https::Proxy \"${https_proxy}\"\; >> /etc/apt/apt.conf' >> Dockerfile; \ fi - echo 'RUN apt-get update; apt-get install -y pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev; apt-get clean all' >> Dockerfile + echo 'RUN apt-get update; apt-get install -y git pciutils libnuma-dev libpcap0.8-dev liblua5.3-dev libibverbs-dev libmnl-dev libelf-dev; apt-get clean all' >> Dockerfile + echo 'RUN git clone -b v0.0.4 https://github.com/libbpf/libbpf && make -C libbpf/src all install && echo "/usr/lib64" > /etc/ld.so.conf.d/usrlib64.conf && ldconfig' >> Dockerfile echo 'CMD ["/bin/bash"]' >> Dockerfile .PHONY: dpdk diff --git a/packet/Makefile b/packet/Makefile index d4d8019c..dfc337d5 100644 --- a/packet/Makefile +++ b/packet/Makefile @@ -7,7 +7,7 @@ include $(PATH_TO_MK)/include.mk .PHONY: testing testing: check-pktgen - go test + go test -tags "${GO_BUILD_TAGS}" .PHONY: coverage coverage: diff --git a/packet/acl.go b/packet/acl.go 
index 3a01dbca..bae1baec 100644 --- a/packet/acl.go +++ b/packet/acl.go @@ -14,7 +14,7 @@ // Three such functions are provided: // GetL2ACLFromJSON // GetL3ACLFromJSON -// GetL3ACLFromORIG +// GetL3ACLFromTextTable // GetL2RulesFromORIG function for L2 level is not added yet. TODO // These functions should be used before any usage of rules, however they also can be // used dynamically in parallel which make a possibility of changing rules during execution. @@ -82,9 +82,9 @@ func GetL2ACLFromJSON(filename string) (*L2Rules, error) { return &rules, rawL2Parse(&rawRules, &rules) } -// GetL2ACLFromORIG gets name of fields structed file with combined L2 rules, +// GetL2ACLFromTextTable gets name of fields structed file with combined L2 rules, // returns L2Rules -func GetL2ACLFromORIG(filename string) (*L2Rules, error) { +func GetL2ACLFromTextTable(filename string) (*L2Rules, error) { var rawRules rawL2Rules var rules L2Rules // Load Rules @@ -143,9 +143,9 @@ func readFile(filename string) ([]byte, error) { //TODO we need to construct raw structure as one after one without storing all them in memory -// GetL3ACLFromORIG gets name of fields structed file with combined L3 and L4 rules, +// GetL3ACLFromTextTable gets name of fields structed file with combined L3 and L4 rules, // returns L3Rules -func GetL3ACLFromORIG(filename string) (*L3Rules, error) { +func GetL3ACLFromTextTable(filename string) (*L3Rules, error) { var rawRules rawL3Rules var rules L3Rules // Load Rules diff --git a/packet/acl_internal_test.go b/packet/acl_internal_test.go index 86e7d0c8..f827e42d 100644 --- a/packet/acl_internal_test.go +++ b/packet/acl_internal_test.go @@ -244,10 +244,10 @@ func TestGetL2ACLFromJSON(t *testing.T) { // Function to test L2 rules parser for ORIG format // L2 rules generated, written to ORIG format, then parsed -func TestGetL2ACLFromORIG(t *testing.T) { +func TestGetL2ACLFromTextTable(t *testing.T) { testTable := generateTestL2Rules(rulesL2Ctxt, true) - tmpdir := 
createTmpDir("tmpTestGetL2ACLFromORIGConfigs") + tmpdir := createTmpDir("tmpTestGetL2ACLFromTextTableConfigs") for _, rule := range testTable { headerORIG := "# Source MAC, Destination MAC, L3 ID, Output port\n" @@ -259,9 +259,9 @@ func TestGetL2ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL2ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL2ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL2ACLFromORIG returned error %v\n\n", err) + t.Errorf("GetL2ACLFromTextTable returned error %v\n\n", err) t.FailNow() } @@ -432,11 +432,11 @@ func TestGetL3ACLFromJSON(t *testing.T) { // Function to test L3 rules parser for ORIG format // L3 rules generated, written to ORIG format, then parsed -func TestGetL3ACLFromORIG(t *testing.T) { +func TestGetL3ACLFromTextTable(t *testing.T) { // Generate IPv4 rules testTable4 := generateTestL3Rules(rulesL3Ctxt, true, true, false) - tmpdir := createTmpDir("tmpGetL3ACLFromORIGConfigs") + tmpdir := createTmpDir("tmpGetL3ACLFromTextTableConfigs") for _, rule := range testTable4 { // Create and parse ORIG file @@ -449,9 +449,9 @@ func TestGetL3ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL3ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL3ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL3ACLFromORIG returned error %s\n\n", err) + t.Errorf("GetL3ACLFromTextTable returned error %s\n\n", err) t.FailNow() } @@ -479,9 +479,9 @@ func TestGetL3ACLFromORIG(t *testing.T) { log.Fatal(err) } - ruleGot, err := GetL3ACLFromORIG(tmpfile.Name()) + ruleGot, err := GetL3ACLFromTextTable(tmpfile.Name()) if err != nil { - t.Errorf("GetL3ACLFromORIG returned error %s\n\n", err) + t.Errorf("GetL3ACLFromTextTable returned error %s\n\n", err) t.FailNow() } diff --git a/packet/mpls.go b/packet/mpls.go index 3c0518d7..bdd8996a 100644 --- a/packet/mpls.go +++ b/packet/mpls.go @@ -100,7 +100,7 @@ func (packet *Packet) RemoveMPLS() bool { return true } -// SetMPLSLabel sets Label (20 first bits of 
MPLS header to specified value). +// DecreaseTTL decreases the MPLS header TTL by 1. func (hdr *MPLSHdr) DecreaseTTL() bool { newTime := SwapBytesUint32(hdr.mpls)&0x000000ff - 1 if newTime == 0 { diff --git a/test/stability/testSingleWorkingFF/testSingleWorkingFF.go b/test/stability/testSingleWorkingFF/testSingleWorkingFF.go index 0b485d5c..0d0ff9d3 100644 --- a/test/stability/testSingleWorkingFF/testSingleWorkingFF.go +++ b/test/stability/testSingleWorkingFF/testSingleWorkingFF.go @@ -266,14 +266,14 @@ func setParameters(testScenario uint) (err error) { switch gTestType { case separate, vseparate: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-separate-l3rules.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-separate-l3rules.conf") } // Test expects to receive 33% of packets on 0 port and 66% on 1 port lessPercent = 33 shift = 1 case split, vsplit: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-split.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-split.conf") } // Test expects to receive 20% of packets on 0 port and 80% on 1 port lessPercent = 20 @@ -290,7 +290,7 @@ func setParameters(testScenario uint) (err error) { shift = 3 case dhandle, vdhandle: if testScenario != generatePart { - l3Rules, err = packet.GetL3ACLFromORIG("test-handle-l3rules.conf") + l3Rules, err = packet.GetL3ACLFromTextTable("test-handle-l3rules.conf") } // Test expects to receive 100% of packets on 0 port and 0% on 1 port (33% total) lessPercent = 100 diff --git a/test/stash/forwardingTestL3.go b/test/stash/forwardingTestL3.go index 87d7937e..e2cdce7c 100644 --- a/test/stash/forwardingTestL3.go +++ b/test/stash/forwardingTestL3.go @@ -47,7 +47,7 @@ func main() { l3Rules, err = packet.GetL3ACLFromJSON("forwardingTestL3_ACL.json") flow.CheckFatal(err) case "orig": - l3Rules, err = packet.GetL3ACLFromORIG("forwardingTestL3_ACL.orig") + l3Rules, err = 
packet.GetL3ACLFromTextTable("forwardingTestL3_ACL.orig") flow.CheckFatal(err) } diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile index de9d615f..64fbfcd8 100644 --- a/vagrant/Vagrantfile +++ b/vagrant/Vagrantfile @@ -126,13 +126,19 @@ echo Reassigning "${syscon}" interface to system name sudo nmcli c mod "${syscon}" connection.id 'System connection' echo Unpacking Go language into /opt -(cd /opt; sudo sh -c 'curl -L -s https://dl.google.com/go/go1.12.5.linux-amd64.tar.gz | tar zx') +(cd /opt; sudo sh -c 'curl -L -s https://dl.google.com/go/go1.12.9.linux-amd64.tar.gz | tar zx') mkdir go chmod +x ~/scripts.sh . ~/scripts.sh echo -e ". ~/scripts.sh\n$(cat .bashrc)" > .bashrc setuptesthost +echo Installing libbpf +git clone -b v0.0.4 https://github.com/libbpf/libbpf +make -C libbpf/src all +sudo make -C libbpf/src install +sudo sh -c "echo /usr/lib64 > /etc/ld.so.conf.d/usrlib64.conf" + echo Downloading and building NFF-GO framework go get -v golang.org/x/tools/cmd/stringer git clone -b master --recurse-submodules http://github.com/intel-go/nff-go