|
| 1 | +#encoding=utf-8 |
| 2 | +import time |
| 3 | +import logging |
| 4 | +import requests |
| 5 | +import pika |
| 6 | +import json |
| 7 | +from pika.exceptions import StreamLostError, ChannelWrongStateError, ChannelClosedByBroker |
| 8 | +from .credentials import AliyunCredentialsProvider |
| 9 | + |
| 10 | +# https://github.com/suparek/RabbitMQClient/blob/master/rabbitmqclient.py |
| 11 | + |
| 12 | + |
| 13 | +class MyQueue(object): |
| 14 | + def __init__(self, queue_name): |
| 15 | + self.queue_name = queue_name |
| 16 | + |
| 17 | + def get_items(self, num, status): |
| 18 | + raise NotImplementedError |
| 19 | + |
| 20 | + def ack(self, id, status): |
| 21 | + raise NotImplementedError |
| 22 | + |
| 23 | + def add(self, status, payload, priority=0): |
| 24 | + raise NotImplementedError |
| 25 | + |
| 26 | + |
| 27 | +class RabbitQueue(MyQueue): |
| 28 | + """用rabbitmq生成的queue |
| 29 | + """ |
| 30 | + |
| 31 | + __default_routing_key = "#" |
| 32 | + __default_exchange_type = "direct" |
| 33 | + |
| 34 | + def __init__(self, |
| 35 | + host, |
| 36 | + port, |
| 37 | + mq_username="", |
| 38 | + mq_password="", |
| 39 | + virtualHost="", |
| 40 | + accessKey="", |
| 41 | + accessSecret="", |
| 42 | + instanceId="", |
| 43 | + heartbeat=None): |
| 44 | + self.connect_params = self.getConnectionParam(host, port, mq_username, mq_password, virtualHost, accessKey, |
| 45 | + accessSecret, instanceId, heartbeat) |
| 46 | + self.create_connection() |
| 47 | + |
| 48 | + def __exit__(self, exc_type, exc_val, exc_tb): |
| 49 | + self.close_connection() |
| 50 | + |
| 51 | + def create_connection(self): |
| 52 | + self.connection = pika.BlockingConnection(self.connect_params) |
| 53 | + self.channel = self.connection.channel() |
| 54 | + |
| 55 | + def getConnectionParam(self, |
| 56 | + host, |
| 57 | + port, |
| 58 | + mq_username="", |
| 59 | + mq_password="", |
| 60 | + virtualHost="", |
| 61 | + accessKey="", |
| 62 | + accessSecret="", |
| 63 | + instanceId="", |
| 64 | + heartbeat=None): |
| 65 | + if accessKey: |
| 66 | + provider = AliyunCredentialsProvider(accessKey, accessSecret, instanceId) |
| 67 | + credentials_param = pika.PlainCredentials( |
| 68 | + provider.get_username(), provider.get_password(), erase_on_connect=True) |
| 69 | + params = {"host": host, "credentials": credentials_param, "heartbeat": heartbeat} |
| 70 | + if virtualHost: |
| 71 | + params['virtual_host'] = virtualHost |
| 72 | + if port: |
| 73 | + params['port'] = port |
| 74 | + return pika.ConnectionParameters(**params) |
| 75 | + else: |
| 76 | + params = {"host": host, "heartbeat": heartbeat} |
| 77 | + if mq_username: |
| 78 | + credentials_param = pika.PlainCredentials(mq_username, mq_password, erase_on_connect=True) |
| 79 | + params["credentials"] = credentials_param |
| 80 | + if port: |
| 81 | + params['port'] = port |
| 82 | + return pika.ConnectionParameters(**params) |
| 83 | + |
| 84 | + def reconnect_queue_if_close(func): |
| 85 | + """如果连接断了,重连一下 |
| 86 | + """ |
| 87 | + |
| 88 | + def ware(self, *args, **kwargs): |
| 89 | + try: |
| 90 | + self.connection.process_data_events() |
| 91 | + except (StreamLostError, ChannelWrongStateError): |
| 92 | + logging.debug("队列连接已断开或发生错误!") |
| 93 | + pass |
| 94 | + |
| 95 | + if self.connection.is_closed or self.channel.is_closed: |
| 96 | + logging.debug("重新连接了队列!") |
| 97 | + self.create_connection() |
| 98 | + |
| 99 | + return func(self, *args, **kwargs) |
| 100 | + |
| 101 | + return ware |
| 102 | + |
| 103 | + def close_connection(self): |
| 104 | + self.connection.close() |
| 105 | + |
| 106 | + def declare_exchange(self, exchange, **kwargs): |
| 107 | + exchange_type = kwargs.get('exchange_type', 'direct') |
| 108 | + durable = kwargs.get('durable', True) |
| 109 | + x_delayed_type = kwargs.get('x-delayed-type') |
| 110 | + arguments = {} |
| 111 | + arguments = self.arg_set('x-delayed-type', x_delayed_type, arguments) |
| 112 | + self.channel.exchange_declare( |
| 113 | + exchange=exchange, exchange_type=exchange_type, durable=durable, arguments=arguments) |
| 114 | + |
| 115 | + def declare_delay_exchange(self, exchange, **kwargs): |
| 116 | + """ |
| 117 | + 延时队列声明,兼容 rabbitmq_delayed_message_exchange 和 阿里云 |
| 118 | + 如果要使用延时消息发送,请使用 这个来声明队列 |
| 119 | + """ |
| 120 | + exchange_type = kwargs.get('exchange_type', 'direct') |
| 121 | + durable = kwargs.get('durable', True) |
| 122 | + arguments = {} |
| 123 | + arguments = self.arg_set('x-delayed-type', exchange_type, arguments) |
| 124 | + try: |
| 125 | + self.channel.exchange_declare( |
| 126 | + exchange=exchange, exchange_type='x-delayed-message', durable=durable, arguments=arguments) |
| 127 | + except ChannelClosedByBroker as e: |
| 128 | + logging.warning(e) |
| 129 | + self.create_connection() |
| 130 | + self.channel.exchange_declare( |
| 131 | + exchange=exchange, exchange_type=exchange_type, durable=durable, arguments=arguments) |
| 132 | + |
| 133 | + def delete_exchange(self, exchange): |
| 134 | + self.channel.exchange_delete(exchange=exchange) |
| 135 | + |
| 136 | + def arg_set(self, key, value, arguments): |
| 137 | + if value: |
| 138 | + arguments[key] = value |
| 139 | + return arguments |
| 140 | + |
| 141 | + def declare_queue(self, queue, **kwargs): |
| 142 | + durable = kwargs.get('durable', True) |
| 143 | + priority = kwargs.get('priority', 10) |
| 144 | + ttl_milseconds = kwargs.get('ttl_milseconds') |
| 145 | + dead_letter_exchange = kwargs.get('dead_letter_exchange') |
| 146 | + dead_letter_routing_key = kwargs.get('dead_letter_routing_key') |
| 147 | + arguments = {'x-max-priority': priority} |
| 148 | + arguments = self.arg_set('x-message-ttl', ttl_milseconds, arguments) |
| 149 | + arguments = self.arg_set('x-dead-letter-exchange', dead_letter_exchange, arguments) |
| 150 | + arguments = self.arg_set('x-dead-letter-routing-key', dead_letter_routing_key, arguments) |
| 151 | + self.channel.queue_declare(queue=queue, durable=durable, arguments=arguments) |
| 152 | + |
| 153 | + def delete_queue(self, queue): |
| 154 | + self.channel.queue_delete(queue=queue) |
| 155 | + |
| 156 | + def bind_exchange_queue(self, queue, exchange, binding_key=__default_routing_key): |
| 157 | + self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=binding_key) |
| 158 | + |
| 159 | + @reconnect_queue_if_close |
| 160 | + def send(self, message, exchange, routing_key, **kwargs): |
| 161 | + """ |
| 162 | + kwargs 参数说明: |
| 163 | + message_id是用户指定的消息id,为空则使用系统自动生成的。 |
| 164 | + delay是延时消息参数,单位是ms |
| 165 | + priority是消息优先级 |
| 166 | + expiration是消息生存周期,与delay相冲突,如果使用了expiration,delay就无效了 |
| 167 | + """ |
| 168 | + message_id = kwargs.get('message_id') |
| 169 | + expiration = kwargs.get('expiration') |
| 170 | + close_connection = kwargs.get('close_connection', False) |
| 171 | + priority = kwargs.get('priority', 0) |
| 172 | + delay = kwargs.get('delay') |
| 173 | + property_params = {"content_type": "application/json"} |
| 174 | + headers = {} |
| 175 | + property_params = self.arg_set('message_id', message_id, property_params) |
| 176 | + if not delay: |
| 177 | + # 如果要发送延时信息,就不要使用 生存周期了 |
| 178 | + property_params = self.arg_set('expiration', expiration, property_params) |
| 179 | + property_params = self.arg_set('priority', priority, property_params) |
| 180 | + |
| 181 | + # delay 是 aliyun,x-delay是 rabbitmq_delayed_message_exchange ,所以都加上了 |
| 182 | + headers = self.arg_set('delay', delay, headers) |
| 183 | + headers = self.arg_set('x-delay', delay, headers) |
| 184 | + |
| 185 | + property_params = self.arg_set('headers', headers, property_params) |
| 186 | + self.channel.basic_publish( |
| 187 | + exchange=exchange, |
| 188 | + routing_key=routing_key, |
| 189 | + body=message, |
| 190 | + properties=pika.BasicProperties(**property_params)) |
| 191 | + if close_connection: |
| 192 | + self.close_connection() |
| 193 | + |
| 194 | + @reconnect_queue_if_close |
| 195 | + def consume(self, queue, auto_ack=False, inactivity_timeout=None, prefetch_count=None): |
| 196 | + if prefetch_count: |
| 197 | + self.channel.basic_qos(prefetch_count=prefetch_count) |
| 198 | + result = self.channel.consume(queue, auto_ack, inactivity_timeout=inactivity_timeout) |
| 199 | + for nn in result: |
| 200 | + yield nn |
| 201 | + |
| 202 | + @reconnect_queue_if_close |
| 203 | + def run(self): |
| 204 | + try: |
| 205 | + self.channel.start_consuming() |
| 206 | + finally: |
| 207 | + self.channel.stop_consuming() |
| 208 | + self.close_connection() |
| 209 | + |
| 210 | + @reconnect_queue_if_close |
| 211 | + def ack_message(self, method): |
| 212 | + self.channel.basic_ack(delivery_tag=method.delivery_tag) |
| 213 | + |
| 214 | + @reconnect_queue_if_close |
| 215 | + def rej_message(self, method, requeue=False): |
| 216 | + self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=requeue) |
0 commit comments