Skip to content

Commit

Permalink
Merge pull request #75 from yonch:main
Browse files Browse the repository at this point in the history
send measurement deltas from eBPF to userspace
  • Loading branch information
yonch authored Feb 20, 2025
2 parents 902833a + e864af9 commit 423767c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-ebpf-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
mode: start
github-token: ${{ secrets.REPO_ADMIN_TOKEN }}
ec2-image-id: ami-0cb91c7de36eed2cb # Ubuntu 24.04 LTS in us-east-2
ec2-instance-type: ${{ inputs.instance-type || 'c5.9xlarge' }}
ec2-instance-type: ${{ inputs.instance-type || 'm5zn.6xlarge' }} # or c5.9xlarge
market-type: spot
subnet-id: ${{ secrets.AWS_SUBNET_ID }}
security-group-id: ${{ secrets.AWS_SECURITY_GROUP_ID }}
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func dumpRmidMap(objs *taskCounterObjects) {
}
// Convert comm to string, trimming null bytes
comm := string(bytes.TrimRight(commBytes, "\x00"))
log.Printf("%d\t%d\t%s\t%d\t%d\t%d\n",
log.Printf("%d\t%d\t%s\t%d\t%d\t%d\n",
i, key, comm, metadata.Tgid, metadata.Timestamp, metadata.Valid)
}
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func main() {

record, err := rd.Read()
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) {
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) {
break // make for loop check the select statement and set the deadline
} else if errors.Is(err, perf.ErrClosed) {
return
Expand All @@ -224,9 +224,9 @@ func main() {
continue
}

log.Printf("Event - CPU: %d, Cycles: %d, Instructions: %d, LLC Misses: %d",
record.CPU, event.Cycles, event.Instructions, event.LlcMisses)
log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d",
record.CPU, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta)
totalEvents++
}
}
}
}
66 changes: 56 additions & 10 deletions cmd/collector/task_counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@

// Define the event structure that matches the Go side
struct event {
__u64 counter;
__u32 rmid;
__u64 cycles_delta;
__u64 instructions_delta;
__u64 llc_misses_delta;
__u64 time_delta_ns;
};

// Structure to store previous counter values per CPU
struct prev_counters {
__u64 cycles;
__u64 instructions;
__u64 llc_misses;
__u64 timestamp;
};

// Tracepoint event structs
Expand Down Expand Up @@ -54,6 +63,14 @@ struct {
__type(value, struct rmid_metadata);
} rmid_map SEC(".maps");

// Per-CPU map to store previous counter values
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, 1);
__type(key, __u32);
__type(value, struct prev_counters);
} prev_counters_map SEC(".maps");

// Declare the perf event arrays
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
Expand Down Expand Up @@ -95,6 +112,11 @@ void increase_count(void *ctx) {
}
}

// Helper function to compute delta with wraparound handling
static __u64 compute_delta(__u64 current, __u64 previous) {
return current - previous;
}

