Skip to content

Commit 20244f2

Browse files
committed
改了比较丑陋的一版,准备再寻找一下优雅的方式
1 parent 65b5367 commit 20244f2

File tree

2 files changed

+123
-23
lines changed

2 files changed

+123
-23
lines changed

mq/mq_queue.py

+69-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#encoding=utf-8
22
import time
3+
import uuid
34
import logging
45
import requests
56
import pika
@@ -48,6 +49,12 @@ def __init__(self,
4849
def __getattribute__(self, attr):
4950
"""每次使用connection,channel的时候都要检查一下是否已经断开连接了
5051
"""
52+
try:
53+
create_kk = object.__getattribute__(self, "create_kk")
54+
except AttributeError:
55+
create_kk = 0
56+
if create_kk:
57+
return object.__getattribute__(self, attr)
5158
if attr in ("connection", "channel"):
5259
try:
5360
connection = object.__getattribute__(self, "connection")
@@ -56,13 +63,30 @@ def __getattribute__(self, attr):
5663
try:
5764
connection.process_data_events()
5865
except (StreamLostError, ChannelWrongStateError, ValueError, TypeError):
59-
logging.debug("队列连接已断开或发生错误!")
66+
logging.info("队列连接已断开或发生错误!")
6067
if connection.is_closed or channel.is_closed:
6168
new_con = pika.BlockingConnection(connect_params)
6269
new_chan = new_con.channel()
6370
object.__setattr__(self, "connection", new_con)
6471
object.__setattr__(self, "channel", new_chan)
65-
logging.debug("重新连接了队列!")
72+
logging.info("重新连接了队列!")
73+
except AttributeError:
74+
pass
75+
elif attr in ("send_connection", "send_channel"):
76+
try:
77+
send_connection = object.__getattribute__(self, "send_connection")
78+
send_channel = object.__getattribute__(self, "send_channel")
79+
connect_params = object.__getattribute__(self, "connect_params")
80+
try:
81+
send_connection.process_data_events()
82+
except (StreamLostError, ChannelWrongStateError, ValueError, TypeError):
83+
logging.info("发送队列连接已断开或发生错误!")
84+
if send_connection.is_closed or send_channel.is_closed:
85+
new_send_con = pika.BlockingConnection(connect_params)
86+
new_send_channel = new_send_con.channel(channel_number=9)
87+
object.__setattr__(self, "send_connection", new_send_con)
88+
object.__setattr__(self, "send_channel", new_send_channel)
89+
logging.info("重新连接了发送队列!")
6690
except AttributeError:
6791
pass
6892
return object.__getattribute__(self, attr)
@@ -71,8 +95,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7195
self.close_connection()
7296

7397
def create_connection(self):
98+
self.create_kk = 1
7499
self.connection = pika.BlockingConnection(self.connect_params)
75100
self.channel = self.connection.channel()
101+
self.send_connection = pika.BlockingConnection(self.connect_params)
102+
self.send_channel = self.send_connection.channel(channel_number=9)
103+
self.create_kk = 0
76104

77105
def getConnectionParam(self,
78106
host,
@@ -103,8 +131,32 @@ def getConnectionParam(self,
103131
params['port'] = port
104132
return pika.ConnectionParameters(**params)
105133

134+
def reconnect_queue_if_close(func):
135+
"""如果连接断了,重连一下
136+
"""
137+
138+
def ware(self, *args, **kwargs):
139+
try:
140+
self.connection.process_data_events()
141+
except (StreamLostError, ChannelWrongStateError, ValueError):
142+
logging.debug("队列连接已断开或发生错误!")
143+
pass
144+
145+
if self.connection.is_closed or self.channel.is_closed:
146+
logging.debug("重新连接了队列!")
147+
self.create_connection()
148+
149+
return func(self, *args, **kwargs)
150+
151+
return ware
152+
106153
def close_connection(self):
107-
self.connection.close()
154+
self.create_kk = 1
155+
if not self.send_connection.is_closed:
156+
self.send_connection.close()
157+
if not self.connection.is_closed:
158+
self.connection.close()
159+
self.create_kk = 0
108160

109161
def declare_exchange(self, exchange, **kwargs):
110162
exchange_type = kwargs.get('exchange_type', 'direct')
@@ -129,6 +181,10 @@ def declare_delay_exchange(self, exchange, **kwargs):
129181
exchange=exchange, exchange_type='x-delayed-message', durable=durable, arguments=arguments)
130182
except ChannelClosedByBroker as e:
131183
logging.warning(e)
184+
self.create_kk = 1
185+
self.connection.close()
186+
self.send_connection.close()
187+
self.create_kk = 0
132188
self.create_connection()
133189
self.channel.exchange_declare(
134190
exchange=exchange, exchange_type=exchange_type, durable=durable, arguments=arguments)
@@ -156,9 +212,11 @@ def declare_queue(self, queue, **kwargs):
156212
def delete_queue(self, queue):
157213
self.channel.queue_delete(queue=queue)
158214

215+
# @reconnect_queue_if_close
159216
def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key):
160217
self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key)
161218

