Skip to content

Commit

Permalink
Merge pull request #890 from zhangxianyu777/develop
Browse files Browse the repository at this point in the history
netwatcher:add the redis and stat the key
  • Loading branch information
LinkinPF committed Sep 13, 2024
2 parents c6c1ee9 + 7f9050b commit 2471bf2
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 16 deletions.
17 changes: 15 additions & 2 deletions eBPF_Supermarket/Network_Subsystem/net_watcher/common.bpf.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ struct {
__uint(max_entries, 256 * 1024);
} trace_rb SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 256 * 1024);
} redis_stat_rb SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 256 * 1024);
Expand Down Expand Up @@ -352,13 +357,21 @@ struct {
__type(value, u64);
__uint(max_entries, 1024);
} counters SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, char*); // 键的最大长度,假设为 256 字节
__type(value, u32); // 计数值
__uint(max_entries, 1024); // 最大条目数
} key_count SEC(".maps");

const volatile int filter_dport = 0;
const volatile int filter_sport = 0;
const volatile int all_conn = 0, err_packet = 0, extra_conn_info = 0,
layer_time = 0, http_info = 0, retrans_info = 0,
udp_info = 0, net_filter = 0, drop_reason = 0, icmp_info = 0,
tcp_info = 0, dns_info = 0, stack_info = 0, mysql_info = 0,
redis_info = 0, rtt_info = 0, rst_info = 0;
redis_info = 0, rtt_info = 0, rst_info = 0, redis_stat = 0;

/* help macro */

Expand Down Expand Up @@ -633,4 +646,4 @@ static __always_inline u64 log2l(u64 v) {
}
/* help functions end */

#endif
#endif
14 changes: 12 additions & 2 deletions eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,20 @@ int BPF_KPROBE(query__start) { return __handle_mysql_start(ctx); }
SEC("uretprobe/_Z16dispatch_commandP3THDPK8COM_DATA19enum_server_command")
int BPF_KPROBE(query__end) { return __handle_mysql_end(ctx); }

//redis
SEC("uprobe/processCommand")
int BPF_KPROBE(query__start_redis_process) { return __handle_redis_start(ctx); }
int BPF_KPROBE(redis_processCommand) { return __handle_redis_start(ctx); }
SEC("uretprobe/call")
int BPF_KPROBE(query__end_redis) { return __handle_redis_end(ctx); }
int BPF_KPROBE(redis_call) { return __handle_redis_end(ctx); }

