diff --git a/.github/workflows/test-ebpf-collector.yaml b/.github/workflows/test-ebpf-collector.yaml index fd0d7e1..89b0a6c 100644 --- a/.github/workflows/test-ebpf-collector.yaml +++ b/.github/workflows/test-ebpf-collector.yaml @@ -39,7 +39,7 @@ jobs: mode: start github-token: ${{ secrets.REPO_ADMIN_TOKEN }} ec2-image-id: ami-0cb91c7de36eed2cb # Ubuntu 24.04 LTS in us-east-2 - ec2-instance-type: ${{ inputs.instance-type || 'c5.9xlarge' }} + ec2-instance-type: ${{ inputs.instance-type || 'm5zn.6xlarge' }} # or c5.9xlarge market-type: spot subnet-id: ${{ secrets.AWS_SUBNET_ID }} security-group-id: ${{ secrets.AWS_SECURITY_GROUP_ID }} diff --git a/cmd/collector/main.go b/cmd/collector/main.go index f8ae5a2..8b23acf 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -43,7 +43,7 @@ func dumpRmidMap(objs *taskCounterObjects) { } // Convert comm to string, trimming null bytes comm := string(bytes.TrimRight(commBytes, "\x00")) - log.Printf("%d\t%d\t%s\t%d\t%d\t%d\n", + log.Printf("%d\t%d\t%s\t%d\t%d\t%d\n", i, key, comm, metadata.Tgid, metadata.Timestamp, metadata.Valid) } } @@ -203,7 +203,7 @@ func main() { record, err := rd.Read() if err != nil { - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) { + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, perf.ErrFlushed) { break // make for loop check the select statement and set the deadline } else if errors.Is(err, perf.ErrClosed) { return @@ -224,9 +224,9 @@ func main() { continue } - log.Printf("Event - CPU: %d, Cycles: %d, Instructions: %d, LLC Misses: %d", - record.CPU, event.Cycles, event.Instructions, event.LlcMisses) + log.Printf("Event - CPU: %d, RMID: %d, Time Delta: %d ns, Cycles Delta: %d, Instructions Delta: %d, LLC Misses Delta: %d", + record.CPU, event.Rmid, event.TimeDeltaNs, event.CyclesDelta, event.InstructionsDelta, event.LlcMissesDelta) totalEvents++ } } -} \ No newline at end of file +} diff --git a/cmd/collector/task_counter.c b/cmd/collector/task_counter.c index d534083..565028d 100644 --- a/cmd/collector/task_counter.c +++ b/cmd/collector/task_counter.c @@ -9,10 +9,19 @@ // Define the event structure that matches the Go side struct event { - __u64 counter; + __u32 rmid; + __u64 cycles_delta; + __u64 instructions_delta; + __u64 llc_misses_delta; + __u64 time_delta_ns; +}; + +// Structure to store previous counter values per CPU +struct prev_counters { __u64 cycles; __u64 instructions; __u64 llc_misses; + __u64 timestamp; }; // Tracepoint event structs @@ -54,6 +63,14 @@ struct { __type(value, struct rmid_metadata); } rmid_map SEC(".maps"); +// Per-CPU map to store previous counter values +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(max_entries, 1); + __type(key, __u32); + __type(value, struct prev_counters); +} prev_counters_map SEC(".maps"); + // Declare the perf event arrays struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); @@ -95,6 +112,11 @@ void increase_count(void *ctx) { } } +// Helper function to compute delta with wraparound handling +static __u64 compute_delta(__u64 current, __u64 previous) { + return current - previous; +} + // Handler for RMID allocation events SEC("tracepoint/memory_collector/memory_collector_rmid_alloc") int handle_rmid_alloc(struct rmid_alloc_args *ctx) { @@ -193,29 +215,53 @@ int handle_rmid_existing(struct rmid_existing_args *ctx) { SEC("tracepoint/memory_collector/memory_collector_sample") int count_events(void *ctx) { struct event e = {}; - e.counter = 1; - // Read cycles from perf event + // Extract RMID from the tracepoint context + struct { + __u64 pad; // Common fields in tracepoint + __u8 is_context_switch; + __u32 rmid; + } *args = ctx; + + e.rmid = args->rmid; + + // Get current timestamp + __u64 now = bpf_ktime_get_ns(); + + // Get previous counters + __u32 zero = 0; + struct prev_counters *prev = bpf_map_lookup_elem(&prev_counters_map, &zero); + if (!prev) { + return 0; // Should never happen since it's a per-CPU array + } + + // Read current counter values struct bpf_perf_event_value cycles_val = {}; + struct bpf_perf_event_value instructions_val = {}; + struct bpf_perf_event_value llc_misses_val = {}; + int err = bpf_perf_event_read_value(&cycles, BPF_F_CURRENT_CPU, &cycles_val, sizeof(cycles_val)); if (err == 0) { - e.cycles = cycles_val.counter; + e.cycles_delta = compute_delta(cycles_val.counter, prev->cycles); + prev->cycles = cycles_val.counter; } - // Read instructions from perf event - struct bpf_perf_event_value instructions_val = {}; err = bpf_perf_event_read_value(&instructions, BPF_F_CURRENT_CPU, &instructions_val, sizeof(instructions_val)); if (err == 0) { - e.instructions = instructions_val.counter; + e.instructions_delta = compute_delta(instructions_val.counter, prev->instructions); + prev->instructions = instructions_val.counter; } - // Read LLC misses from perf event - struct bpf_perf_event_value llc_misses_val = {}; err = bpf_perf_event_read_value(&llc_misses, BPF_F_CURRENT_CPU, &llc_misses_val, sizeof(llc_misses_val)); if (err == 0) { - e.llc_misses = llc_misses_val.counter; + e.llc_misses_delta = compute_delta(llc_misses_val.counter, prev->llc_misses); + prev->llc_misses = llc_misses_val.counter; } + // Compute time delta and update timestamp + e.time_delta_ns = compute_delta(now, prev->timestamp); + prev->timestamp = now; + // Submit the event to the perf event array bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, &e, sizeof(e)); diff --git a/module/collector_main.c b/module/collector_main.c index 23cfd5a..179c792 100644 --- a/module/collector_main.c +++ b/module/collector_main.c @@ -96,12 +96,9 @@ static void rdt_timer_tick(struct rdt_state *rdt_state) static void collect_sample_on_current_cpu(bool is_context_switch) { - u64 timestamp = ktime_get_ns(); - u32 cpu = smp_processor_id(); - struct cpu_state *state = this_cpu_ptr(cpu_states); - - trace_memory_collector_sample(cpu, timestamp, current->comm, is_context_switch, current->rmid); + trace_memory_collector_sample(is_context_switch, current->rmid); + struct cpu_state *state = this_cpu_ptr(cpu_states); rdt_timer_tick(&state->rdt_state); } @@ -111,12 +108,15 @@ static void probe_sched_switch(void *data, struct task_struct *next, unsigned int prev_state) { - // Collect sample for the outgoing task - collect_sample_on_current_cpu(true); - - // Update RMID if it's changing and we have hardware support - if (prev->rmid != next->rmid && rdt_hardware_support) { - rdt_write_rmid_closid(next->rmid, CLOSID_CATCHALL); + // Only collect sample if RMID is changing + if (prev->rmid != next->rmid) { + // Collect sample for the outgoing task + collect_sample_on_current_cpu(true); + + // Update RMID if we have hardware support + if (rdt_hardware_support) { + rdt_write_rmid_closid(next->rmid, CLOSID_CATCHALL); + } } } diff --git a/module/tracepoints.h b/module/tracepoints.h index 37ab8ff..82c759b 100644 --- a/module/tracepoints.h +++ b/module/tracepoints.h @@ -7,28 +7,21 @@ #include TRACE_EVENT(memory_collector_sample, - TP_PROTO(u32 cpu, u64 timestamp, const char *comm, bool is_context_switch, u32 rmid), + TP_PROTO(bool is_context_switch, u32 rmid), - TP_ARGS(cpu, timestamp, comm, is_context_switch, rmid), + TP_ARGS(is_context_switch, rmid), TP_STRUCT__entry( - __field(u32, cpu) - __field(u64, timestamp) - __array(char, comm, TASK_COMM_LEN) - __field(bool, is_context_switch) + __field(u8, is_context_switch) __field(u32, rmid) ), TP_fast_assign( - __entry->cpu = cpu; - __entry->timestamp = timestamp; - memcpy(__entry->comm, comm, TASK_COMM_LEN); __entry->is_context_switch = is_context_switch; __entry->rmid = rmid; ), - TP_printk("cpu=%u timestamp=%llu comm=%s context_switch=%d rmid=%u", - __entry->cpu, __entry->timestamp, __entry->comm, + TP_printk("context_switch=%d rmid=%u", __entry->is_context_switch, __entry->rmid) );