From 02422da799f969d9e995787519a85e9b3e619819 Mon Sep 17 00:00:00 2001 From: Yuya Tajima Date: Thu, 2 Nov 2023 10:20:22 +0000 Subject: [PATCH] Move handling with stats to meter --- cmd/fluvia/main.go | 7 +- go.mod | 2 +- go.sum | 3 +- internal/config/config.go | 1 + pkg/bpf/bpf.go | 57 +++++++++- pkg/client/client.go | 36 +++---- pkg/client/exporter.go | 67 +----------- pkg/client/meter.go | 222 ++++++++++++++++++++++++++++---------- 8 files changed, 246 insertions(+), 149 deletions(-) diff --git a/cmd/fluvia/main.go b/cmd/fluvia/main.go index 1541761..f0851bd 100644 --- a/cmd/fluvia/main.go +++ b/cmd/fluvia/main.go @@ -51,5 +51,10 @@ func main() { ingressIfName = c.Ipfix.IngressInterface } - client.New(ingressIfName, raddr) + interval := c.Ipfix.Interval + if interval <= 0 { + interval = 1 + } + + client.New(ingressIfName, raddr, interval) } diff --git a/go.mod b/go.mod index c3d0b70..cccc02f 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/cilium/ebpf v0.11.0 github.com/google/gopacket v1.1.19 - github.com/pkg/errors v0.9.1 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index e5f5bf2..18d73c7 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -17,6 +15,7 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/config/config.go b/internal/config/config.go index 69de831..de82e21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ type Ipfix struct { Address string `yaml:"address"` Port string `yaml:"port"` IngressInterface string `yaml:"ingress-interface"` + Interval int `yaml:"interval"` } type Config struct { diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 29433dc..fe5a2db 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -7,8 +7,12 @@ package bpf import ( + "errors" + "net" + "github.com/cilium/ebpf" - "github.com/pkg/errors" + "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf/perf" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -cc $BPF_CLANG -cflags $BPF_CFLAGS xdp ../../src/main.c -- -I../../src @@ -19,19 +23,62 @@ type XdpMetaData struct { SentSubsec uint32 } -func ReadXdpObjects(ops *ebpf.CollectionOptions) (*xdpObjects, error) { +type Xdp struct { + objs *xdpObjects + link link.Link +} + +func ReadXdpObjects(ops *ebpf.CollectionOptions) (*Xdp, error) { obj := &xdpObjects{} err := loadXdpObjects(obj, ops) if err != nil { - return nil, errors.WithStack(err) + return nil, err } // TODO: BPF log level remove hardcoding. yaml in config if err != nil { - return nil, errors.WithStack(err) + return nil, err + } + + return &Xdp{ + objs: obj, + }, nil +} + +func (x *Xdp) Attach(iface *net.Interface) error { + l, err := link.AttachXDP(link.XDPOptions{ + Program: x.objs.XdpProg, + Interface: iface.Index, + Flags: link.XDPGenericMode, + }) + if err != nil { + return err + } + + x.link = l + + return nil +} + +func (x *Xdp) NewPerfReader() (*perf.Reader, error) { + return perf.NewReader(x.objs.PacketProbePerf, 4096) +} + +func (x *Xdp) Close() error { + errs := []error{} + if err := x.objs.Close(); err != nil { + errs = append(errs, err) + } + + if err := x.link.Close(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errors.Join(errs...) } - return obj, nil + return nil } const ( diff --git a/pkg/client/client.go b/pkg/client/client.go index 375bc0f..0175ad0 100755 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -7,31 +7,18 @@ package client import ( "net" - "sync" + "time" - "github.com/nttcom/fluvia/pkg/packet" + "github.com/nttcom/fluvia/pkg/ipfix" ) -type Statistic struct { - Count int64 - DelayMean int64 - DelayMin int64 - DelayMax int64 - DelaySum int64 -} - -type StatisticMap struct { - Mu sync.Mutex - Db map[packet.ProbeData]*Statistic -} - -func New(ingressIfName string, raddr *net.UDPAddr) ClientError { - sm := StatisticMap{Db: make(map[packet.ProbeData]*Statistic)} +func New(ingressIfName string, raddr *net.UDPAddr, interval int) ClientError { + ch := make(chan []ipfix.FieldValue) errChan := make(chan ClientError) e := NewExporter() go func() { - err := e.Run(raddr, &sm) + err := e.Run(raddr, ch) if err != nil { errChan <- ClientError{ Component: "exporter", @@ -39,7 +26,18 @@ func New(ingressIfName string, raddr *net.UDPAddr) ClientError { } } }() - go NewMeter(ingressIfName, &sm) + + m := NewMeter(ingressIfName) + go func() { + err := m.Run(ch, time.Duration(interval)) + if err != nil { + errChan <- ClientError{ + Component: "meter", + Error: err, + } + } + m.Close() + }() for { clientError := <-errChan diff --git a/pkg/client/exporter.go b/pkg/client/exporter.go index 5cf0ef3..5ce4151 100644 --- a/pkg/client/exporter.go +++ b/pkg/client/exporter.go @@ -8,12 +8,9 @@ package client import ( "log" "net" - "net/netip" "os" - "time" "github.com/nttcom/fluvia/pkg/ipfix" - "github.com/nttcom/fluvia/pkg/packet" ) const OBSERVATION_ID uint32 = 61166 @@ -31,7 +28,7 @@ func NewExporter() *Exporter { return e } -func (e *Exporter) Run(raddr *net.UDPAddr, sm *StatisticMap) error { +func (e *Exporter) Run(raddr *net.UDPAddr, flowChan chan []ipfix.FieldValue) error { conn, err := net.DialUDP("udp", nil, raddr) if err != nil { return err @@ -40,68 +37,6 @@ func (e *Exporter) Run(raddr *net.UDPAddr, sm *StatisticMap) error { var m *ipfix.Message - cache := make(map[packet.ProbeData]Statistic) - flowChan := make(chan []ipfix.FieldValue) - - go func() { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for range ticker.C { - sm.Mu.Lock() - for probeData, stat := range sm.Db { - if _, ok := cache[probeData]; !ok { - cache[probeData] = Statistic{ - Count: 0, - DelayMean: 0, - DelayMin: 0, - DelayMax: 0, - DelaySum: 0, - } - } - - dCnt := uint64(stat.Count - cache[probeData].Count) - - cache[probeData] = *stat - - sl := []ipfix.SRHSegmentIPv6{} - for _, seg := range probeData.Segments { - if seg == "" { - break - } - ipSeg, _ := netip.ParseAddr(seg) - - // Ignore zero values received from bpf map - if ipSeg == netip.IPv6Unspecified() { - break - } - seg := ipfix.SRHSegmentIPv6{Val: ipSeg} - sl = append(sl, seg) - } - - actSeg, _ := netip.ParseAddr(probeData.Segments[probeData.SegmentsLeft]) - - f := []ipfix.FieldValue{ - &ipfix.PacketDeltaCount{Val: dCnt}, - &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, - &ipfix.SRHSegmentsIPv6Left{Val: probeData.SegmentsLeft}, - &ipfix.SRHFlagsIPv6{Val: probeData.Flags}, - &ipfix.SRHTagIPv6{Val: probeData.Tag}, - &ipfix.SRHSegmentIPv6BasicList{ - SegmentList: sl, - }, - &ipfix.PathDelayMeanDeltaMicroseconds{Val: uint32(stat.DelayMean)}, - &ipfix.PathDelayMinDeltaMicroseconds{Val: uint32(stat.DelayMin)}, - &ipfix.PathDelayMaxDeltaMicroseconds{Val: uint32(stat.DelayMax)}, - &ipfix.PathDelaySumDeltaMicroseconds{Val: uint32(stat.DelaySum)}, - } - // Throw to channel - flowChan <- f - } - sm.Mu.Unlock() - } - }() - for { fvs := <-flowChan var sets []ipfix.Set diff --git a/pkg/client/meter.go b/pkg/client/meter.go index f6b4fbb..92865e6 100644 --- a/pkg/client/meter.go +++ b/pkg/client/meter.go @@ -8,37 +8,61 @@ package client import ( "bytes" + "context" "encoding/binary" "errors" "fmt" "io/ioutil" "log" "net" + "net/netip" "strconv" "strings" + "sync" "time" "unsafe" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/nttcom/fluvia/pkg/bpf" + "github.com/nttcom/fluvia/pkg/ipfix" "github.com/nttcom/fluvia/pkg/packet" + "golang.org/x/sync/errgroup" ) -func NewMeter(ingressIfName string, sm *StatisticMap) { +type Stats struct { + Count int64 + DelayMean int64 + DelayMin int64 + DelayMax int64 + DelaySum int64 +} + +type StatsMap struct { + Mu sync.RWMutex + Db map[packet.ProbeData]*Stats +} + +type Meter struct { + statsMap *StatsMap + bootTime time.Time + xdp *bpf.Xdp +} + +func NewMeter(ingressIfName string) *Meter { bootTime, err := getSystemBootTime() if err != nil { log.Fatalf("Could not get boot time: %s", err) } + statsMap := StatsMap{Db: make(map[packet.ProbeData]*Stats)} + iface, err := net.InterfaceByName(ingressIfName) if err != nil { log.Fatalf("lookup network iface %q: %s", ingressIfName, err) } // Load the XDP program - objs, err := bpf.ReadXdpObjects(&ebpf.CollectionOptions{ + xdp, err := bpf.ReadXdpObjects(&ebpf.CollectionOptions{ Programs: ebpf.ProgramOptions{ LogLevel: ebpf.LogLevelInstruction, LogSize: ebpf.DefaultVerifierLogSize * 256, @@ -50,82 +74,170 @@ func NewMeter(ingressIfName string, sm *StatisticMap) { log.Fatalf("Could not load XDP program: %+v\n", ve) } } - defer objs.Close() // Attach the XDP program. - l, err := link.AttachXDP(link.XDPOptions{ - Program: objs.XdpProg, - Interface: iface.Index, - Flags: link.XDPGenericMode, - }) - if err != nil { + if err = xdp.Attach(iface); err != nil { log.Fatalf("Could not attach XDP program: %s", err) } - defer l.Close() - perfEvent, err := perf.NewReader(objs.PacketProbePerf, 4096) + log.Printf("Attached XDP program to iface %q (index %d)", iface.Name, iface.Index) + log.Printf("Press Ctrl-C to exit and remove the program") + + return &Meter{ + statsMap: &statsMap, + bootTime: bootTime, + xdp: xdp, + } +} + +func (m *Meter) Run(flowChan chan []ipfix.FieldValue, interval time.Duration) error { + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + return m.Read(ctx) + }) + eg.Go(func() error { + return m.Send(ctx, flowChan, interval) + }) + + if err := eg.Wait(); err != nil { + return err + } + + return nil +} + +func (m *Meter) Read(ctx context.Context) error { + perfEvent, err := m.xdp.NewPerfReader() if err != nil { log.Fatalf("Could not obtain perf reader: %s", err) } - log.Printf("Attached XDP program to iface %q (index %d)", iface.Name, iface.Index) - log.Printf("Press Ctrl-C to exit and remove the program") - var metadata bpf.XdpMetaData for { - eventData, err := perfEvent.Read() - if err != nil { - log.Fatalf("Could not read from bpf perf map:") - } + select { + case <-ctx.Done(): + return nil + default: + eventData, err := perfEvent.Read() + if err != nil { + log.Fatalf("Could not read from bpf perf map:") + } - reader := bytes.NewReader(eventData.RawSample) + reader := bytes.NewReader(eventData.RawSample) - if err := binary.Read(reader, binary.LittleEndian, &metadata); err != nil { - log.Fatalf("Could not read from reader: %s", err) - } + if err := binary.Read(reader, binary.LittleEndian, &metadata); err != nil { + log.Fatalf("Could not read from reader: %s", err) + } - metadata_size := unsafe.Sizeof(metadata) - if len(eventData.RawSample)-int(metadata_size) <= 0 { - continue - } + metadata_size := unsafe.Sizeof(metadata) + if len(eventData.RawSample)-int(metadata_size) <= 0 { + continue + } - receivedNano := bootTime.Add(time.Duration(metadata.ReceivedNano) * time.Nanosecond) - SentNano := time.Unix(int64(metadata.SentSec), int64(metadata.SentSubsec)) + receivedNano := m.bootTime.Add(time.Duration(metadata.ReceivedNano) * time.Nanosecond) + SentNano := time.Unix(int64(metadata.SentSec), int64(metadata.SentSubsec)) - delay := receivedNano.Sub(SentNano) + delay := receivedNano.Sub(SentNano) - probeData, err := packet.Parse(eventData.RawSample[metadata_size:]) - if err != nil { - log.Fatalf("Could not parse the packet: %s", err) - } + probeData, err := packet.Parse(eventData.RawSample[metadata_size:]) + if err != nil { + log.Fatalf("Could not parse the packet: %s", err) + } - delayMicro := delay.Microseconds() + delayMicro := delay.Microseconds() - sm.Mu.Lock() - if value, ok := sm.Db[*probeData]; !ok { - sm.Db[*probeData] = &Statistic{ - Count: 1, - DelayMean: delayMicro, - DelayMin: delayMicro, - DelayMax: delayMicro, - DelaySum: delayMicro, - } - } else { - value.Count = value.Count + 1 + m.statsMap.Mu.Lock() + if value, ok := m.statsMap.Db[*probeData]; !ok { + m.statsMap.Db[*probeData] = &Stats{ + Count: 1, + DelayMean: delayMicro, + DelayMin: delayMicro, + DelayMax: delayMicro, + DelaySum: delayMicro, + } + } else { + value.Count = value.Count + 1 - if delayMicro < value.DelayMin { - value.DelayMin = delayMicro - } + if delayMicro < value.DelayMin { + value.DelayMin = delayMicro + } + + if delayMicro > value.DelayMax { + value.DelayMax = delayMicro + } - if delayMicro > value.DelayMax { - value.DelayMax = delayMicro + value.DelaySum = value.DelaySum + delayMicro + value.DelayMean = value.DelaySum / value.Count } + m.statsMap.Mu.Unlock() + } + } +} - value.DelaySum = value.DelaySum + delayMicro - value.DelayMean = value.DelaySum / value.Count +func (m *Meter) Send(ctx context.Context, flowChan chan []ipfix.FieldValue, intervalSec time.Duration) error { + ticker := time.NewTicker(intervalSec * time.Second) + defer ticker.Stop() + + for range ticker.C { + select { + case <-ctx.Done(): + return nil + default: + m.statsMap.Mu.Lock() + for probeData, stat := range m.statsMap.Db { + dCnt := uint64(stat.Count) + + sl := []ipfix.SRHSegmentIPv6{} + for _, seg := range probeData.Segments { + if seg == "" { + break + } + ipSeg, _ := netip.ParseAddr(seg) + + // Ignore zero values received from bpf map + if ipSeg == netip.IPv6Unspecified() { + break + } + seg := ipfix.SRHSegmentIPv6{Val: ipSeg} + sl = append(sl, seg) + } + + actSeg, _ := netip.ParseAddr(probeData.Segments[probeData.SegmentsLeft]) + + f := []ipfix.FieldValue{ + &ipfix.PacketDeltaCount{Val: dCnt}, + &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, + &ipfix.SRHSegmentsIPv6Left{Val: probeData.SegmentsLeft}, + &ipfix.SRHFlagsIPv6{Val: probeData.Flags}, + &ipfix.SRHTagIPv6{Val: probeData.Tag}, + &ipfix.SRHSegmentIPv6BasicList{ + SegmentList: sl, + }, + &ipfix.PathDelayMeanDeltaMicroseconds{Val: uint32(stat.DelayMean)}, + &ipfix.PathDelayMinDeltaMicroseconds{Val: uint32(stat.DelayMin)}, + &ipfix.PathDelayMaxDeltaMicroseconds{Val: uint32(stat.DelayMax)}, + &ipfix.PathDelaySumDeltaMicroseconds{Val: uint32(stat.DelaySum)}, + } + // Throw to channel + flowChan <- f + + // Stats (e.g., DelayMean) are based on packets received over a fixed duration + // These need to be cleared out for the next calculation of statistics + delete(m.statsMap.Db, probeData) + } + m.statsMap.Mu.Unlock() } - sm.Mu.Unlock() } + + return nil +} + +func (m *Meter) Close() error { + if err := m.xdp.Close(); err != nil { + return err + } + + return nil } func getSystemBootTime() (time.Time, error) {