Skip to content

Commit

Permalink
Merge pull request #77 from yonch:main
Browse files Browse the repository at this point in the history
add timestamp-sorted perf ring reader
  • Loading branch information
yonch authored Feb 21, 2025
2 parents 423767c + 73feea9 commit 3870744
Show file tree
Hide file tree
Showing 26 changed files with 2,056 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-cpi-count.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.docker-metadata.outputs.tags }}
labels: ${{ steps.docker-metadata.outputs.labels }}
context: cmd/cpi-count
context: .
file: cmd/cpi-count/Dockerfile
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/cpi-count:cache # Need to add the repository name
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-prometheus-metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.docker-metadata.outputs.tags }}
labels: ${{ steps.docker-metadata.outputs.labels }}
context: cmd/prometheus_metrics
context: .
file: cmd/prometheus_metrics/Dockerfile
platforms: linux/amd64,linux/arm64
cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/prometheus-metrics:cache
Expand Down
83 changes: 83 additions & 0 deletions .github/workflows/test-go-packages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
name: Test Go Packages

on:
push:
branches: [ main ]
paths:
- 'pkg/**/*.go'
- 'go.mod'
- 'go.sum'
- '.github/workflows/test-go-packages.yaml'
pull_request:
branches: [ main ]
paths:
- 'pkg/**/*.go'
- 'go.mod'
- 'go.sum'
workflow_dispatch:

jobs:
test:
name: Run Go Tests
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.21'
check-latest: true
cache: true

- name: Install dependencies
run: go mod download

- name: Install libcap
run: sudo apt-get update && sudo apt-get install -y libcap2-bin

- name: Run perf-related package tests
run: |
# Compile the test binaries for perf and perf_ebpf packages
go test -c ./pkg/perf ./pkg/perf_ebpf -race -o .
# Add CAP_PERFMON to both test binaries (plus CAP_BPF for perf_ebpf.test)
sudo setcap cap_perfmon+ep perf.test
sudo setcap cap_perfmon,cap_bpf+ep perf_ebpf.test
# Verify capability
getcap perf.test
getcap perf_ebpf.test
# Check if perf_event_paranoid is restricting access
cat /proc/sys/kernel/perf_event_paranoid
# Set perf_event_paranoid to 1
echo 1 | sudo tee /proc/sys/kernel/perf_event_paranoid
# Verify perf_event_paranoid
cat /proc/sys/kernel/perf_event_paranoid
# Run the perf tests with the permissioned binary
./perf.test -test.v
./perf_ebpf.test -test.v
- name: Run other package tests
run: |
# Run tests for all packages except perf and perf_ebpf
if [ -n "$(go list ./pkg/... | grep -v 'pkg/perf\|pkg/perf_ebpf')" ]; then
go test -v -race $(go list ./pkg/... | grep -v 'pkg/perf\|pkg/perf_ebpf')
fi
- name: Run linter
uses: golangci/golangci-lint-action@v6
with:
version: latest
args: --timeout=5m
working-directory: pkg/

