-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathwecom.py
81 lines (71 loc) · 2.61 KB
/
wecom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import time
import requests
class WecomMessage:
def __init__(self, corpid: str, agentid: int, secret: str):
"""
初始化企业微信消息发送类
:param corpid: 企业ID
:param agentid: 应用ID
:param secret: 应用的Secret
"""
self.corpid = corpid
self.agentid = agentid
self.secret = secret
self.access_token = None
self.token_expires_time = 0
def get_access_token(self) -> str:
"""
获取访问令牌
:return: access_token
"""
now = time.time()
if self.access_token and now < self.token_expires_time:
return self.access_token
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": self.corpid,
"corpsecret": self.secret
}
try:
response = requests.get(url, params=params)
data = response.json()
if data.get("errcode") == 0:
self.access_token = data.get("access_token")
self.token_expires_time = now + data.get("expires_in") - 200 # 提前200秒刷新
return self.access_token
else:
raise Exception(f"获取access_token失败: {data}")
except Exception as e:
print(f"获取access_token异常: {e}")
raise
def send_text(self, content: str, touser: str = "@all") -> bool:
"""
发送文本消息
:param content: 消息内容
:param touser: 接收人,默认为所有人
:return: 是否发送成功
"""
try:
message = {
"touser": touser,
"msgtype": "text",
"agentid": self.agentid,
"text": {
"content": content
},
"enable_duplicate_check": 1,
"duplicate_check_interval": 1800
}
access_token = self.get_access_token()
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
response = requests.post(url, json=message)
result = response.json()
if result.get("errcode") == 0:
print(f"企业微信消息发送成功")
return True
else:
print(f"企业微信消息发送失败: {result}")
return False
except Exception as e:
print(f"企业微信消息发送异常: {e}")
return False