Skip to content

Commit 8f416fd

Browse files
author
jiawenhao
committed
RabbitMQ消息发送和消费
1 parent fc6be80 commit 8f416fd

File tree

1 file changed

+186
-0
lines changed

1 file changed

+186
-0
lines changed

Diff for: _posts/2021-07-21-RabbitMQ.md

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
---
2+
layout: post
3+
title: RabbitMQ消息的发送和消费
4+
subtitle: RabbitMQ
5+
date: 2021-07-21
6+
author: HH
7+
header-img: img/post-bg-debug.png
8+
catalog: true
9+
tags:
10+
- RabbitMQ
11+
---
12+
## RabbitMQ
13+
14+
### 一、初始化队列
15+
16+
```shell
17+
rabbitmqctl add_vhost /path
18+
rabbitmqctl add_user name password
19+
rabbitmqctl set_permissions -p /path name ".*" ".*" ".*"
20+
rabbitmqctl set_policy ha-two ".*_#ha#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path
21+
##历史遗留问题,导致部分没有带#ha#的队列也需要做mirror
22+
rabbitmqctl set_policy ha-cluster-2 "^#cluster#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path
23+
```
24+
25+
### 二、时间key命名
26+
27+
- com.系统名.一级模块.二级模块.事件名称/三级模块名.事件名称(子事件名称).事件结果
28+
- 子事件没有时,默认使用“default”作为占位符替代
29+
- 事件结果,可自行定义
30+
31+
### 三、如何使用
32+
33+
#### 1.产生事件
34+
35+
```
36+
//创建发送者
37+
EventSender es = EventCenterFactory.createSender(new String[]{"127.0.0.1"},EventPriority.NORMAL);
38+
//异步发送-data为json string
39+
es.asyncPub("com.lemonhh.status.server.signup.email.success",data);
40+
es.syncPub("com.lemonhh.status.server.signup.email.success",data);
41+
```
42+
43+
- 发送方式区别
44+
- 异步发送:交给后台线程发送,发送者不关心发送结果。事件有丢失的可能。
45+
- 同步发送:发送失败即可返回,发送者需要根据返回结果自行处理事件发送失败的后续事宜。
46+
- 事件级别
47+
- 事件定义了3个级别:NONE/NORMAL/HIGH。默认情况下使用NORMAL。
48+
- 如果事件时消息量比较大的那种,比如:实时股价信息、日志等,请使用NONE,以减轻服务器负担。
49+
50+
#### 2.消费事件
51+
52+
```
53+
//创建pull方式的接收者
54+
PullEventReceiver pullEventReceiver = EventCenterFactory.createPullReceiver(new String[]{"192.168.1.11"},"queue_name",HaLevel.NONE,false);
55+
//绑定要监听的事件
56+
pullEventReceriver.binding(new String[]{"com.lemonhh.#"});
57+
//pull方式接收者获取事件,方法会阻塞。通过循环方式获取多条消息
58+
Event e = pullEventReceiver.next();
59+
//push方式接收者获取事件
60+
pushEventReceiver.binding(new String[]{"com.lemonhh.#"},new PushProcessor(){
61+
public void process(Event e){
62+
System.out.println("Push : " + e)
63+
}
64+
})
65+
```
66+
67+
- 两种接收方式
68+
- push方式:使用方便。实际上如果不设置channel.basicQos(1),那么broker端会一次推送多条数据
69+
RabbitMQ的每一数据帧(Frame)都是以0xCE结尾。
70+
71+
| 请求来源source | 请求目的地destination | 请求信息info | 备注remarks |
72+
| -------------- | --------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
73+
| client | server | Protocol-Header 0-9-1 | 发送请求 |
74+
| server | client | Connection.Start | 服务端开始建立连接 |
75+
| client | server | Connection.Start-Ok | 回复Ok |
76+
| server | client | Connection.Tune | 服务端确认客户端状态 |
77+
| client | server | Connection.Tune-Ok | 回复Ok |
78+
| client | server | Connection.Open vhost=/ | vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态 |
79+
| server | client | Connection.Open-Ok | 回复Ok |
80+
| client | server | Basic.Qos | |
81+
| server | client | Basic.Qos-Ok | |
82+
| client | server | Basic.Consume q=queue.pk | 指定订阅的队列 |
83+
| server | client | Basic.Consume-Ok Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body... | Exchange, routing-key,Content-Header |
84+
| client | server | Basic.Ack | 收到信息 |
85+
| client | server | Channel.Close reply=ok | 关闭Channel,回复Ok |
86+
| server | client | Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body | |
87+
| server | client | Channel.Close-ok | |
88+
| client | server | Connection.Close reply=ok | |
89+
| server | client | Connection.Close-OK | |
90+
91+
- pull方式:可以设置等待超时,适合非阻塞下的场景。
92+
93+
| 请求来源source | 请求目的地destination | 请求信息info | 备注remarks |
94+
| -------------- | --------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
95+
| client | server | Protocol-Header 0-9-1 | 发送请求 |
96+
| server | client | Connection.Start | 服务端开始建立连接 |
97+
| client | server | Connection.Start-Ok | 回复Ok |
98+
| server | client | Connection.Tune | 服务端确认客户端状态 |
99+
| client | server | Connection.Tune-Ok | 回复Ok |
100+
| client | server | Connection.Open vhost=/ | vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态 |
101+
| server | client | Connection.Open-Ok | 回复Ok |
102+
| client | server | Basic.Get q=queue.pk | |
103+
| server | client | Basic.Get-Ok x=exchangePk rk=pk Content-Header type=text/plain Content-Body... | |
104+
| client | server | Basic.Ack | 收到信息 |
105+
| clientclient | serverserver | Channel.Close reply=ok | 关闭Channel,回复Ok |
106+
| serverserver | clientclient | Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body | |
107+
| serverclient | clientserver | Channel.Close-ok | |
108+
| clientserver | serverclient | Connection.Close reply=ok | |
109+
110+
111+
112+
- 接收者HA级别
113+
- 级别总共有3个,NONE/NORMAL/HIGH
114+
- NONE:客户端连接中断到再次链接这段时间的消息会丢失。
115+
- NORMAL:客户端连接中断到再次链接这段时间的消息能够保存
116+
- HIGH:事件做复制,事件服务器损坏一台不丢失消息。
117+
- 如无特殊要求,尽量使用NONE级别。
118+
- cluster配置
119+
- 配置为cluster方式,标记会有多个接收者一起消费事件
120+
121+
### 四、使用方法
122+
123+
#### 1.mq的使用都是基于eventcenter,从使用上看
124+
125+
- 事件生产者
126+
- 事件的发送主要封装在QueueMessageService和EventCenterService两个类中,实际都是调用EventSender。
127+
- EventSender是EventCenter提供的事件发送器,提供:
128+
- 发送机制:同步发送和异步发送
129+
- 消息级别:NONE/NORMAL/HIGH
130+
- 不同级别消息对应的特性
131+
132+
| Level | Durable | Priority |
133+
| ------ | ------- | -------- |
134+
| NONE | false | 1 |
135+
| NORMAL | true | 3 |
136+
| HIGH | true | 5 |
137+
138+
- 事件消费者
139+
- 事件的消费主要封装在MQReceive和EventCenterFactory两个类中,实际都是调用EventCenterFactory中的不同方法
140+
- EventConsumer的具体存在形式为EventReceiver
141+
- 消费机制:拉模式和推模式,具体为:PullEventReceiver/PushEventReceiver
142+
- 消费者HA界别:NONE/NORMAL/HIGH
143+
- 消息ack机制:自动和手动可配置
144+
- 不同界别对应的特性
145+
146+
| level | durable | exclusive | autoDelete |
147+
| ------ | ------- | --------- | ---------- |
148+
| NONE | 30min | false | false |
149+
| NORMAL | 3d | false | false |
150+
| HIGH | 3d | false | false |
151+
152+
注:如果是ifCluster = true,durable为一周
153+
154+
#### 2.具体使用情况
155+
156+
- 事件生产者,即EventSender的使用方式基本一致。
157+
- 事件消费者,即EventReceiver的使用,目前接触的业务都是采用的PushEventReceiver的方式
158+
159+
| queue | exchange | routing_key | event_level | priority | ha_level | durable | autoAck | consumer_service |
160+
| --------------------------------- | ---------------- | ------------------------------------------------------------ | ----------- | -------- | -------- | ------- | ------- | --------------------------------- |
161+
| async_lemonhh _status_thread | ex_default_topic | com.lemonhh.status.addComment.addReceiveTimeline | NORMAL | 3 | HIGH | true | false | lemonhh -status-thread |
162+
| lemonhh_mq_timeline | ex_default_topic | com.lemonhh.mq.timeline | NORMAL | 3 | NORMAL | true | true | lemonhh -thread-consumer-timeline |
163+
| replyTimelineCommentRankRefresher | ex_default_topic | com.lemonhh.xommunity.status.replytimeline.commentsrank.cache.refresh | NORMAL | 3 | HIGH | true | true | lemonhh -thread-consumer-status |
164+
165+
消费消息需要confirm,即如果consumer没有手动ack,那么队列不会删除对应的event,在consumer连接断开后,mq会重新分配event到其他的consumer。
166+
167+
ha_level为HIGH,表示mq server挂掉之后,queue中的数据会被持久化,并保留3天。
168+
169+
#### 3.存在问题
170+
171+
- consumer进程不可用时
172+
173+
- 如果autoAck=true,即自动confirm机制,event分配后即从queue中删除,如果consumer被kill,event丢失。
174+
- 如果autoAck=false,即手动confirm机制,失败的event会重新分配,event不会丢失
175+
176+
- consumer被kill时,event的process过程如果未完成
177+
178+
- 如果autoAck=true,即自动confirm机制,可能处理操作无法完成。
179+
- 如果autoAck=false,即手动confirm机制,失败的event会重新分配,有些过程会重新处理一遍。
180+
181+
- 解决方法
182+
183+
- 结合业务特征,event的重要程度,选择是否设置手动confirm
184+
- 选择合适的任务调度系统,保证consumer-thread对每一个event的完成处理,平稳,安全升级。
185+
186+

0 commit comments

Comments
 (0)