-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_parser.py
151 lines (126 loc) · 5.39 KB
/
message_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from datetime import datetime
import json
from analysis import FinanceAnalyzer
from db.services import TransactionService
from config import APP_ID, APP_SECRET, WX_ID
from feishu.table import FeishuTable
from util.date_util import get_date
class MessageParser:
def __init__(self):
self.analyzer = FinanceAnalyzer()
self.service = TransactionService()
self.feishu_table = FeishuTable(APP_ID, APP_SECRET)
def clean_text(self, text):
"""清理文本内容"""
if text is None:
return ""
return text.strip()
def insert_feishu(self, values):
"""向飞书表格插入数据"""
self.feishu_table.insert_data(values)
def parse_msg_xml(self, content):
from ai.services.manager import AIManager
"""解析XML消息内容"""
try:
with open('./msg.xml', 'a+', encoding='utf-8') as f:
f.write(content + '\n')
ai_manager = AIManager()
response = ai_manager.simple_chat(content)
data = json.loads(response.content)
print("response:", response.content)
if data['publisher'] and data['publisher'].endswith('银行') and data['标题'] == '交易提醒':
data['transaction_time'] = datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')
data.pop('标题')
self.service.create(data)
values = [[data['transaction_time'], data['type'], data['amount'], data['remark']]]
self.insert_feishu(values)
return data
print("不符合条件,跳过解析。")
return None
except Exception as e:
print(f"Error with : {str(e)}")
return None
def parse_msg_self(self, content: str):
if content.startswith('#') is False and content.startswith('@AI') is False:
return
from finbot import FinBot
"""解析自定义消息内容"""
data = []
finBot = FinBot()
# 处理AI对话
if content.startswith('@AI'):
self.analyzer.chat_with_ai(content.split(" ")[1])
return
# 处理命令匹配
match content:
# case '#全部数据':
# data = self.service.get_all_transactions(desc=False)
case '#昨日数据':
data = self.analyzer.get_date_transactions(start_time=get_date(-1))
case '#今日数据':
data = self.analyzer.get_date_transactions(start_time=get_date())
# 处理汇总命令
if content.startswith('#汇总@'):
params = content.split('@')
data = self.analyzer.get_today_summary(date_str=params[1])
# 检查数据是否为空
if not data:
finBot.send_text_msg('没有数据')
return
# 分批发送数据
self._send_batch_data(data, finBot)
def _send_batch_data(self, data, finBot):
"""分批发送数据"""
batch_size = 3 # 每批包含的交易数量
start_index = 0
while start_index < len(data):
current_batch = data[start_index:start_index + batch_size]
batch_messages = []
# 获取当前批次的数据
for transaction in current_batch:
transaction_lines = self._format_transaction(transaction)
batch_messages.append("\n".join(transaction_lines))
msg = "\n".join(batch_messages)
finBot.send_text_msg(msg)
# 更新start_index以指向下一个批次的起始位置
start_index += batch_size
def _format_transaction(self, transaction):
"""格式化交易数据"""
transaction_lines = []
if type(transaction) is dict:
if 'date' in transaction:
transaction_lines.append(f"时间: {transaction['date']}")
if 'type' in transaction:
transaction_lines.append(f"类型: {transaction['type']}")
if 'amount' in transaction:
transaction_lines.append(f"金额:{transaction['amount']}")
if 'income' in transaction:
transaction_lines.append(f"收入: {transaction['income']}")
if 'expenses' in transaction:
transaction_lines.append(f"支出: {transaction['expenses']}")
if 'diff' in transaction:
transaction_lines.append(f"与前一日对比: {transaction['diff']}")
if 'remark' in transaction:
transaction_lines.append(f"备注: {transaction['remark']}")
# 添加分隔线
transaction_lines.append("---------------------------------")
return transaction_lines
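# For backward compatibility: module-level wrappers exposing the same interface as the original functions (currently commented out).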
# def clean_text(text):
# parser = MessageParser()
# return parser.clean_text(text)
#
# def insert_feishu(values):
# parser = MessageParser()
# parser.insert_feishu(values)
#
# def parse_msg_xml(content):
# parser = MessageParser()
# return parser.parse_msg_xml(content)
#
# def parse_msg_self(content: str, wcf: Wcf):
# parser = MessageParser()
# parser.parse_msg_self(content, wcf)
# if __name__ == '__main__':
# with open('msg.xml', mode='a+', encoding='utf-8') as f:
# f.write('\n' + 'dddddd')