-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfinbot.py
96 lines (78 loc) · 3.08 KB
/
finbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# -*- coding: utf-8 -*-
import logging
import time
from queue import Empty
from threading import Thread
import schedule
from wcferry import Wcf, WxMsg
from config import WX_ID
class FinBot:
    """Finance bot: a process-wide singleton wrapping a wcferry ``Wcf`` client.

    Every ``FinBot(...)`` call returns the same instance; the client is set up
    only on the first construction. Incoming messages are pulled on a daemon
    thread and handed to ``MessageParser``; outgoing text goes to ``WX_ID``.
    """

    # The one shared instance, created lazily by __new__.
    _instance = None
    # Class-level default; shadowed by an instance attribute set to True once
    # __init__ has completed, so later constructions skip the setup.
    _initialized = False

    def __new__(cls, wcf=None):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, wcf: "Wcf | None" = None) -> None:
        """Set up the client, logger and own wxid (first construction only).

        Args:
            wcf: Optional, already-created client to use. When omitted a new
                ``Wcf(debug=True)`` is created. Ignored if the singleton has
                already been initialized.
        """
        # __init__ runs on every FinBot(...) call, even though __new__ hands
        # back the same object; guard so the client is only created once.
        if self._initialized:
            return
        print('FinBot.__init__()')
        self.wcf = wcf if wcf is not None else Wcf(debug=True)
        self.LOG = logging.getLogger("Robot")
        self.wxid = self.wcf.get_self_wxid()
        self._initialized = True

    def processMsg(self, msg: "WxMsg") -> None:
        """Dispatch one received message to the message parser.

        Type-1 messages sent by ``WX_ID`` go to ``parse_msg_self``; type-49
        messages (from any sender) go to ``parse_msg_xml``. All other messages
        are ignored.

        Args:
            msg: The received message; ``type``, ``sender`` and ``content``
                are read.
        """
        # Imported here rather than at module level -- presumably to avoid a
        # circular import with message_parser; TODO confirm.
        from message_parser import MessageParser

        msg_parser = MessageParser()
        # NOTE(review): 1 and 49 look like the WeChat text and app/XML message
        # type codes -- confirm against Wcf.get_msg_types().
        if msg.type == 1 and msg.sender == WX_ID:
            msg_parser.parse_msg_self(msg.content)
        elif msg.type == 49:
            msg_parser.parse_msg_xml(msg.content)

    def enableReceivingMsg(self) -> None:
        """Turn on message receiving and start the daemon thread that handles it."""

        def innerProcessMsg(wcf: "Wcf"):
            # Loop until the client reports that receiving has stopped.
            while wcf.is_receiving_msg():
                try:
                    msg = wcf.get_msg()
                    self.processMsg(msg)
                except Empty:
                    continue  # No message available right now; poll again.
                except Exception as e:
                    # Keep the loop alive on handler errors, but record the
                    # traceback so failures can be diagnosed.
                    self.LOG.exception("Receiving message error: %s", e)

        self.wcf.enable_receiving_msg()
        Thread(
            target=innerProcessMsg,
            name="GetMessage",
            args=(self.wcf,),
            daemon=True,
        ).start()

    def getAllContacts(self) -> dict:
        """Return all contacts as a ``{UserName: NickName}`` mapping.

        Reads the ``Contact`` table of ``MicroMsg.db`` (friends, official
        accounts, service accounts, group members, ...).
        """
        contacts = self.wcf.query_sql(
            "MicroMsg.db", "SELECT UserName, NickName FROM Contact;"
        )
        return {contact["UserName"]: contact["NickName"] for contact in contacts}

    def keepRunningAndBlockProcess(self) -> None:
        """Block forever, running pending ``schedule`` jobs once per second.

        Keeps the process (and therefore the daemon receiver thread) alive.
        """
        while True:
            schedule.run_pending()
            time.sleep(1)

    def send_text_msg(self, content: str, retry: int = 3) -> None:
        """Send a text message to ``WX_ID``, retrying on failure.

        Args:
            content: Text to send.
            retry: Maximum number of send attempts; attempts are spaced one
                second apart.
        """
        if not self.wcf:
            print('wcf没有初始化')
            return
        for attempt in range(retry):
            # A result of 0 is treated as success; anything else is retried.
            if self.wcf.send_text(content, WX_ID) == 0:
                return
            # Only wait when another attempt will actually follow.
            if attempt < retry - 1:
                time.sleep(1)
        self.LOG.error("Failed to send text message after %d attempt(s)", retry)

    def clean(self):
        """Release the underlying client's resources."""
        self.wcf.cleanup()