// Handler for RMID allocation events
SEC("tracepoint/memory_collector/memory_collector_rmid_alloc")
int handle_rmid_alloc(struct rmid_alloc_args *ctx) {
Expand Down Expand Up @@ -193,29 +215,53 @@ int handle_rmid_existing(struct rmid_existing_args *ctx) {
SEC("tracepoint/memory_collector/memory_collector_sample")
int count_events(void *ctx) {
struct event e = {};
e.counter = 1;

// Read cycles from perf event
// Extract RMID from the tracepoint context
struct {
__u64 pad; // Common fields in tracepoint
__u8 is_context_switch;
__u32 rmid;
} *args = ctx;

e.rmid = args->rmid;

// Get current timestamp
__u64 now = bpf_ktime_get_ns();

// Get previous counters
__u32 zero = 0;
struct prev_counters *prev = bpf_map_lookup_elem(&prev_counters_map, &zero);
if (!prev) {
return 0; // Should never happen since it's a per-CPU array
}

// Read current counter values
struct bpf_perf_event_value cycles_val = {};
struct bpf_perf_event_value instructions_val = {};
struct bpf_perf_event_value llc_misses_val = {};

int err = bpf_perf_event_read_value(&cycles, BPF_F_CURRENT_CPU, &cycles_val, sizeof(cycles_val));
if (err == 0) {
e.cycles = cycles_val.counter;
e.cycles_delta = compute_delta(cycles_val.counter, prev->cycles);
prev->cycles = cycles_val.counter;
}

// Read instructions from perf event
struct bpf_perf_event_value instructions_val = {};
err = bpf_perf_event_read_value(&instructions, BPF_F_CURRENT_CPU, &instructions_val, sizeof(instructions_val));
if (err == 0) {
e.instructions = instructions_val.counter;
e.instructions_delta = compute_delta(instructions_val.counter, prev->instructions);
prev->instructions = instructions_val.counter;
}

// Read LLC misses from perf event
struct bpf_perf_event_value llc_misses_val = {};
err = bpf_perf_event_read_value(&llc_misses, BPF_F_CURRENT_CPU, &llc_misses_val, sizeof(llc_misses_val));
if (err == 0) {
e.llc_misses = llc_misses_val.counter;
e.llc_misses_delta = compute_delta(llc_misses_val.counter, prev->llc_misses);
prev->llc_misses = llc_misses_val.counter;
}

// Compute time delta and update timestamp
e.time_delta_ns = compute_delta(now, prev->timestamp);
prev->timestamp = now;

// Submit the event to the perf event array
bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e));

Expand Down
22 changes: 11 additions & 11 deletions module/collector_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ static void rdt_timer_tick(struct rdt_state *rdt_state)

static void collect_sample_on_current_cpu(bool is_context_switch)
{
u64 timestamp = ktime_get_ns();
u32 cpu = smp_processor_id();
struct cpu_state *state = this_cpu_ptr(cpu_states);

trace_memory_collector_sample(cpu, timestamp, current->comm, is_context_switch, current->rmid);
trace_memory_collector_sample(is_context_switch, current->rmid);

struct cpu_state *state = this_cpu_ptr(cpu_states);
rdt_timer_tick(&state->rdt_state);
}

Expand All @@ -111,12 +108,15 @@ static void probe_sched_switch(void *data,
struct task_struct *next,
unsigned int prev_state)
{
// Collect sample for the outgoing task
collect_sample_on_current_cpu(true);

// Update RMID if it's changing and we have hardware support
if (prev->rmid != next->rmid && rdt_hardware_support) {
rdt_write_rmid_closid(next->rmid, CLOSID_CATCHALL);
// Only collect sample if RMID is changing
if (prev->rmid != next->rmid) {
// Collect sample for the outgoing task
collect_sample_on_current_cpu(true);

// Update RMID if we have hardware support
if (rdt_hardware_support) {
rdt_write_rmid_closid(next->rmid, CLOSID_CATCHALL);
}
}
}

Expand Down
15 changes: 4 additions & 11 deletions module/tracepoints.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,21 @@
#include <linux/tracepoint.h>

TRACE_EVENT(memory_collector_sample,
TP_PROTO(u32 cpu, u64 timestamp, const char *comm, bool is_context_switch, u32 rmid),
TP_PROTO(bool is_context_switch, u32 rmid),

TP_ARGS(cpu, timestamp, comm, is_context_switch, rmid),
TP_ARGS(is_context_switch, rmid),

TP_STRUCT__entry(
__field(u32, cpu)
__field(u64, timestamp)
__array(char, comm, TASK_COMM_LEN)
__field(bool, is_context_switch)
__field(u8, is_context_switch)
__field(u32, rmid)
),

TP_fast_assign(
__entry->cpu = cpu;
__entry->timestamp = timestamp;
memcpy(__entry->comm, comm, TASK_COMM_LEN);
__entry->is_context_switch = is_context_switch;
__entry->rmid = rmid;
),

TP_printk("cpu=%u timestamp=%llu comm=%s context_switch=%d rmid=%u",
__entry->cpu, __entry->timestamp, __entry->comm,
TP_printk("context_switch=%d rmid=%u",
__entry->is_context_switch, __entry->rmid)
);

Expand Down

0 comments on commit 423767c

Please sign in to comment.