SEC("uprobe/lookupKey")
int BPF_UPROBE(redis_lookupKey) {
return __handle_redis_key(ctx);
}
SEC("uprobe/addReply")
int BPF_UPROBE(redis_addReply) {
return __handle_redis_value(ctx);
}
// rtt
SEC("kprobe/tcp_rcv_established")
int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) {
Expand Down
170 changes: 158 additions & 12 deletions eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,21 @@ static char err_file_path[1024];
static char packets_file_path[1024];
static char udp_file_path[1024];

// 用于存储从 eBPF map 读取的数据
typedef struct {
char key[256];
u32 value;
} kv_pair;

static int map_fd;

static int sport = 0, dport = 0; // for filter
static int all_conn = 0, err_packet = 0, extra_conn_info = 0, layer_time = 0,
http_info = 0, retrans_info = 0, udp_info = 0, net_filter = 0,
drop_reason = 0, addr_to_func = 0, icmp_info = 0, tcp_info = 0,
time_load = 0, dns_info = 0, stack_info = 0, mysql_info = 0,
redis_info = 0, count_info = 0, rtt_info = 0, rst_info = 0; // flag
redis_info = 0, count_info = 0, rtt_info = 0, rst_info = 0,
redis_stat = 0; // flag

static const char *tcp_states[] = {
[1] = "ESTABLISHED", [2] = "SYN_SENT", [3] = "SYN_RECV",
Expand Down Expand Up @@ -81,6 +90,7 @@ static const struct argp_option opts[] = {
"set to trace mysql information info include Pid 进程id、Comm "
"进程名、Size sql语句字节大小、Sql 语句"},
{"redis", 'R', 0, 0},
{"redis-stat", 'b', 0, 0},
{"count", 'C', "NUMBER", 0,
"specify the time to count the number of requests"},
{"rtt", 'T', 0, 0, "set to trace rtt"},
Expand Down Expand Up @@ -153,6 +163,9 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) {
case 'U':
rst_info = 1;
break;
case 'b':
redis_stat = 1;
break;
case 'C':
count_info = strtoul(arg, &end, 10);
break;
Expand Down Expand Up @@ -182,6 +195,7 @@ enum MonitorMode {
MODE_DNS,
MODE_MYSQL,
MODE_REDIS,
MODE_REDIS_STAT,
MODE_RTT,
MODE_RST,
MODE_DEFAULT
Expand All @@ -204,6 +218,8 @@ enum MonitorMode get_monitor_mode() {
return MODE_MYSQL;
} else if (redis_info) {
return MODE_REDIS;
} else if (redis_stat) {
return MODE_REDIS_STAT;
} else if (rtt_info) {
return MODE_RTT;
} else if (rst_info) {
Expand Down Expand Up @@ -441,6 +457,7 @@ static void set_rodata_flags(struct netwatcher_bpf *skel) {
skel->rodata->stack_info = stack_info;
skel->rodata->mysql_info = mysql_info;
skel->rodata->redis_info = redis_info;
skel->rodata->redis_stat = redis_stat;
skel->rodata->rtt_info = rtt_info;
skel->rodata->rst_info = rst_info;
}
Expand Down Expand Up @@ -599,9 +616,13 @@ static void set_disable_load(struct netwatcher_bpf *skel) {
mysql_info ? true : false);
bpf_program__set_autoload(skel->progs.query__end,
mysql_info ? true : false);
bpf_program__set_autoload(skel->progs.query__end_redis,
bpf_program__set_autoload(skel->progs.redis_addReply,
redis_stat ? true : false);
bpf_program__set_autoload(skel->progs.redis_lookupKey,
redis_stat ? true : false);
bpf_program__set_autoload(skel->progs.redis_processCommand,
redis_info ? true : false);
bpf_program__set_autoload(skel->progs.query__start_redis_process,
bpf_program__set_autoload(skel->progs.redis_call,
redis_info ? true : false);
bpf_program__set_autoload(skel->progs.tcp_rcv_established,
(all_conn || err_packet || extra_conn_info ||
Expand Down Expand Up @@ -682,6 +703,13 @@ static void print_header(enum MonitorMode mode) {
printf("%-20s %-20s %-20s %-20s %-20s \n", "Pid", "Comm", "Size",
"Redis", "duration/μs");
break;
case MODE_REDIS_STAT:
printf("==============================================================="
"====================REDIS "
"INFORMATION===================================================="
"============================\n");
printf("%-20s %-20s %-20s %-20s %-20s %-20s\n", "Pid", "Comm", "key", "Key_count","Value_Type","Value");
break;
case MODE_RTT:
printf("==============================================================="
"====================RTT "
Expand Down Expand Up @@ -844,7 +872,7 @@ static int print_conns(struct netwatcher_bpf *skel) {

static int print_packet(void *ctx, void *packet_info, size_t size) {
if (udp_info || net_filter || drop_reason || icmp_info || tcp_info ||
dns_info || mysql_info || redis_info || rtt_info)
dns_info || mysql_info || redis_info || redis_info || rtt_info )
return 0;
const struct pack_t *pack_info = packet_info;
if (pack_info->mac_time > MAXTIME || pack_info->ip_time > MAXTIME ||
Expand Down Expand Up @@ -1246,6 +1274,54 @@ static int print_redis(void *ctx, void *packet_info, size_t size) {
strcpy(redis, "");
return 0;
}
static int process_redis_first(char flag,char *message) {
if(flag=='+')
{
strcpy(message, "Status Reply");
}
else if (flag=='-')
{
strcpy(message, "Error Reply");
}
else if (flag==':')
{
strcpy(message, "Integer Reply");
}
else if (flag=='$')
{
strcpy(message, "Bulk String Reply");
}
else if (flag=='*')
{
strcpy(message, "Array Reply");
}
else{
strcpy(message, "Unknown Type");
}
return 0;
}

static int print_redis_stat(void *ctx, void *packet_info, size_t size) {
if (!redis_stat) {
return 0;
}
char message[20]={};
const struct redis_stat_query *pack_info = packet_info;
if(pack_info->key_count)
{
printf("%-20d %-20s %-20s %-20d %-20s %-20s\n", pack_info->pid, pack_info->comm,
pack_info->key,pack_info->key_count,"-","-");
}
else
{
process_redis_first(pack_info->value[0],message);
printf("%-20d %-20s %-20s %-20s %-20s %-20s\n", pack_info->pid, pack_info->comm,
"-","-",message,pack_info->value);
}

return 0;
}

static int libbpf_print_fn(enum libbpf_print_level level, const char *format,
va_list args) {
return vfprintf(stderr, format, args);
Expand Down Expand Up @@ -1348,10 +1424,61 @@ int attach_uprobe_mysql(struct netwatcher_bpf *skel) {
return 0;
}
int attach_uprobe_redis(struct netwatcher_bpf *skel) {
ATTACH_UPROBE_CHECKED(skel, call, query__end_redis);
ATTACH_UPROBE_CHECKED(skel, processCommand, query__start_redis_process);
if(redis_info){
ATTACH_UPROBE_CHECKED(skel, call, redis_call);
ATTACH_UPROBE_CHECKED(skel, processCommand, redis_processCommand);
}
if(redis_stat){
ATTACH_UPROBE_CHECKED(skel, lookupKey, redis_lookupKey);
ATTACH_UPROBE_CHECKED(skel, addReply, redis_addReply);
}
return 0;
}

void print_top_5_keys() {
kv_pair *pairs;
pairs = malloc(sizeof(kv_pair) * 1024);
if (!pairs) {
perror("Failed to allocate memory");
exit(EXIT_FAILURE);
}
int index = 0;
char *key = NULL;
while (bpf_map_get_next_key(map_fd, &key, &key) == 0) {
// fprintf(stdout, "next_sk: (%p)\n", sk);
int count;
int err = bpf_map_lookup_elem(map_fd, &key, &count);
if (err) {
fprintf(stderr, "Failed to read value from the conns map: (%s)\n",
strerror(errno));
return ;
}
memcpy(pairs[index].key, &key, 256);
pairs[index].value = count;
//printf("Key: %s, Count: %u\n", pairs[index].key, pairs[index].value);
index++;
}
// 获取所有键值对

// 排序前 5 个元素
// 简单选择排序(可替换为其他高效排序算法)
for (int i = 0; i < index - 1; i++) {
for (int j = i + 1; j < index; j++) {
if (pairs[j].value > pairs[i].value) {
kv_pair temp = pairs[i];
pairs[i] = pairs[j];
pairs[j] = temp;
}
}
}
printf("----------------------------\n");
// 打印前 5 个元素
printf("Top 5 Keys:\n");
for (int i = 0; i < 5 && i < index; i++) {
printf("Key: %s, Count: %u\n", pairs[i].key, pairs[i].value);
}
free(pairs);
}
int main(int argc, char **argv) {
char *last_slash = strrchr(argv[0], '/');
if (last_slash) {
Expand All @@ -1375,6 +1502,7 @@ int main(int argc, char **argv) {
struct ring_buffer *trace_rb = NULL;
struct ring_buffer *mysql_rb = NULL;
struct ring_buffer *redis_rb = NULL;
struct ring_buffer *redis_stat_rb = NULL;
struct ring_buffer *rtt_rb = NULL;
struct ring_buffer *events = NULL;
struct netwatcher_bpf *skel;
Expand Down Expand Up @@ -1419,7 +1547,7 @@ int main(int argc, char **argv) {

goto cleanup;
}
} else if (redis_info) {
} else if (redis_info||redis_stat) {
strcpy(binary_path, "/usr/bin/redis-server");
err = attach_uprobe_redis(skel);
if (err) {
Expand Down Expand Up @@ -1503,6 +1631,13 @@ int main(int argc, char **argv) {
fprintf(stderr, "Failed to create ring buffer(trace)\n");
goto cleanup;
}
redis_stat_rb = ring_buffer__new(bpf_map__fd(skel->maps.redis_stat_rb), print_redis_stat,
NULL, NULL);
if (!redis_stat_rb) {
err = -1;
fprintf(stderr, "Failed to create ring buffer(trace)\n");
goto cleanup;
}
rtt_rb =
ring_buffer__new(bpf_map__fd(skel->maps.rtt_rb), print_rtt, NULL, NULL);
if (!rtt_rb) {
Expand Down Expand Up @@ -1542,6 +1677,7 @@ int main(int argc, char **argv) {
err = ring_buffer__poll(redis_rb, 100 /* timeout, ms */);
err = ring_buffer__poll(rtt_rb, 100 /* timeout, ms */);
err = ring_buffer__poll(events, 100 /* timeout, ms */);
err = ring_buffer__poll(redis_stat_rb, 100 /* timeout, ms */);
print_conns(skel);
sleep(1);
/* Ctrl-C will cause -EINTR */
Expand All @@ -1556,17 +1692,27 @@ int main(int argc, char **argv) {

gettimeofday(&end, NULL);
if ((end.tv_sec - start.tv_sec) >= 5) {
print_stored_events();
printf("Total RSTs in the last 5 seconds: %llu\n\n", rst_count);
//print_stored_events();
//printf("Total RSTs in the last 5 seconds: %llu\n\n", rst_count);

// 重置计数器和事件存储
rst_count = 0;
event_count = 0;
//rst_count = 0;
//event_count = 0;
if(redis_stat)
{
map_fd = bpf_map__fd(skel->maps.key_count);

if (map_fd < 0) {
perror("Failed to get map FD");
return 1;
}
print_top_5_keys();
}
gettimeofday(&start, NULL);
}
}

cleanup:
netwatcher_bpf__destroy(skel);
return err < 0 ? -err : 0;
}
}
8 changes: 8 additions & 0 deletions eBPF_Supermarket/Network_Subsystem/net_watcher/netwatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ struct redis_query {
u64 begin_time;
int argc;
};
struct redis_stat_query {
int pid;
char comm[20];
char key[20];
int key_count;
char value[64];
int value_type;
};

struct RTT {
u32 saddr;
Expand Down
Loading

0 comments on commit 2471bf2

Please sign in to comment.