Skip to content

Commit

Permalink
Rework SymbolizationComplete
Browse files Browse the repository at this point in the history
Update SymbolizationComplete mechanism to reflect current semantics
around trace processing and timestamping (no batching, in-kernel
high-resolution timestamps).
  • Loading branch information
christos68k committed Jan 17, 2025
1 parent fae9351 commit 870479d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 24 deletions.
2 changes: 2 additions & 0 deletions processmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func (pm *ProcessManager) SymbolizationComplete(traceCaptureKTime times.KTime) {
defer pm.mu.Unlock()

nowKTime := times.GetKTime()
log.Warnf("SymbolizationComplete captureKT: %v latency: %v ms",
traceCaptureKTime, (nowKTime-traceCaptureKTime)/1e6)

for pid, pidExitKTime := range pm.exitEvents {
if pidExitKTime > traceCaptureKTime {
Expand Down
5 changes: 1 addition & 4 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,8 @@ func newTraceHandler(rep reporter.TraceReporter, traceProcessor TraceProcessor,
}

func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
defer m.traceProcessor.SymbolizationComplete(bpfTrace.KTime)
timestamp := libpf.UnixTime64(bpfTrace.KTime.UnixNano())

meta := &reporter.TraceEventMeta{
Timestamp: timestamp,
Timestamp: libpf.UnixTime64(bpfTrace.KTime.UnixNano()),
Comm: bpfTrace.Comm,
PID: bpfTrace.PID,
TID: bpfTrace.TID,
Expand Down
55 changes: 42 additions & 13 deletions tracer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/cilium/ebpf/perf"
log "github.com/sirupsen/logrus"

"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/process"
"go.opentelemetry.io/ebpf-profiler/support"
"go.opentelemetry.io/ebpf-profiler/times"
)

/*
Expand Down Expand Up @@ -125,29 +127,28 @@ func startPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
}
}

// startPollingPerfEventMonitor spawns a goroutine that receives events from
// the given perf event map by periodically polling the perf event buffer.
// startTraceEventMonitor spawns a goroutine that receives trace events from
// the kernel by periodically polling the underlying perf event buffer.
// Events written to the perf event buffer do not wake user-land immediately.
//
// For each received event, triggerFunc is called. triggerFunc may NOT store
// references into the buffer that it is given: the buffer is re-used across
// calls. Returns a function that can be called to retrieve perf event array
// Returns a function that can be called to retrieve perf event array
// error counts.
func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
pollFrequency time.Duration, perCPUBufferSize int, triggerFunc func([]byte, int),
) func() (lost, noData, readError uint64) {
eventReader, err := perf.NewReader(perfEventMap, perCPUBufferSize)
func (t *Tracer) startTraceEventMonitor(ctx context.Context,
traceOutChan chan<- *host.Trace) func() (lost, noData, readError uint64) {
eventReader, err := perf.NewReader(t.ebpfMaps["trace_events"],
t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})))
if err != nil {
log.Fatalf("Failed to setup perf reporting via %s: %v", perfEventMap, err)
log.Fatalf("Failed to setup perf reporting via %s: %v", t.ebpfMaps["trace_events"], err)
}

// A deadline of zero is treated as "no deadline". A deadline in the past
// means "always return immediately". We thus set a deadline 1 second after
// unix epoch to always ensure the latter behavior.
eventReader.SetDeadline(time.Unix(1, 0))

pollTicker := time.NewTicker(pollFrequency)
pollTicker := time.NewTicker(t.intervals.TracePollInterval())

var oldKTime times.KTime
var lostEventsCount, readErrorCount, noDataCount atomic.Uint64
go func() {
var data perf.Record
Expand All @@ -161,13 +162,13 @@ func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
break PollLoop
}

var minKTime times.KTime
// Eagerly read events until the buffer is exhausted.
for {
if err = eventReader.ReadInto(&data); err != nil {
if !errors.Is(err, os.ErrDeadlineExceeded) {
readErrorCount.Add(1)
}

break
}
if data.LostSamples != 0 {
Expand All @@ -178,8 +179,36 @@ func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
noDataCount.Add(1)
continue
}
triggerFunc(data.RawSample, data.CPU)

// Keep track of min KTime seen in this batch processing loop
trace := t.loadBpfTrace(data.RawSample, data.CPU)
if minKTime == 0 || trace.KTime < minKTime {
minKTime = trace.KTime
}
traceOutChan <- trace
}
// After we've received and processed all trace events,
// call SymbolizationComplete if there is a pending oldKTime
// that we haven't yet propagated to the rest of the agent.
//
// This both introduces an upper bound on SymbolizationComplete
// call frequency (dictated by pollTicker) and skips calls
// when none are needed (e.g. no trace events have been read).
//
// We use oldKTime instead of minKTime (except when the latter is
// smaller than the former) to take into account scheduling delays
// that could in theory result in observed KTime going back in time.
if oldKTime > 0 {
kt := oldKTime
if minKTime > 0 && minKTime < kt {
// If the current minKTime is smaller than oldKTime, use it
// instead of oldKTime (and reset minKTime to 0 to avoid a repeat).
kt = minKTime
minKTime = 0
}
t.TraceProcessor().SymbolizationComplete(kt)
}
oldKTime = minKTime
}
}()

Expand Down
9 changes: 2 additions & 7 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,19 +906,14 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace {
ReturnAddress: rawFrame.return_address != 0,
}
}

return trace
}

// StartMapMonitors starts goroutines for collecting metrics and monitoring eBPF
// maps for tracepoints, new traces, trace count updates and unknown PCs.
func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan *host.Trace) error {
func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan<- *host.Trace) error {
eventMetricCollector := t.startEventMonitor(ctx)

startPollingPerfEventMonitor(ctx, t.ebpfMaps["trace_events"], t.intervals.TracePollInterval(),
t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte, cpu int) {
traceOutChan <- t.loadBpfTrace(rawTrace, cpu)
})
t.startTraceEventMonitor(ctx, traceOutChan)

pidEvents := make([]uint32, 0)
periodiccaller.StartWithManualTrigger(ctx, t.intervals.MonitorInterval(),
Expand Down

0 comments on commit 870479d

Please sign in to comment.