From 7f9050b94d3e6584117ba938c11bcd451c445297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=93=AD=E8=BD=A9?= <11397959+zhangmingxuan777@user.noreply.gitee.com> Date: Fri, 6 Sep 2024 19:26:48 +0800 Subject: [PATCH] add the redis and stat the key --- .../net_watcher/common.bpf.h | 17 +- .../net_watcher/netwatcher.bpf.c | 14 +- .../net_watcher/netwatcher.c | 170 ++++++++++++++++-- .../net_watcher/netwatcher.h | 8 + .../Network_Subsystem/net_watcher/redis.bpf.h | 99 ++++++++++ 5 files changed, 292 insertions(+), 16 deletions(-) diff --git a/eBPF_Supermarket/Network_Subsystem/net_watcher/common.bpf.h b/eBPF_Supermarket/Network_Subsystem/net_watcher/common.bpf.h index 4e92f5633..8b6d336bf 100644 --- a/eBPF_Supermarket/Network_Subsystem/net_watcher/common.bpf.h +++ b/eBPF_Supermarket/Network_Subsystem/net_watcher/common.bpf.h @@ -235,6 +235,11 @@ struct { __uint(max_entries, 256 * 1024); } trace_rb SEC(".maps"); +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024); +} redis_stat_rb SEC(".maps"); + struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, 256 * 1024); @@ -352,13 +357,21 @@ struct { __type(value, u64); __uint(max_entries, 1024); } counters SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, char*); // 键的最大长度,假设为 256 字节 + __type(value, u32); // 计数值 + __uint(max_entries, 1024); // 最大条目数 +} key_count SEC(".maps"); + const volatile int filter_dport = 0; const volatile int filter_sport = 0; const volatile int all_conn = 0, err_packet = 0, extra_conn_info = 0, layer_time = 0, http_info = 0, retrans_info = 0, udp_info = 0, net_filter = 0, drop_reason = 0, icmp_info = 0, tcp_info = 0, dns_info = 0, stack_info = 0, mysql_info = 0, - redis_info = 0, rtt_info = 0, rst_info = 0; + redis_info = 0, rtt_info = 0, rst_info = 0, redis_stat = 0; /* help macro */ @@ -633,4 +646,4 @@ static __always_inline u64 log2l(u64 v) { } /* help functions end */ -#endif \ No newline at end of file +#endif diff --git a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.bpf.c b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.bpf.c index 46834bfc4..42582103b 100644 --- a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.bpf.c +++ b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.bpf.c @@ -314,10 +314,20 @@ int BPF_KPROBE(query__start) { return __handle_mysql_start(ctx); } SEC("uretprobe/_Z16dispatch_commandP3THDPK8COM_DATA19enum_server_command") int BPF_KPROBE(query__end) { return __handle_mysql_end(ctx); } +//redis SEC("uprobe/processCommand") -int BPF_KPROBE(query__start_redis_process) { return __handle_redis_start(ctx); } +int BPF_KPROBE(redis_processCommand) { return __handle_redis_start(ctx); } SEC("uretprobe/call") -int BPF_KPROBE(query__end_redis) { return __handle_redis_end(ctx); } +int BPF_KPROBE(redis_call) { return __handle_redis_end(ctx); } + +SEC("uprobe/lookupKey") +int BPF_UPROBE(redis_lookupKey) { + return __handle_redis_key(ctx); +} +SEC("uprobe/addReply") +int BPF_UPROBE(redis_addReply) { + return __handle_redis_value(ctx); +} // rtt SEC("kprobe/tcp_rcv_established") int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) { diff --git a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.c b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.c index 7c409a890..6956a1ae4 100644 --- a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.c +++ b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.c @@ -40,12 +40,21 @@ static char err_file_path[1024]; static char packets_file_path[1024]; static char udp_file_path[1024]; +// 用于存储从 eBPF map 读取的数据 +typedef struct { + char key[256]; + u32 value; +} kv_pair; + +static int map_fd; + static int sport = 0, dport = 0; // for filter static int all_conn = 0, err_packet = 0, extra_conn_info = 0, layer_time = 0, http_info = 0, retrans_info = 0, udp_info = 0, net_filter = 0, drop_reason = 0, addr_to_func = 0, icmp_info = 0, tcp_info = 0, time_load = 0, dns_info = 0, stack_info = 0, mysql_info = 0, - redis_info = 0, count_info = 0, rtt_info = 0, rst_info = 0; // flag + redis_info = 0, count_info = 0, rtt_info = 0, rst_info = 0, + redis_stat = 0; // flag static const char *tcp_states[] = { [1] = "ESTABLISHED", [2] = "SYN_SENT", [3] = "SYN_RECV", @@ -81,6 +90,7 @@ static const struct argp_option opts[] = { "set to trace mysql information info include Pid 进程id、Comm " "进程名、Size sql语句字节大小、Sql 语句"}, {"redis", 'R', 0, 0}, + {"redis-stat", 'b', 0, 0}, {"count", 'C', "NUMBER", 0, "specify the time to count the number of requests"}, {"rtt", 'T', 0, 0, "set to trace rtt"}, @@ -153,6 +163,9 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) { case 'U': rst_info = 1; break; + case 'b': + redis_stat = 1; + break; case 'C': count_info = strtoul(arg, &end, 10); break; @@ -182,6 +195,7 @@ enum MonitorMode { MODE_DNS, MODE_MYSQL, MODE_REDIS, + MODE_REDIS_STAT, MODE_RTT, MODE_RST, MODE_DEFAULT @@ -204,6 +218,8 @@ enum MonitorMode get_monitor_mode() { return MODE_MYSQL; } else if (redis_info) { return MODE_REDIS; + } else if (redis_stat) { + return MODE_REDIS_STAT; } else if (rtt_info) { return MODE_RTT; } else if (rst_info) { @@ -441,6 +457,7 @@ static void set_rodata_flags(struct netwatcher_bpf *skel) { skel->rodata->stack_info = stack_info; skel->rodata->mysql_info = mysql_info; skel->rodata->redis_info = redis_info; + skel->rodata->redis_stat = redis_stat; skel->rodata->rtt_info = rtt_info; skel->rodata->rst_info = rst_info; } @@ -599,9 +616,13 @@ static void set_disable_load(struct netwatcher_bpf *skel) { mysql_info ? true : false); bpf_program__set_autoload(skel->progs.query__end, mysql_info ? true : false); - bpf_program__set_autoload(skel->progs.query__end_redis, + bpf_program__set_autoload(skel->progs.redis_addReply, + redis_stat ? true : false); + bpf_program__set_autoload(skel->progs.redis_lookupKey, + redis_stat ? true : false); + bpf_program__set_autoload(skel->progs.redis_processCommand, redis_info ? true : false); - bpf_program__set_autoload(skel->progs.query__start_redis_process, + bpf_program__set_autoload(skel->progs.redis_call, redis_info ? true : false); bpf_program__set_autoload(skel->progs.tcp_rcv_established, (all_conn || err_packet || extra_conn_info || @@ -682,6 +703,13 @@ static void print_header(enum MonitorMode mode) { printf("%-20s %-20s %-20s %-20s %-20s \n", "Pid", "Comm", "Size", "Redis", "duration/μs"); break; + case MODE_REDIS_STAT: + printf("===============================================================" + "====================REDIS " + "INFORMATION====================================================" + "============================\n"); + printf("%-20s %-20s %-20s %-20s %-20s %-20s\n", "Pid", "Comm", "key", "Key_count","Value_Type","Value"); + break; case MODE_RTT: printf("===============================================================" "====================RTT " @@ -844,7 +872,7 @@ static int print_conns(struct netwatcher_bpf *skel) { static int print_packet(void *ctx, void *packet_info, size_t size) { if (udp_info || net_filter || drop_reason || icmp_info || tcp_info || - dns_info || mysql_info || redis_info || rtt_info) + dns_info || mysql_info || redis_info || redis_info || rtt_info ) return 0; const struct pack_t *pack_info = packet_info; if (pack_info->mac_time > MAXTIME || pack_info->ip_time > MAXTIME || @@ -1246,6 +1274,54 @@ static int print_redis(void *ctx, void *packet_info, size_t size) { strcpy(redis, ""); return 0; } +static int process_redis_first(char flag,char *message) { + if(flag=='+') + { + strcpy(message, "Status Reply"); + } + else if (flag=='-') + { + strcpy(message, "Error Reply"); + } + else if (flag==':') + { + strcpy(message, "Integer Reply"); + } + else if (flag=='$') + { + strcpy(message, "Bulk String Reply"); + } + else if (flag=='*') + { + strcpy(message, "Array Reply"); + } + else{ + strcpy(message, "Unknown Type"); + } + return 0; +} + +static int print_redis_stat(void *ctx, void *packet_info, size_t size) { + if (!redis_stat) { + return 0; + } + char message[20]={}; + const struct redis_stat_query *pack_info = packet_info; + if(pack_info->key_count) + { + printf("%-20d %-20s %-20s %-20d %-20s %-20s\n", pack_info->pid, pack_info->comm, + pack_info->key,pack_info->key_count,"-","-"); + } + else + { + process_redis_first(pack_info->value[0],message); + printf("%-20d %-20s %-20s %-20s %-20s %-20s\n", pack_info->pid, pack_info->comm, + "-","-",message,pack_info->value); + } + + return 0; +} + static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args) { return vfprintf(stderr, format, args); @@ -1348,10 +1424,61 @@ int attach_uprobe_mysql(struct netwatcher_bpf *skel) { return 0; } int attach_uprobe_redis(struct netwatcher_bpf *skel) { - ATTACH_UPROBE_CHECKED(skel, call, query__end_redis); - ATTACH_UPROBE_CHECKED(skel, processCommand, query__start_redis_process); + if(redis_info){ + ATTACH_UPROBE_CHECKED(skel, call, redis_call); + ATTACH_UPROBE_CHECKED(skel, processCommand, redis_processCommand); + } + if(redis_stat){ + ATTACH_UPROBE_CHECKED(skel, lookupKey, redis_lookupKey); + ATTACH_UPROBE_CHECKED(skel, addReply, redis_addReply); + } return 0; } + +void print_top_5_keys() { + kv_pair *pairs; + pairs = malloc(sizeof(kv_pair) * 1024); + if (!pairs) { + perror("Failed to allocate memory"); + exit(EXIT_FAILURE); + } + int index = 0; + char *key = NULL; + while (bpf_map_get_next_key(map_fd, &key, &key) == 0) { + // fprintf(stdout, "next_sk: (%p)\n", sk); + int count; + int err = bpf_map_lookup_elem(map_fd, &key, &count); + if (err) { + fprintf(stderr, "Failed to read value from the conns map: (%s)\n", + strerror(errno)); + return ; + } + memcpy(pairs[index].key, &key, 256); + pairs[index].value = count; + //printf("Key: %s, Count: %u\n", pairs[index].key, pairs[index].value); + index++; + } + // 获取所有键值对 + + // 排序前 5 个元素 + // 简单选择排序(可替换为其他高效排序算法) + for (int i = 0; i < index - 1; i++) { + for (int j = i + 1; j < index; j++) { + if (pairs[j].value > pairs[i].value) { + kv_pair temp = pairs[i]; + pairs[i] = pairs[j]; + pairs[j] = temp; + } + } + } + printf("----------------------------\n"); + // 打印前 5 个元素 + printf("Top 5 Keys:\n"); + for (int i = 0; i < 5 && i < index; i++) { + printf("Key: %s, Count: %u\n", pairs[i].key, pairs[i].value); + } + free(pairs); +} int main(int argc, char **argv) { char *last_slash = strrchr(argv[0], '/'); if (last_slash) { @@ -1375,6 +1502,7 @@ int main(int argc, char **argv) { struct ring_buffer *trace_rb = NULL; struct ring_buffer *mysql_rb = NULL; struct ring_buffer *redis_rb = NULL; + struct ring_buffer *redis_stat_rb = NULL; struct ring_buffer *rtt_rb = NULL; struct ring_buffer *events = NULL; struct netwatcher_bpf *skel; @@ -1419,7 +1547,7 @@ int main(int argc, char **argv) { goto cleanup; } - } else if (redis_info) { + } else if (redis_info||redis_stat) { strcpy(binary_path, "/usr/bin/redis-server"); err = attach_uprobe_redis(skel); if (err) { @@ -1503,6 +1631,13 @@ int main(int argc, char **argv) { fprintf(stderr, "Failed to create ring buffer(trace)\n"); goto cleanup; } + redis_stat_rb = ring_buffer__new(bpf_map__fd(skel->maps.redis_stat_rb), print_redis_stat, + NULL, NULL); + if (!redis_stat_rb) { + err = -1; + fprintf(stderr, "Failed to create ring buffer(trace)\n"); + goto cleanup; + } rtt_rb = ring_buffer__new(bpf_map__fd(skel->maps.rtt_rb), print_rtt, NULL, NULL); if (!rtt_rb) { @@ -1542,6 +1677,7 @@ int main(int argc, char **argv) { err = ring_buffer__poll(redis_rb, 100 /* timeout, ms */); err = ring_buffer__poll(rtt_rb, 100 /* timeout, ms */); err = ring_buffer__poll(events, 100 /* timeout, ms */); + err = ring_buffer__poll(redis_stat_rb, 100 /* timeout, ms */); print_conns(skel); sleep(1); /* Ctrl-C will cause -EINTR */ @@ -1556,12 +1692,22 @@ int main(int argc, char **argv) { gettimeofday(&end, NULL); if ((end.tv_sec - start.tv_sec) >= 5) { - print_stored_events(); - printf("Total RSTs in the last 5 seconds: %llu\n\n", rst_count); + //print_stored_events(); + //printf("Total RSTs in the last 5 seconds: %llu\n\n", rst_count); // 重置计数器和事件存储 - rst_count = 0; - event_count = 0; + //rst_count = 0; + //event_count = 0; + if(redis_stat) + { + map_fd = bpf_map__fd(skel->maps.key_count); + + if (map_fd < 0) { + perror("Failed to get map FD"); + return 1; + } + print_top_5_keys(); + } gettimeofday(&start, NULL); } } @@ -1569,4 +1715,4 @@ int main(int argc, char **argv) { cleanup: netwatcher_bpf__destroy(skel); return err < 0 ? -err : 0; -} \ No newline at end of file +} diff --git a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.h b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.h index e7559671d..b512d952e 100644 --- a/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.h +++ b/eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.h @@ -192,6 +192,14 @@ struct redis_query { u64 begin_time; int argc; }; +struct redis_stat_query { + int pid; + char comm[20]; + char key[20]; + int key_count; + char value[64]; + int value_type; +}; struct RTT { u32 saddr; diff --git a/eBPF_Supermarket/Network_Subsystem/net_watcher/redis.bpf.h b/eBPF_Supermarket/Network_Subsystem/net_watcher/redis.bpf.h index c3e319ce3..dc0aa533e 100644 --- a/eBPF_Supermarket/Network_Subsystem/net_watcher/redis.bpf.h +++ b/eBPF_Supermarket/Network_Subsystem/net_watcher/redis.bpf.h @@ -19,6 +19,7 @@ #include "redis_helper.bpf.h" #define MAXEPOLL 5 static __always_inline int __handle_redis_start(struct pt_regs *ctx) { + if(!redis_info) return 0; struct client *cli = (struct client *)PT_REGS_PARM1(ctx); struct redis_query start={}; void *ptr; @@ -43,6 +44,7 @@ static __always_inline int __handle_redis_start(struct pt_regs *ctx) { } static __always_inline int __handle_redis_end(struct pt_regs *ctx) { + if(!redis_info) return 0; pid_t pid = bpf_get_current_pid_tgid() >> 32; struct redis_query *start; u64 end_time = bpf_ktime_get_ns() / 1000; @@ -65,4 +67,101 @@ static __always_inline int __handle_redis_end(struct pt_regs *ctx) { message->duratime = end_time - start->begin_time; bpf_ringbuf_submit(message, 0); return 0; +} +static __always_inline int __handle_redis_key(struct pt_regs *ctx) { + if(!redis_stat) return 0; + robj *key_obj = (robj *)PT_REGS_PARM2(ctx); + char redis_key[256]; + u32 *count; + u32 initial_count = 1; + + if (!key_obj) + return 0; + + robj local_key_obj; + if (bpf_probe_read_user(&local_key_obj, sizeof(local_key_obj), key_obj) != 0) { + bpf_printk("Failed to read local_key_obj\n"); + return 0; + } + + if (!local_key_obj.ptr) { + bpf_printk("local_key_obj.ptr is null\n"); + return 0; + } + + int ret; + ret = bpf_probe_read_user_str(redis_key, sizeof(redis_key), local_key_obj.ptr); + if (ret <= 0) { + bpf_printk("Read string failed: %d\n", ret); + return 0; + } + + // 打印读取到的键值 + bpf_printk("Read key: %s\n", redis_key); + + // 查找或更新键的计数 + count = bpf_map_lookup_elem(&key_count, redis_key); + if (count) { + //bpf_printk("Found key, incrementing count\n"); + // 如果已经存在,增加计数值 + (*count)++; + bpf_map_update_elem(&key_count, redis_key, count, BPF_ANY); + } else { + //bpf_printk("Key not found, initializing count\n"); + // 如果不存在,初始化计数值为 1 + bpf_map_update_elem(&key_count, redis_key, &initial_count, BPF_ANY); + } + + // 打印调试信息 + struct redis_stat_query *message = bpf_ringbuf_reserve(&redis_stat_rb, sizeof(*message), 0); + if (!message) { + return 0; + } + message->pid=bpf_get_current_pid_tgid() >> 32; + bpf_get_current_comm(&message->comm, sizeof(message->comm)); + memcpy(message->key, redis_key, sizeof(message->key)); + message->key_count=count ? *count : initial_count; + message->value_type=0; + memset(message->value, 0, sizeof(message->value)); + bpf_printk("Key: %s\n", message->key); + bpf_printk("Count: %d\n", message->key_count); + bpf_ringbuf_submit(message, 0); + + return 0; +} +static __always_inline int __handle_redis_value(struct pt_regs *ctx) { + if(!redis_stat) return 0; + robj *key_obj = (robj *)PT_REGS_PARM2(ctx); + int ret; + char redis_value[64]; + if (!key_obj) + return 0; + robj local_key_obj; + if (bpf_probe_read_user(&local_key_obj, sizeof(local_key_obj), key_obj) != 0) { + bpf_printk("Failed to read local_key_obj\n"); + return 0; + } + if (!local_key_obj.ptr) { + bpf_printk("local_key_obj.ptr is null\n"); + return 0; + } + ret = bpf_probe_read_user_str(redis_value, sizeof(redis_value), local_key_obj.ptr); + if (ret <= 0) { + bpf_printk("Read string failed: %d\n", ret); + return 0; + } + struct redis_stat_query *message = bpf_ringbuf_reserve(&redis_stat_rb, sizeof(*message), 0); + if (!message) { + return 0; + } + message->pid=bpf_get_current_pid_tgid() >> 32; + bpf_get_current_comm(&message->comm, sizeof(message->comm)); + memset(message->key, 0, sizeof(message->key)); + message->key_count=0; + message->value_type=local_key_obj.type; + memcpy(message->value, redis_value, sizeof(message->value)); + bpf_printk("Value: %s\n", message->value); + bpf_printk("type: %d\n", message->value_type); + bpf_ringbuf_submit(message, 0); + return 0; } \ No newline at end of file