- name: Check formatting
run: |
# Check if any .go files are not properly formatted
if [ -n "$(gofmt -l ./pkg)" ]; then
echo "The following files are not properly formatted:"
gofmt -l ./pkg
exit 1
fi
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**/.DS_Store
3 changes: 3 additions & 0 deletions Dockerfile.devcontainer
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ RUN apt-get update && apt-get install -y \
${EBPF_PACKAGES} \
&& rm -rf /var/lib/apt/lists/*

# For Go development
RUN go install -v golang.org/x/tools/gopls@latest

# Create architecture-specific symlink for asm
RUN arch=$(uname -m) && \
case ${arch} in \
Expand Down
8 changes: 0 additions & 8 deletions cmd/collector/go.mod

This file was deleted.

4 changes: 0 additions & 4 deletions cmd/collector/go.sum

This file was deleted.

137 changes: 91 additions & 46 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@ package main
import (
"bytes"
"encoding/binary"
"errors"
"io/ioutil"
"log"
"os"
"os/signal"
"time"
"unsafe"

"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/rlimit"
ourperf "github.com/unvariance/collector/pkg/perf"
"github.com/unvariance/collector/pkg/perf_ebpf"
"golang.org/x/sys/unix"
)

// Note: taskCounterEvent is auto-generated by bpf2go
// Note: taskCounterRmidMetadata is auto-generated by bpf2go

// nanotime returns monotonic time in nanoseconds, as reported by the Go
// runtime. The declaration has no body: the go:linkname directive below binds
// it to runtime.nanotime.
//
//go:linkname nanotime runtime.nanotime
func nanotime() int64

// dumpRmidMap dumps all valid RMIDs and their metadata
func dumpRmidMap(objs *taskCounterObjects) {
var key uint32
Expand Down Expand Up @@ -92,12 +98,13 @@ func main() {

// Create reader options with a large watermark (half of the per-CPU buffer)
perCPUBufferSize := 16 * os.Getpagesize()
opts := perf.ReaderOptions{
Watermark: perCPUBufferSize / 2,
opts := perf_ebpf.Options{
BufferSize: perCPUBufferSize,
WatermarkBytes: uint32(perCPUBufferSize / 2),
}

// Open a perf reader from userspace
rd, err := perf.NewReaderWithOptions(objs.Events, perCPUBufferSize, opts)
// Create our perf map reader
rd, err := perf_ebpf.NewPerfMapReader(objs.Events, opts)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -165,16 +172,17 @@ func main() {
signal.Notify(stopper, os.Interrupt)

timeout := time.After(5 * time.Second)

// set deadline in the past for rd, so it will not block
nextDeadline := time.Now().Add(time.Second)
rd.SetDeadline(nextDeadline)

log.Println("Waiting for events...")
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

// Counter to maintain in userspace
var totalEvents uint64 = 0

// Start the reader
reader := rd.Reader()

log.Println("Waiting for events...")

for {
select {
case <-stopper:
Expand All @@ -185,48 +193,85 @@ func main() {
log.Println("Finished counting after 5 seconds")
dumpRmidMap(&objs) // Dump RMID map before exiting
return
default:

// if the deadline is in the past, set it to the next deadline
if time.Now().After(nextDeadline) {
nextDeadline = nextDeadline.Add(time.Second)
rd.SetDeadline(nextDeadline)

// output counts
var count uint64
var key uint32 = 0
if err := objs.EventCount.Lookup(&key, &count); err != nil {
log.Fatal(err)
}
log.Printf("Event count: userspace %d, eBPF %d\n", totalEvents, count)
case <-ticker.C:
// Get current monotonic timestamp before starting the batch
startTimestamp := uint64(nanotime())
log.Printf("Starting batch at timestamp: %d", startTimestamp)

if err := reader.Start(); err != nil {
log.Fatal(err)
}

record, err := rd.Read()
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) {
break // make for loop check the select statement and set the deadline
} else if errors.Is(err, perf.ErrClosed) {
return
// Process all available events that occurred before startTimestamp
for !reader.Empty() {
// Check if next event's timestamp is after our start timestamp
ts, err := reader.PeekTimestamp()
if err != nil {
log.Printf("Error peeking timestamp: %s", err)
break
}

// Stop this batch at the first event newer than startTimestamp (it is left unpopped)
if ts > startTimestamp {
break
}
log.Printf("Reading from perf event reader: %s", err)
continue
}

if record.LostSamples != 0 {
log.Printf("Lost %d samples", record.LostSamples)
continue
ring, cpuID, err := reader.CurrentRing()
if err != nil {
log.Printf("Error getting current ring: %s", err)
break
}

// Check for lost samples
if ring.PeekType() == ourperf.PERF_RECORD_LOST {
var lostCount uint64
if err := ring.PeekCopy((*[8]byte)(unsafe.Pointer(&lostCount))[:], 8); err != nil {
log.Printf("Error reading lost count: %s", err)
} else {
log.Printf("Lost %d samples on CPU %d", lostCount, cpuID)
}
reader.Pop()
continue
}

// Parse the raw event
size, err := ring.PeekSize()
if err != nil {
log.Printf("Error getting event size: %s", err)
break
}

eventData := make([]byte, size-4)
if err := ring.PeekCopy(eventData, 4); err != nil {
log.Printf("Error copying event data: %s", err)
break
}

var event taskCounterEvent
if err := binary.Read(bytes.NewReader(eventData), binary.LittleEndian, &event); err != nil {
log.Printf("Failed to parse perf event: %s", err)
break
}

// print parsed event
log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d, Timestamp: %d",
cpuID, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta, event.Timestamp)
totalEvents++

reader.Pop()
}

// Parse the raw bytes into our Event struct
var event taskCounterEvent
if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("Failed to parse perf event: %s", err)
continue
if err := reader.Finish(); err != nil {
log.Printf("Error finishing reader: %s", err)
}

log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d",
record.CPU, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta)
totalEvents++
// Output counts after each batch (once per ticker tick)
var count uint64
var key uint32 = 0
if err := objs.EventCount.Lookup(&key, &count); err != nil {
log.Fatal(err)
}
log.Printf("Event count: userspace %d, eBPF %d\n", totalEvents, count)
}
}
}
19 changes: 12 additions & 7 deletions cmd/collector/task_counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

// Define the event structure that matches the Go side
struct event {
__u64 timestamp; // bpf_ktime_get_ns() value when the event was emitted; first field
__u32 rmid;
__u64 cycles_delta;
__u64 instructions_delta;
Expand Down Expand Up @@ -225,8 +226,7 @@ int count_events(void *ctx) {

e.rmid = args->rmid;

// Get current timestamp
__u64 now = bpf_ktime_get_ns();
__u64 now;

// Get previous counters
__u32 zero = 0;
Expand Down Expand Up @@ -259,12 +259,17 @@ int count_events(void *ctx) {
}

// Compute time delta and update timestamp
e.time_delta_ns = compute_delta(now, prev->timestamp);
now = bpf_ktime_get_ns();
// if prev->timestamp is 0, this is the first event. We did not have the counter and timestamp values,
// so do not emit this event -- only use it to initialize the counters
if (prev->timestamp != 0) {
e.time_delta_ns = compute_delta(now, prev->timestamp);
e.timestamp = now;
// Submit the event to the perf event array
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e));
}
prev->timestamp = now;

// Submit the event to the perf event array
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e));


increase_count(ctx);

return 0;
Expand Down
4 changes: 2 additions & 2 deletions cmd/cpi-count/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ COPY go.mod go.sum ./
RUN go mod download

# Copy the source code
COPY . .
COPY cmd/cpi-count/ cmd/cpi-count/

# Build the application
RUN go build -o cpi-count
RUN go build -o cpi-count ./cmd/cpi-count

# Stage 2: Create a lightweight production image
FROM alpine:latest
Expand Down
13 changes: 0 additions & 13 deletions cmd/cpi-count/go.mod

This file was deleted.

Loading

0 comments on commit 3870744

Please sign in to comment.