From d932b391709bcfc85692b0f50485d2e22071ca45 Mon Sep 17 00:00:00 2001 From: Leon Hwang Date: Wed, 29 May 2024 22:44:01 +0800 Subject: [PATCH] fix: Combine two perf-events to one It should use one perf-event to handle all tracing events to avoid concurrent issue. Signed-off-by: Leon Hwang --- bpf/vista.c | 32 ++++------ internal/build/build.go | 3 +- internal/vista/flags.go | 4 ++ internal/vista/output.go | 23 +++---- internal/vista/output_event.go | 57 ++---------------- internal/vista/output_func.go | 2 +- internal/vista/output_iface.go | 2 +- internal/vista/output_process.go | 2 +- internal/vista/output_tcp.go | 2 +- internal/vista/tracing.go | 5 +- main.go | 100 +++++-------------------------- 11 files changed, 56 insertions(+), 176 deletions(-) diff --git a/bpf/vista.c b/bpf/vista.c index 65cee07..c0a4093 100644 --- a/bpf/vista.c +++ b/bpf/vista.c @@ -139,6 +139,7 @@ enum event_source { EVENT_SOURCE_SK = 1, EVENT_SOURCE_IPTABLES = 2, EVENT_SOURCE_TCP = 3, + EVENT_SOURCE_PCAP = 4, }; struct event_t { @@ -184,16 +185,9 @@ get_event(void) { return event; } -#define MAX_QUEUE_ENTRIES 10000 -struct { - __uint(type, BPF_MAP_TYPE_QUEUE); - __type(value, struct event_t); - __uint(max_entries, MAX_QUEUE_ENTRIES); -} events SEC(".maps"); - struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} pcap_events SEC(".maps"); +} events SEC(".maps"); #define MAX_TRACK_SIZE 1024 struct { @@ -482,7 +476,7 @@ kprobe_skb(struct sk_buff *skb, struct pt_regs *ctx, bool has_get_func_ip, event->addr = has_get_func_ip ? bpf_get_func_ip(ctx) : PT_REGS_IP(ctx); event->type = EVENT_TYPE_KPROBE; event->source = EVENT_SOURCE_SKB; - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); return BPF_OK; } @@ -571,12 +565,11 @@ set_skb_pcap_meta(struct sk_buff *skb, struct pcap_meta *pcap, int action, bool static __always_inline void output_skb_pcap_event(struct sk_buff *skb, struct event_t *event, int action, bool is_fexit) { - u64 flags; - + event->source = EVENT_SOURCE_PCAP; set_skb_pcap_meta(skb, &event->pcap, action, is_fexit); - flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU; - bpf_skb_output(skb, &pcap_events, flags, event, __sizeof_pcap_event); + u64 flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU; + bpf_skb_output(skb, &events, flags, event, __sizeof_pcap_event); } static __noinline void @@ -594,7 +587,7 @@ handle_tc_skb(struct sk_buff *skb, void *ctx, int action, bool is_fexit, const b event->source = EVENT_SOURCE_SKB; if (!cfg->output_pcap) { - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); return; } @@ -731,10 +724,11 @@ set_xdp_pcap_meta(struct xdp_buff *xdp, struct pcap_meta *pcap, u32 len, int act static __always_inline void output_xdp_pcap_event(struct xdp_buff *xdp, struct event_t *event, u32 len, int action, bool is_fexit) { + event->source = EVENT_SOURCE_PCAP; set_xdp_pcap_meta(xdp, &event->pcap, len, action, is_fexit); u64 flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU; - bpf_xdp_output(xdp, &pcap_events, flags, event, __sizeof_pcap_event); + bpf_xdp_output(xdp, &events, flags, event, __sizeof_pcap_event); } static __noinline void @@ -759,7 +753,7 @@ handle_xdp_buff(struct xdp_buff *xdp, void *ctx, int verdict, bool is_fexit, con event->source = EVENT_SOURCE_SKB; if (!cfg->output_pcap) { - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); return; } @@ -854,7 +848,7 @@ ipt_do_table_exit(struct pt_regs *ctx, uint verdict) { event->addr = PT_REGS_IP(ctx); event->type = EVENT_TYPE_KPROBE; event->source = EVENT_SOURCE_IPTABLES; - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); return BPF_OK; } @@ -1021,7 +1015,7 @@ kprobe_sk(struct sock *sk, struct pt_regs *ctx, const bool has_get_func_ip) { event->addr = has_get_func_ip ? bpf_get_func_ip(ctx) : PT_REGS_IP(ctx); event->type = EVENT_TYPE_KPROBE; event->source = EVENT_SOURCE_SK; - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); return BPF_OK; } @@ -1117,7 +1111,7 @@ output_tcp(void *ctx, struct sock *sk, struct event_t *event) { event->skb_addr = (u64) sk; event->type = EVENT_TYPE_KPROBE; event->source = EVENT_SOURCE_TCP; - bpf_map_push_elem(&events, event, BPF_EXIST); + bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event)); } SEC("kprobe/tcp_connect") diff --git a/internal/build/build.go b/internal/build/build.go index ab00d43..76a2ee5 100644 --- a/internal/build/build.go +++ b/internal/build/build.go @@ -3,9 +3,10 @@ package build +//go:generate sh -c "echo Generating for $TARGET_GOARCH" + //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip VistaFeatures ../../bpf/features.c -- -I../../bpf/headers -Wno-address-of-packed-member -//go:generate sh -c "echo Generating for $TARGET_GOARCH" //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeVista ../../bpf/vista.c -- -DOUTPUT_SKB -I../../bpf/headers -Wno-address-of-packed-member //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeMultiVista ../../bpf/vista.c -- -DOUTPUT_SKB -DHAS_KPROBE_MULTI -I../../bpf/headers -Wno-address-of-packed-member //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeVistaWithoutOutputSKB ../../bpf/vista.c -- -I../../bpf/headers -Wno-address-of-packed-member diff --git a/internal/vista/flags.go b/internal/vista/flags.go index 51795f5..427f0a7 100644 --- a/internal/vista/flags.go +++ b/internal/vista/flags.go @@ -77,6 +77,8 @@ type Flags struct { ReadyFile string KprobeBackend string + + PerCPUBuffer uint } func (f *Flags) SetFlags() { @@ -122,6 +124,8 @@ func (f *Flags) SetFlags() { flag.StringVar(&f.KprobeBackend, "kprobe-backend", "", fmt.Sprintf("Tracing backend('%s', '%s'). Will auto-detect if not specified.", BackendKprobe, BackendKprobeMulti)) + flag.UintVar(&f.PerCPUBuffer, "output-percpu-buffer", 8192, "specified the buffer size for perf-event") + flag.StringVar(&f.FilterProto, "filter-protocol", "", "filter protocol, tcp, udp, icmp, empty for any") flag.StringVar(&f.FilterAddr, "filter-addr", "", "filter IP address") flag.Uint16Var(&f.FilterPort, "filter-port", 0, "filter port") diff --git a/internal/vista/output.go b/internal/vista/output.go index 235e9ec..0be2fca 100644 --- a/internal/vista/output.go +++ b/internal/vista/output.go @@ -29,9 +29,10 @@ const ( eventSourceSk = 1 eventSourceIptables = 2 eventSourceTCP = 3 + eventSourcePcap = 4 ) -type output struct { +type Output struct { flags *Flags lastSeenSkb map[uint64]uint64 // skb addr => last seen TS printSkbMap *ebpf.Map @@ -47,7 +48,7 @@ type output struct { func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map, addr2Name Addr2Name, kprobeMulti bool, btfSpec *btf.Spec, -) (*output, error) { +) (*Output, error) { writer := os.Stdout if flags.OutputFile != "" { @@ -79,7 +80,7 @@ func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map, } } - return &output{ + return &Output{ flags: flags, lastSeenSkb: map[uint64]uint64{}, printSkbMap: printSkbMap, @@ -94,7 +95,7 @@ func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map, }, nil } -func (o *output) Close() { +func (o *Output) Close() { if o.writer != os.Stdout { _ = o.writer.Sync() _ = o.writer.Close() @@ -105,7 +106,7 @@ func (o *output) Close() { } } -func (o *output) PrintHeader() { +func (o *Output) PrintHeader() { if o.flags.outputTs == outputTimestampAbsolute { fmt.Fprintf(o.buf, "%12s ", "TIME") } @@ -120,7 +121,7 @@ func (o *output) PrintHeader() { o.buf.Reset() } -func (o *output) print(event *Event) { +func (o *Output) print(event *Event) { if o.flags.outputTs == outputTimestampAbsolute { fmt.Fprintf(o.buf, "%12s ", time.Now().Format(absoluteTS)) } @@ -193,20 +194,20 @@ func (o *output) print(event *Event) { } } -func (o *output) flushBuffer() { +func (o *Output) flushBuffer() { fmt.Fprintln(o.writer, o.buf.String()) o.buf.Reset() } -func (o *output) Print(event *Event) { - o.print(event) +func (o *Output) Print(ev OutputEvent) { + o.print(ev.Event) o.flushBuffer() } -func (o *output) Pcap(ev OutputEvent) error { +func (o *Output) Pcap(ev OutputEvent) error { o.print(ev.Event) - fmt.Fprintf(o.buf, "Saving this packet to %s..", o.flags.PcapFile) + fmt.Fprintf(o.buf, "\nSaving this packet to %s..", o.flags.PcapFile) o.flushBuffer() iface := o.getIfaceName(ev.Event.Meta.Netns, ev.Event.Meta.Ifindex) diff --git a/internal/vista/output_event.go b/internal/vista/output_event.go index ffc1c6a..f24361d 100644 --- a/internal/vista/output_event.go +++ b/internal/vista/output_event.go @@ -6,8 +6,6 @@ package vista import ( "fmt" "unsafe" - - "golang.org/x/sync/errgroup" ) type OutputEvent struct { @@ -16,11 +14,14 @@ type OutputEvent struct { IsPcap bool } -func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) { +func NewOutputEvent(raw []byte) (OutputEvent, error) { if len(raw) == 0 { return OutputEvent{}, fmt.Errorf("empty packet") } + event := (*Event)(unsafe.Pointer(&raw[0])) + isPcap := event.Source == eventSourcePcap + size := sizeofEvent if isPcap { size = sizeofPcapEvent @@ -30,8 +31,6 @@ func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) { return OutputEvent{}, fmt.Errorf("record too short: %d < %d", len(raw), size) } - event := (*Event)(unsafe.Pointer(&raw[0])) - if !isPcap { return OutputEvent{Event: event}, nil } @@ -46,51 +45,3 @@ func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) { return OutputEvent{Event: event, Packet: data, IsPcap: true}, nil } - -type EventChannels struct { - chs []chan OutputEvent - - out chan OutputEvent -} - -func NewEventChannels(chs ...chan OutputEvent) *EventChannels { - e := &EventChannels{ - chs: chs, - out: make(chan OutputEvent, 100), - } - - go e.run() - - return e -} - -func (e *EventChannels) RecvChan() <-chan OutputEvent { - return e.out -} - -func (e *EventChannels) Drain() { - for range e.out { - } -} - -func (e *EventChannels) runChan(ch chan OutputEvent) { - for ev := range ch { - e.out <- ev - } -} - -func (e *EventChannels) run() { - var errg errgroup.Group - - for _, ch := range e.chs { - ch := ch - errg.Go(func() error { - e.runChan(ch) - return nil - }) - } - - _ = errg.Wait() - - close(e.out) -} diff --git a/internal/vista/output_func.go b/internal/vista/output_func.go index 5cb2a00..23773b2 100644 --- a/internal/vista/output_func.go +++ b/internal/vista/output_func.go @@ -8,7 +8,7 @@ import ( "runtime" ) -func (o *output) getFuncName(event *Event) string { +func (o *Output) getFuncName(event *Event) string { var outFuncName string switch event.Source { diff --git a/internal/vista/output_iface.go b/internal/vista/output_iface.go index 5218776..a7e0fda 100644 --- a/internal/vista/output_iface.go +++ b/internal/vista/output_iface.go @@ -107,7 +107,7 @@ func getIfacesInNetNs(path string) (map[uint32]string, error) { return ifaces, nil } -func (o *output) getIfaceName(netnsInode, ifindex uint32) string { +func (o *Output) getIfaceName(netnsInode, ifindex uint32) string { if ifaces, ok := o.ifaceCache[uint64(netnsInode)]; ok { if name, ok := ifaces[ifindex]; ok { return fmt.Sprintf("%d(%s)", ifindex, name) diff --git a/internal/vista/output_process.go b/internal/vista/output_process.go index fec2846..7dc39ce 100644 --- a/internal/vista/output_process.go +++ b/internal/vista/output_process.go @@ -9,7 +9,7 @@ import ( "github.com/tklauser/ps" ) -func (o *output) getProcessExecName(event *Event) string { +func (o *Output) getProcessExecName(event *Event) string { var execName string if event.PID != 0 { if event.Source != eventSourceTCP { diff --git a/internal/vista/output_tcp.go b/internal/vista/output_tcp.go index f14fea9..e27881a 100644 --- a/internal/vista/output_tcp.go +++ b/internal/vista/output_tcp.go @@ -59,6 +59,6 @@ func outputTCP(w io.Writer, tcp *TCPMeta) { time.Microsecond*time.Duration(tcp.Srtt), tcp.Retrans, tcp.SkMark, nullStr(tcp.Cong[:])) if tcp.Reset != 0 { - fmt.Fprintf(w, " reset=%s", tcp.Reset) + fmt.Fprintf(w, " reset=%d", tcp.Reset) } } diff --git a/internal/vista/tracing.go b/internal/vista/tracing.go index 08433ce..3159763 100644 --- a/internal/vista/tracing.go +++ b/internal/vista/tracing.go @@ -169,7 +169,7 @@ func (t *tracing) traceProg(options *TracingOptions, prog *ebpf.Program, fentryN func (t *tracing) trace(options *TracingOptions, fentryName, fexitName, fentryPcap, fexitPcap string) error { progs, err := listBpfProgs(options.progType) if err != nil { - log.Fatalf("failed to list bpf progs: %w", err) + log.Fatalf("failed to list bpf progs: %v", err) } defer func() { for _, p := range progs { @@ -197,9 +197,6 @@ func (t *tracing) trace(options *TracingOptions, fentryName, fexitName, fentryPc if options.OutputSkb { replacedMaps["print_skb_map"] = options.Coll.Maps["print_skb_map"] } - if options.Pcap { - replacedMaps["pcap_events"] = options.Coll.Maps["pcap_events"] - } options.Opts.MapReplacements = replacedMaps t.links = make([]link.Link, 0, len(progs)*2) diff --git a/main.go b/main.go index 11e6d57..d4dd3b8 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,6 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/btf" @@ -322,56 +321,24 @@ func main() { errg, ectx := errgroup.WithContext(ctx) - chEvent := make(chan vista.OutputEvent) - chPcap := make(chan vista.OutputEvent) - chans := vista.NewEventChannels(chEvent, chPcap) counter := vista.NewOutputCounter(flags.OutputLimitLines) - errg.Go(func() error { - defer close(chEvent) - - events := coll.Maps["events"] - return loopEvents(ectx, events, chEvent, counter) + events := coll.Maps["events"] + reader, err := perf.NewReaderWithOptions(events, int(flags.PerCPUBuffer), perf.ReaderOptions{ + Watermark: 1, }) + if err != nil { + log.Fatalf("Failed to create perf reader: %s", err) + } errg.Go(func() error { - defer close(chPcap) - - if !flags.HavePcap() { - return nil - } - - events := coll.Maps["pcap_events"] - reader, err := perf.NewReaderWithOptions(events, 8192, perf.ReaderOptions{ - Watermark: 1, - }) - if err != nil { - return fmt.Errorf("failed to create perf reader: %w", err) - } - - errg.Go(func() error { - <-ectx.Done() - _ = reader.Close() - - return nil - }) - - return loopPcapEvents(ectx, reader, chPcap, counter) + <-ectx.Done() + _ = reader.Close() + return nil }) errg.Go(func() error { - // Prevent to block event sending channels. - defer func() { go chans.Drain() }() - - for ev := range chans.RecvChan() { - if !ev.IsPcap { - output.Print(ev.Event) - } else if err := output.Pcap(ev); err != nil { - return fmt.Errorf("failed to write pcap: %w", err) - } - } - - return nil + return loopEvents(ectx, reader, counter, output) }) err = errg.Wait() @@ -382,41 +349,7 @@ func main() { var errFinished = errors.New("finished") -func loopEvents(ctx context.Context, events *ebpf.Map, ch chan<- vista.OutputEvent, counter *vista.OutputCounter) error { - next := true - for next { - event := new(vista.Event) - - for { - if err := events.LookupAndDelete(nil, event); err == nil { - break - } else if !errors.Is(err, ebpf.ErrKeyNotExist) { - return fmt.Errorf("failed to lookup event: %w", err) - } - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Microsecond): - continue - } - } - - select { - case <-ctx.Done(): - return nil - - case ch <- vista.OutputEvent{ - Event: event, - }: - } - - next = counter.Next() - } - - return errFinished -} - -func loopPcapEvents(ctx context.Context, reader *perf.Reader, ch chan<- vista.OutputEvent, counter *vista.OutputCounter) error { +func loopEvents(ctx context.Context, reader *perf.Reader, counter *vista.OutputCounter, output *vista.Output) error { next := true for next { record, err := reader.Read() @@ -433,17 +366,16 @@ func loopPcapEvents(ctx context.Context, reader *perf.Reader, ch chan<- vista.Ou } raw := record.RawSample - event, err := vista.NewOutputEvent(raw, true) + event, err := vista.NewOutputEvent(raw) if err != nil { log.Printf("Failed to parse pcap event: %v\n", err) continue } - select { - case <-ctx.Done(): - return nil - - case ch <- event: + if !event.IsPcap { + output.Print(event) + } else { + output.Pcap(event) } next = counter.Next()