Skip to content

Commit

Permalink
Move handling with stats to meter
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuya9786 committed Nov 2, 2023
1 parent 310d6f8 commit 02422da
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 149 deletions.
7 changes: 6 additions & 1 deletion cmd/fluvia/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,10 @@ func main() {
ingressIfName = c.Ipfix.IngressInterface
}

client.New(ingressIfName, raddr)
interval := c.Ipfix.Interval
if interval <= 0 {
interval = 1
}

client.New(ingressIfName, raddr, interval)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/cilium/ebpf v0.11.0
github.com/google/gopacket v1.1.19
github.com/pkg/errors v0.9.1
golang.org/x/sync v0.0.0-20190423024810-112230192c58
gopkg.in/yaml.v3 v3.0.1
)

Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -17,6 +15,7 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Ipfix struct {
Address string `yaml:"address"`
Port string `yaml:"port"`
IngressInterface string `yaml:"ingress-interface"`
Interval int `yaml:"interval"`
}

type Config struct {
Expand Down
57 changes: 52 additions & 5 deletions pkg/bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
package bpf

import (
"errors"
"net"

"github.com/cilium/ebpf"
"github.com/pkg/errors"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -cc $BPF_CLANG -cflags $BPF_CFLAGS xdp ../../src/main.c -- -I../../src
Expand All @@ -19,19 +23,62 @@ type XdpMetaData struct {
SentSubsec uint32
}

func ReadXdpObjects(ops *ebpf.CollectionOptions) (*xdpObjects, error) {
type Xdp struct {
objs *xdpObjects
link link.Link
}

func ReadXdpObjects(ops *ebpf.CollectionOptions) (*Xdp, error) {
obj := &xdpObjects{}
err := loadXdpObjects(obj, ops)
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}

// TODO: BPF log level remove hardcoding. yaml in config
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}

return &Xdp{
objs: obj,
}, nil
}

func (x *Xdp) Attach(iface *net.Interface) error {
l, err := link.AttachXDP(link.XDPOptions{
Program: x.objs.XdpProg,
Interface: iface.Index,
Flags: link.XDPGenericMode,
})
if err != nil {
return err
}

x.link = l

return nil
}

func (x *Xdp) NewPerfReader() (*perf.Reader, error) {
return perf.NewReader(x.objs.PacketProbePerf, 4096)
}

func (x *Xdp) Close() error {
errs := []error{}
if err := x.objs.Close(); err != nil {
errs = append(errs, err)
}

if err := x.link.Close(); err != nil {
errs = append(errs, err)
}

if len(errs) > 0 {
return errors.Join(errs...)
}

return obj, nil
return nil
}

const (
Expand Down
36 changes: 17 additions & 19 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,37 @@ package client

import (
"net"
"sync"
"time"

"github.com/nttcom/fluvia/pkg/packet"
"github.com/nttcom/fluvia/pkg/ipfix"
)

type Statistic struct {
Count int64
DelayMean int64
DelayMin int64
DelayMax int64
DelaySum int64
}

type StatisticMap struct {
Mu sync.Mutex
Db map[packet.ProbeData]*Statistic
}

func New(ingressIfName string, raddr *net.UDPAddr) ClientError {
sm := StatisticMap{Db: make(map[packet.ProbeData]*Statistic)}
func New(ingressIfName string, raddr *net.UDPAddr, interval int) ClientError {
ch := make(chan []ipfix.FieldValue)
errChan := make(chan ClientError)

e := NewExporter()
go func() {
err := e.Run(raddr, &sm)
err := e.Run(raddr, ch)
if err != nil {
errChan <- ClientError{
Component: "exporter",
Error: err,
}
}
}()
go NewMeter(ingressIfName, &sm)

m := NewMeter(ingressIfName)
go func() {
err := m.Run(ch, time.Duration(interval))
if err != nil {
errChan <- ClientError{
Component: "meter",
Error: err,
}
}
m.Close()
}()

for {
clientError := <-errChan
Expand Down
67 changes: 1 addition & 66 deletions pkg/client/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ package client
import (
"log"
"net"
"net/netip"
"os"
"time"

"github.com/nttcom/fluvia/pkg/ipfix"
"github.com/nttcom/fluvia/pkg/packet"
)

const OBSERVATION_ID uint32 = 61166
Expand All @@ -31,7 +28,7 @@ func NewExporter() *Exporter {
return e
}

func (e *Exporter) Run(raddr *net.UDPAddr, sm *StatisticMap) error {
func (e *Exporter) Run(raddr *net.UDPAddr, flowChan chan []ipfix.FieldValue) error {
conn, err := net.DialUDP("udp", nil, raddr)
if err != nil {
return err
Expand All @@ -40,68 +37,6 @@ func (e *Exporter) Run(raddr *net.UDPAddr, sm *StatisticMap) error {

var m *ipfix.Message

cache := make(map[packet.ProbeData]Statistic)
flowChan := make(chan []ipfix.FieldValue)

go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
sm.Mu.Lock()
for probeData, stat := range sm.Db {
if _, ok := cache[probeData]; !ok {
cache[probeData] = Statistic{
Count: 0,
DelayMean: 0,
DelayMin: 0,
DelayMax: 0,
DelaySum: 0,
}
}

dCnt := uint64(stat.Count - cache[probeData].Count)

cache[probeData] = *stat

sl := []ipfix.SRHSegmentIPv6{}
for _, seg := range probeData.Segments {
if seg == "" {
break
}
ipSeg, _ := netip.ParseAddr(seg)

// Ignore zero values received from bpf map
if ipSeg == netip.IPv6Unspecified() {
break
}
seg := ipfix.SRHSegmentIPv6{Val: ipSeg}
sl = append(sl, seg)
}

actSeg, _ := netip.ParseAddr(probeData.Segments[probeData.SegmentsLeft])

f := []ipfix.FieldValue{
&ipfix.PacketDeltaCount{Val: dCnt},
&ipfix.SRHActiveSegmentIPv6{Val: actSeg},
&ipfix.SRHSegmentsIPv6Left{Val: probeData.SegmentsLeft},
&ipfix.SRHFlagsIPv6{Val: probeData.Flags},
&ipfix.SRHTagIPv6{Val: probeData.Tag},
&ipfix.SRHSegmentIPv6BasicList{
SegmentList: sl,
},
&ipfix.PathDelayMeanDeltaMicroseconds{Val: uint32(stat.DelayMean)},
&ipfix.PathDelayMinDeltaMicroseconds{Val: uint32(stat.DelayMin)},
&ipfix.PathDelayMaxDeltaMicroseconds{Val: uint32(stat.DelayMax)},
&ipfix.PathDelaySumDeltaMicroseconds{Val: uint32(stat.DelaySum)},
}
// Throw to channel
flowChan <- f
}
sm.Mu.Unlock()
}
}()

for {
fvs := <-flowChan
var sets []ipfix.Set
Expand Down
Loading

0 comments on commit 02422da

Please sign in to comment.