Skip to content

Commit

Permalink
fix: Combine two perf-events to one
Browse files Browse the repository at this point in the history
It should use one perf-event to handle all tracing events to avoid
concurrent issue.

Signed-off-by: Leon Hwang <[email protected]>
  • Loading branch information
Asphaltt committed May 29, 2024
1 parent a913db0 commit d932b39
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 176 deletions.
32 changes: 13 additions & 19 deletions bpf/vista.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ enum event_source {
EVENT_SOURCE_SK = 1,
EVENT_SOURCE_IPTABLES = 2,
EVENT_SOURCE_TCP = 3,
EVENT_SOURCE_PCAP = 4,
};

struct event_t {
Expand Down Expand Up @@ -184,16 +185,9 @@ get_event(void) {
return event;
}

#define MAX_QUEUE_ENTRIES 10000
struct {
__uint(type, BPF_MAP_TYPE_QUEUE);
__type(value, struct event_t);
__uint(max_entries, MAX_QUEUE_ENTRIES);
} events SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} pcap_events SEC(".maps");
} events SEC(".maps");

#define MAX_TRACK_SIZE 1024
struct {
Expand Down Expand Up @@ -482,7 +476,7 @@ kprobe_skb(struct sk_buff *skb, struct pt_regs *ctx, bool has_get_func_ip,
event->addr = has_get_func_ip ? bpf_get_func_ip(ctx) : PT_REGS_IP(ctx);
event->type = EVENT_TYPE_KPROBE;
event->source = EVENT_SOURCE_SKB;
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));

return BPF_OK;
}
Expand Down Expand Up @@ -571,12 +565,11 @@ set_skb_pcap_meta(struct sk_buff *skb, struct pcap_meta *pcap, int action, bool

static __always_inline void
output_skb_pcap_event(struct sk_buff *skb, struct event_t *event, int action, bool is_fexit) {
u64 flags;

event->source = EVENT_SOURCE_PCAP;
set_skb_pcap_meta(skb, &event->pcap, action, is_fexit);

flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU;
bpf_skb_output(skb, &pcap_events, flags, event, __sizeof_pcap_event);
u64 flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU;
bpf_skb_output(skb, &events, flags, event, __sizeof_pcap_event);
}

static __noinline void
Expand All @@ -594,7 +587,7 @@ handle_tc_skb(struct sk_buff *skb, void *ctx, int action, bool is_fexit, const b
event->source = EVENT_SOURCE_SKB;

if (!cfg->output_pcap) {
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));
return;
}

Expand Down Expand Up @@ -731,10 +724,11 @@ set_xdp_pcap_meta(struct xdp_buff *xdp, struct pcap_meta *pcap, u32 len, int act

static __always_inline void
output_xdp_pcap_event(struct xdp_buff *xdp, struct event_t *event, u32 len, int action, bool is_fexit) {
event->source = EVENT_SOURCE_PCAP;
set_xdp_pcap_meta(xdp, &event->pcap, len, action, is_fexit);

u64 flags = (((u64) event->pcap.cap_len) << 32) | BPF_F_CURRENT_CPU;
bpf_xdp_output(xdp, &pcap_events, flags, event, __sizeof_pcap_event);
bpf_xdp_output(xdp, &events, flags, event, __sizeof_pcap_event);
}

static __noinline void
Expand All @@ -759,7 +753,7 @@ handle_xdp_buff(struct xdp_buff *xdp, void *ctx, int verdict, bool is_fexit, con
event->source = EVENT_SOURCE_SKB;

if (!cfg->output_pcap) {
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));
return;
}

Expand Down Expand Up @@ -854,7 +848,7 @@ ipt_do_table_exit(struct pt_regs *ctx, uint verdict) {
event->addr = PT_REGS_IP(ctx);
event->type = EVENT_TYPE_KPROBE;
event->source = EVENT_SOURCE_IPTABLES;
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));

return BPF_OK;
}
Expand Down Expand Up @@ -1021,7 +1015,7 @@ kprobe_sk(struct sock *sk, struct pt_regs *ctx, const bool has_get_func_ip) {
event->addr = has_get_func_ip ? bpf_get_func_ip(ctx) : PT_REGS_IP(ctx);
event->type = EVENT_TYPE_KPROBE;
event->source = EVENT_SOURCE_SK;
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));

return BPF_OK;
}
Expand Down Expand Up @@ -1117,7 +1111,7 @@ output_tcp(void *ctx, struct sock *sk, struct event_t *event) {
event->skb_addr = (u64) sk;
event->type = EVENT_TYPE_KPROBE;
event->source = EVENT_SOURCE_TCP;
bpf_map_push_elem(&events, event, BPF_EXIST);
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, event, sizeof(*event));
}