219+
# @reconnect_queue_if_close
162220
def send(self, message, exchange, routing_key, **kwargs):
163221
"""
164222
kwargs 参数说明:
@@ -167,7 +225,7 @@ def send(self, message, exchange, routing_key, **kwargs):
167225
priority是消息优先级
168226
expiration是消息生存周期,与delay相冲突,如果使用了expiration,delay就无效了
169227
"""
170-
message_id = kwargs.get('message_id')
228+
message_id = kwargs.get('message_id') or str(uuid.uuid4())
171229
expiration = kwargs.get('expiration')
172230
close_connection = kwargs.get('close_connection', False)
173231
priority = kwargs.get('priority', 0)
@@ -185,30 +243,34 @@ def send(self, message, exchange, routing_key, **kwargs):
185243
headers = self.arg_set('x-delay', delay, headers)
186244

187245
property_params = self.arg_set('headers', headers, property_params)
188-
self.channel.basic_publish(
246+
self.send_channel.basic_publish(
189247
exchange=exchange,
190248
routing_key=routing_key,
191249
body=message,
192250
properties=pika.BasicProperties(**property_params))
193251
if close_connection:
194252
self.close_connection()
195253

254+
# @reconnect_queue_if_close
196255
def consume(self, queue, auto_ack=False, inactivity_timeout=None, prefetch_count=None):
197256
if prefetch_count:
198257
self.channel.basic_qos(prefetch_count=prefetch_count)
199258
result = self.channel.consume(queue, auto_ack, inactivity_timeout=inactivity_timeout)
200259
for nn in result:
201260
yield nn
202261

262+
# @reconnect_queue_if_close
203263
def run(self):
204264
try:
205265
self.channel.start_consuming()
206266
finally:
207267
self.channel.stop_consuming()
208268
self.close_connection()
209269

270+
# @reconnect_queue_if_close
210271
def ack_message(self, method):
211-
self.channel.basic_ack(delivery_tag=method.delivery_tag)
272+
return self.channel.basic_ack(delivery_tag=method.delivery_tag)
212273

274+
# @reconnect_queue_if_close
213275
def rej_message(self, method, requeue=False):
214-
self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=requeue)
276+
return self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=requeue)

test/test_rece_delay.py

+54-16
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,68 @@
99
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
1010
sys.path.insert(0, ROOT)
1111
from mq import MessageQueue
12+
"""测试监听接收延时信息
13+
"""
14+
15+
16+
def _notify_dingding(content, ding_urls):
17+
if not ding_urls:
18+
return None
19+
data = {
20+
'msgtype': 'text',
21+
'text': {
22+
'content': content,
23+
},
24+
}
25+
for ding_url in ding_urls:
26+
try:
27+
start = time.time()
28+
resp = requests.post(ding_url, json=data, timeout=5)
29+
except requests.exceptions.RequestException as ex:
30+
logging.exception("push dingtalk error: %s, params: %s", ex, data)
31+
return
32+
finally:
33+
end = time.time()
34+
logging.info("push dingtalk cost: %f, params: %s", end - start, data)
35+
ret = resp.json()
36+
logging.info("push dingtalk result: %s", resp.content)
37+
if ret['errcode'] != 0:
38+
logging.error("push dingtalk failed, response: %s, params: %s", resp.content, data)
1239

1340

1441
def process(body):
1542
in_data = json.loads(body)
1643
content = in_data.get('content', '')
17-
print(content)
44+
ding_urls = in_data.get('ding_urls', ())
45+
print(content, ding_urls)
46+
# _notify_dingding(content, ding_urls)
1847

1948

2049
def main():
21-
myqueue = MessageQueue(host='myhost', port='', mq_username='myuser', mq_password='mypasswd')
22-
myqueue.declare_delay_exchange('noti_chan_delay', exchange_type='direct')
23-
myqueue.declare_queue('noti_qu_delay')
24-
myqueue.bind_exchange_queue('noti_qu_delay', 'noti_chan_delay', binding_key="content_info")
25-
suke = myqueue.consume('noti_qu_delay')
26-
for method, properties, body in suke:
27-
try:
28-
process(body)
29-
except Exception as e:
30-
print(e)
31-
print("error")
32-
else:
33-
myqueue.ack_message(method)
34-
# myqueue.rej_message(method)
35-
myqueue.run()
50+
myqueue = MessageQueue(host='kangfuzi05', port='', mq_username='yangtao', mq_password='yangtao')
51+
myqueue.declare_delay_exchange('notify_dingding_delay', exchange_type='direct')
52+
myqueue.declare_queue('noti_delay')
53+
# myqueue.connection.close()
54+
# myqueue.channel.close()
55+
myqueue.bind_exchange_queue('noti_delay', 'notify_dingding_delay', binding_key="content_info")
56+
while 1:
57+
suke = myqueue.consume('noti_delay')
58+
for method, properties, body in suke:
59+
try:
60+
process(body)
61+
except Exception as e:
62+
print(e)
63+
print("error")
64+
else:
65+
myqueue.ack_message(method)
66+
# myqueue.rej_message(method)
67+
import random
68+
if random.random() <= 0.1:
69+
myqueue.create_kk = 1
70+
myqueue.connection.close()
71+
myqueue.create_kk = 0
72+
print("结束了!")
73+
myqueue.run()
3674

3775

3876
if __name__ == "__main__":

0 commit comments

Comments
 (0)