SEC("kprobe/tcp_connect")
Expand Down
3 changes: 2 additions & 1 deletion internal/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

package build

//go:generate sh -c "echo Generating for $TARGET_GOARCH"

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip VistaFeatures ../../bpf/features.c -- -I../../bpf/headers -Wno-address-of-packed-member

//go:generate sh -c "echo Generating for $TARGET_GOARCH"
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeVista ../../bpf/vista.c -- -DOUTPUT_SKB -I../../bpf/headers -Wno-address-of-packed-member
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeMultiVista ../../bpf/vista.c -- -DOUTPUT_SKB -DHAS_KPROBE_MULTI -I../../bpf/headers -Wno-address-of-packed-member
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target $TARGET_GOARCH -cc clang -no-strip KProbeVistaWithoutOutputSKB ../../bpf/vista.c -- -I../../bpf/headers -Wno-address-of-packed-member
Expand Down
4 changes: 4 additions & 0 deletions internal/vista/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type Flags struct {
ReadyFile string

KprobeBackend string

PerCPUBuffer uint
}

func (f *Flags) SetFlags() {
Expand Down Expand Up @@ -122,6 +124,8 @@ func (f *Flags) SetFlags() {
flag.StringVar(&f.KprobeBackend, "kprobe-backend", "",
fmt.Sprintf("Tracing backend('%s', '%s'). Will auto-detect if not specified.", BackendKprobe, BackendKprobeMulti))

flag.UintVar(&f.PerCPUBuffer, "output-percpu-buffer", 8192, "specified the buffer size for perf-event")

flag.StringVar(&f.FilterProto, "filter-protocol", "", "filter protocol, tcp, udp, icmp, empty for any")
flag.StringVar(&f.FilterAddr, "filter-addr", "", "filter IP address")
flag.Uint16Var(&f.FilterPort, "filter-port", 0, "filter port")
Expand Down
23 changes: 12 additions & 11 deletions internal/vista/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const (
eventSourceSk = 1
eventSourceIptables = 2
eventSourceTCP = 3
eventSourcePcap = 4
)

type output struct {
type Output struct {
flags *Flags
lastSeenSkb map[uint64]uint64 // skb addr => last seen TS
printSkbMap *ebpf.Map
Expand All @@ -47,7 +48,7 @@ type output struct {

func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map,
addr2Name Addr2Name, kprobeMulti bool, btfSpec *btf.Spec,
) (*output, error) {
) (*Output, error) {
writer := os.Stdout

if flags.OutputFile != "" {
Expand Down Expand Up @@ -79,7 +80,7 @@ func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map,
}
}

return &output{
return &Output{
flags: flags,
lastSeenSkb: map[uint64]uint64{},
printSkbMap: printSkbMap,
Expand All @@ -94,7 +95,7 @@ func NewOutput(flags *Flags, printSkbMap *ebpf.Map, printStackMap *ebpf.Map,
}, nil
}

func (o *output) Close() {
func (o *Output) Close() {
if o.writer != os.Stdout {
_ = o.writer.Sync()
_ = o.writer.Close()
Expand All @@ -105,7 +106,7 @@ func (o *output) Close() {
}
}

func (o *output) PrintHeader() {
func (o *Output) PrintHeader() {
if o.flags.outputTs == outputTimestampAbsolute {
fmt.Fprintf(o.buf, "%12s ", "TIME")
}
Expand All @@ -120,7 +121,7 @@ func (o *output) PrintHeader() {
o.buf.Reset()
}

func (o *output) print(event *Event) {
func (o *Output) print(event *Event) {
if o.flags.outputTs == outputTimestampAbsolute {
fmt.Fprintf(o.buf, "%12s ", time.Now().Format(absoluteTS))
}
Expand Down Expand Up @@ -193,20 +194,20 @@ func (o *output) print(event *Event) {
}
}

func (o *output) flushBuffer() {
func (o *Output) flushBuffer() {
fmt.Fprintln(o.writer, o.buf.String())

o.buf.Reset()
}

func (o *output) Print(event *Event) {
o.print(event)
func (o *Output) Print(ev OutputEvent) {
o.print(ev.Event)
o.flushBuffer()
}

func (o *output) Pcap(ev OutputEvent) error {
func (o *Output) Pcap(ev OutputEvent) error {
o.print(ev.Event)
fmt.Fprintf(o.buf, "Saving this packet to %s..", o.flags.PcapFile)
fmt.Fprintf(o.buf, "\nSaving this packet to %s..", o.flags.PcapFile)
o.flushBuffer()

iface := o.getIfaceName(ev.Event.Meta.Netns, ev.Event.Meta.Ifindex)
Expand Down
57 changes: 4 additions & 53 deletions internal/vista/output_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package vista
import (
"fmt"
"unsafe"

"golang.org/x/sync/errgroup"
)

type OutputEvent struct {
Expand All @@ -16,11 +14,14 @@ type OutputEvent struct {
IsPcap bool
}

func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) {
func NewOutputEvent(raw []byte) (OutputEvent, error) {
if len(raw) == 0 {
return OutputEvent{}, fmt.Errorf("empty packet")
}

event := (*Event)(unsafe.Pointer(&raw[0]))
isPcap := event.Source == eventSourcePcap

size := sizeofEvent
if isPcap {
size = sizeofPcapEvent
Expand All @@ -30,8 +31,6 @@ func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) {
return OutputEvent{}, fmt.Errorf("record too short: %d < %d", len(raw), size)
}

event := (*Event)(unsafe.Pointer(&raw[0]))

if !isPcap {
return OutputEvent{Event: event}, nil
}
Expand All @@ -46,51 +45,3 @@ func NewOutputEvent(raw []byte, isPcap bool) (OutputEvent, error) {

return OutputEvent{Event: event, Packet: data, IsPcap: true}, nil
}

type EventChannels struct {
chs []chan OutputEvent

out chan OutputEvent
}

func NewEventChannels(chs ...chan OutputEvent) *EventChannels {
e := &EventChannels{
chs: chs,
out: make(chan OutputEvent, 100),
}

go e.run()

return e
}

func (e *EventChannels) RecvChan() <-chan OutputEvent {
return e.out
}

func (e *EventChannels) Drain() {
for range e.out {
}
}

func (e *EventChannels) runChan(ch chan OutputEvent) {
for ev := range ch {
e.out <- ev
}
}

func (e *EventChannels) run() {
var errg errgroup.Group

for _, ch := range e.chs {
ch := ch
errg.Go(func() error {
e.runChan(ch)
return nil
})
}

_ = errg.Wait()

close(e.out)
}
2 changes: 1 addition & 1 deletion internal/vista/output_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"runtime"
)

func (o *output) getFuncName(event *Event) string {
func (o *Output) getFuncName(event *Event) string {
var outFuncName string

switch event.Source {
Expand Down
2 changes: 1 addition & 1 deletion internal/vista/output_iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getIfacesInNetNs(path string) (map[uint32]string, error) {
return ifaces, nil
}

func (o *output) getIfaceName(netnsInode, ifindex uint32) string {
func (o *Output) getIfaceName(netnsInode, ifindex uint32) string {
if ifaces, ok := o.ifaceCache[uint64(netnsInode)]; ok {
if name, ok := ifaces[ifindex]; ok {
return fmt.Sprintf("%d(%s)", ifindex, name)
Expand Down
2 changes: 1 addition & 1 deletion internal/vista/output_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/tklauser/ps"
)

func (o *output) getProcessExecName(event *Event) string {
func (o *Output) getProcessExecName(event *Event) string {
var execName string
if event.PID != 0 {
if event.Source != eventSourceTCP {
Expand Down
2 changes: 1 addition & 1 deletion internal/vista/output_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ func outputTCP(w io.Writer, tcp *TCPMeta) {
time.Microsecond*time.Duration(tcp.Srtt), tcp.Retrans, tcp.SkMark,
nullStr(tcp.Cong[:]))
if tcp.Reset != 0 {
fmt.Fprintf(w, " reset=%s", tcp.Reset)
fmt.Fprintf(w, " reset=%d", tcp.Reset)
}
}
5 changes: 1 addition & 4 deletions internal/vista/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (t *tracing) traceProg(options *TracingOptions, prog *ebpf.Program, fentryN
func (t *tracing) trace(options *TracingOptions, fentryName, fexitName, fentryPcap, fexitPcap string) error {
progs, err := listBpfProgs(options.progType)
if err != nil {
log.Fatalf("failed to list bpf progs: %w", err)
log.Fatalf("failed to list bpf progs: %v", err)
}
defer func() {
for _, p := range progs {
Expand Down Expand Up @@ -197,9 +197,6 @@ func (t *tracing) trace(options *TracingOptions, fentryName, fexitName, fentryPc
if options.OutputSkb {
replacedMaps["print_skb_map"] = options.Coll.Maps["print_skb_map"]
}
if options.Pcap {
replacedMaps["pcap_events"] = options.Coll.Maps["pcap_events"]
}
options.Opts.MapReplacements = replacedMaps

t.links = make([]link.Link, 0, len(progs)*2)
Expand Down
Loading

0 comments on commit d932b39

Please sign in to comment.