-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrsss.py
274 lines (256 loc) · 9.51 KB
/
rsss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import feedparser
import ssl
import yaml
import requests
import sqlite3
import os
import logging
import urllib3
import time
import re
from sqlite3 import OperationalError
# Module-wide logging: timestamped INFO-level records through the root handler.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# NOTE(security): the two statements below turn off TLS certificate checking
# for the whole process (the default HTTPS context used by the standard
# library is replaced by the unverified one) and silence urllib3's warning
# about unverified requests. Kept as-is because the rest of the script relies
# on it (`verify=False`); review before running against untrusted networks.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
ssl._create_default_https_context = ssl._create_unverified_context
class rssSpider:
    """Fetch RSS feeds, de-duplicate entries in SQLite and push new ones to Feishu.

    Construction reads the main YAML config, which must contain the keys
    ``feishuToken`` (used verbatim as the URL messages are POSTed to),
    ``sqlDataName`` (SQLite file path), ``sqlTableName``, ``sqlCretaTable``
    (the CREATE TABLE statement), ``filterSearch`` (``None`` or regex
    patterns) and ``rssPath`` (path of the feed-list YAML passed to
    :meth:`main`).

    The table is expected to have the columns ``title``, ``link``,
    ``rssName`` and ``date``, because those are the columns written by
    :meth:`insertDataSqlite`.
    """

    # ASCII-art banner
    def bannner(self):
        """Print the start-up banner followed by the current local time."""
        # The literal is flush-left on purpose: every character inside the
        # triple-quoted string, including leading whitespace, is printed.
        print(f"""
8888888b. .d8888b. .d8888b. .d8888b.
888 Y88b d88P Y88b d88P Y88b d88P Y88b
888 888 Y88b. Y88b. Y88b.
888 d88P "Y888b. "Y888b. "Y888b.
8888888P" "Y88b. "Y88b. "Y88b.
888 T88b "888 "888 "888
888 T88b Y88b d88P Y88b d88P Y88b d88P
888 T88b "Y8888P" "Y8888P" "Y8888P"
[*] : Font: colossal
[*] : RSSS version 1.0
[*] : python3 rsss.py
[*] : {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}
""")

    # Program initialisation
    def __init__(self, configYamlPath):
        """Load the main config and make sure the SQLite database exists.

        Args:
            configYamlPath: Path of the main YAML configuration file.

        Raises:
            SystemExit: With status 1 when a required config key is missing
                (an empty or non-mapping config counts as "everything
                missing").
        """
        self.bannner()
        logger.info("程序初始化")
        configDict = self.readConfigYaml(configYamlPath)
        if not isinstance(configDict, dict):
            # An empty YAML file loads as None; route that through the same
            # "missing key" path instead of failing with a TypeError.
            configDict = {}
        try:
            # Read every required key first so that an invalid config is
            # rejected before anything is created on disk.
            self.feishuToken = configDict["feishuToken"]
            self.sqlDataName = configDict["sqlDataName"]
            self.sqlTableName = configDict["sqlTableName"]
            self.sqlCretaTable = configDict["sqlCretaTable"]
            self.filterSearch = configDict["filterSearch"]
            self.rssPath = configDict["rssPath"]
        except KeyError as e:
            logger.error(e)
            print(f"config.yaml中缺少必有条件: {e},程序退出")
            raise SystemExit(1)
        self.createSqlite(self.sqlDataName, self.sqlCretaTable)

    # Title filter
    def filterNameSearch(self, title):
        """Return whether ``title`` passes the configured filter.

        Args:
            title: Entry title to test.

        Returns:
            ``True`` when no filter is configured (``filterSearch`` is
            ``None``) or when at least one pattern matches ``title``
            (case-insensitive, multi-line regex search); ``False`` otherwise,
            including when the pattern list is empty.
        """
        patterns = self.filterSearch
        if patterns is None:
            return True
        if isinstance(patterns, str):
            # A bare string in the YAML is one pattern, not a sequence of
            # single-character patterns.
            patterns = [patterns]
        flags = re.MULTILINE | re.IGNORECASE
        return any(re.search(pattern, title, flags) for pattern in patterns)

    # Push one entry to Feishu
    def feishuRequests(self, a, rssName, timeout=10):
        """POST one entry as a Feishu "post" message.

        Args:
            a: Sequence whose first three items are rendered as the message
                title, the link line and the date line, in that order.
            rssName: Feed name rendered as the source line.
            timeout: Seconds allowed for each HTTP attempt. Without a
                timeout the request could block forever and the retry below
                would never run.

        Raises:
            SystemExit: With status 1 when both the first attempt and the
                single retry time out.
        """
        url = self.feishuToken
        feiShuData = {
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": f"{a[0]}",
                        "content": [
                            [
                                {"tag": "text", "text": f"链接 : {a[1]}" + "\n"},
                                {"tag": "text", "text": f"日期 : {a[2]}" + "\n"},
                                {"tag": "text", "text": f"来源 : {rssName}" + "\n"},
                            ]
                        ],
                    }
                }
            },
        }
        for attempt in (1, 2):
            try:
                # NOTE(security): certificate verification is disabled here,
                # matching the module-wide setting; review before changing.
                resp = requests.post(
                    url=url, json=feiShuData, verify=False, timeout=timeout
                )
            except requests.exceptions.Timeout:
                if attempt == 2:
                    print("请求超时,程序退出")
                    raise SystemExit(1)
                print("重新尝试,发送数据")
                time.sleep(3)
            else:
                logger.info(resp.text)
                return

    # Store and forward new entries
    def fsRequests(self, rssDict, rssName):
        """Insert and push every entry that passes the filter and is new.

        Args:
            rssDict: Mapping of field name to a list of values, one list per
                field and all lists parallel (index ``i`` of every list
                belongs to entry ``i``). The first three fields, in mapping
                order, are used as title, link and date.
            rssName: Feed name stored with each row and shown in the message.
        """
        logger.info("准备发送请求到飞书")
        # zip(*) turns the parallel per-field lists into one row per entry.
        for row in zip(*rssDict.values()):
            a = list(row)
            if not self.filterNameSearch(a[0]):
                continue
            # An entry is new only if neither its link nor its title is
            # stored yet. No separate "empty database" path is needed: on an
            # empty table both checks trivially pass.
            isNew = self.selectDataSqlite(
                column="link", link=a[1]
            ) and self.selectDataSqlite(column="title", link=a[0])
            if not isNew:
                continue
            self.insertDataSqlite(a[0], a[1], rssName, a[2])
            self.feishuRequests(a, rssName=rssName)

    # Query data
    def selectDataSqlite(self, column, link):
        """Return ``True`` when no stored row has ``column`` equal to ``link``.

        Args:
            column: Column name to compare. It is interpolated into the SQL
                text (identifiers cannot be bound), so it must come from
                trusted code, never from feed content.
            link: Value to look for; despite the name it is also used for
                titles. Bound as a parameter so quotes in feed data are safe.

        Returns:
            ``True`` if the value is absent (the entry is new), ``False`` if
            at least one matching row exists.
        """
        conn = sqlite3.connect(self.sqlDataName)
        try:
            cursor = conn.execute(
                f"SELECT 1 FROM {self.sqlTableName} WHERE {column}=? LIMIT 1",
                (link,),
            )
            return cursor.fetchone() is None
        finally:
            # The connection context manager only commits; close explicitly.
            conn.close()

    # Insert data
    def insertDataSqlite(self, title, link, rssName, date):
        """Insert one entry into the configured table and commit it.

        Args:
            title: Entry title.
            link: Entry link.
            rssName: Name of the feed the entry came from.
            date: Entry date as delivered by the feed.
        """
        conn = sqlite3.connect(self.sqlDataName)
        try:
            with conn:  # commit on success, roll back on error
                conn.execute(
                    f"INSERT INTO {self.sqlTableName}(title, link, rssName, date) "
                    "VALUES (?,?,?,?)",
                    (title, link, rssName, date),
                )
        finally:
            conn.close()

    # Is the table still empty?
    def initSelectDataSqlite(self):
        """Return ``True`` when the configured table holds no rows yet."""
        conn = sqlite3.connect(self.sqlDataName)
        try:
            cursor = conn.execute(
                f"SELECT link FROM {self.sqlTableName} LIMIT 1"
            )
            return cursor.fetchone() is None
        finally:
            conn.close()

    # Create the database
    def createSqlite(self, sqlDataName, sqlCretaTable):
        """Create the database file and its table if the file does not exist.

        Args:
            sqlDataName: Path of the SQLite database file.
            sqlCretaTable: CREATE TABLE statement to run in a new database.

        Returns:
            ``True`` when the database already exists, the table was created,
            or SQLite reports that the table already exists; ``False`` on any
            other error (which is logged).
        """
        logger.info("检查数据库是否存在")
        if os.path.exists(sqlDataName):
            logger.info("%s 数据库已存在", sqlDataName)
            return True
        logger.info("创建数据库")
        conn = sqlite3.connect(sqlDataName)
        try:
            with conn:
                conn.execute(sqlCretaTable)
            logger.info("创建表成功")
            return True
        except sqlite3.OperationalError as o:
            logger.info("%s", o)
            # "table <name> already exists" is not a failure, whatever the
            # table happens to be called.
            return "already exists" in str(o)
        except Exception as e:
            logger.exception(e)
            return False
        finally:
            conn.close()

    @staticmethod
    def _entryField(entry, field):
        """Return ``entry[field]``, or the placeholder ``"1"`` if it is missing."""
        try:
            return entry[field]
        except KeyError:
            return "1"

    # Fetch and parse one feed
    def rssRequests(self, rssUrl, rssFormat, rssName):
        """Download one feed, extract the configured fields and process them.

        Args:
            rssUrl: Feed URL handed to ``feedparser.parse``.
            rssFormat: The feed's config mapping. One of its keys must also
                be a key of the parsed feed result (presumably ``entries`` -
                verify against the feed config); its value lists the fields
                to extract from each item. If several keys match, the last
                one wins.
            rssName: Feed name passed on to :meth:`fsRequests`.

        Raises:
            KeyError: When no key of ``rssFormat`` matches the parsed feed.
        """
        logger.info("读取链接: %s", rssUrl)
        logger.info("获取格式: %s", rssFormat)
        parsedFeed = feedparser.parse(rssUrl)
        if parsedFeed.get("bozo"):
            # feedparser reports download/parse problems via the bozo flag
            # instead of raising; surface them so an empty run is explainable.
            logger.warning("RSS解析异常: %s", parsedFeed.get("bozo_exception"))
        checkFormat = None
        for key in rssFormat.keys():
            if key in parsedFeed.keys():
                checkFormat = key
                logger.info("匹配获取格式: %s", key)
        if checkFormat is None:
            raise KeyError(f"no key of the feed config matches the parsed feed: {rssUrl}")
        rssDict = {
            field: [self._entryField(entry, field) for entry in parsedFeed[checkFormat]]
            for field in rssFormat[checkFormat]
        }
        logger.info("RSS链接解析完成: %s", rssUrl)
        self.fsRequests(rssDict, rssName)

    # Process one feed definition
    def formatRssTargetDict(self, targetDict):
        """Run :meth:`rssRequests` for one feed definition.

        Args:
            targetDict: Feed config mapping; must contain ``rsslink`` and
                ``rssName``.

        Raises:
            SystemExit: With status 1 when a ``KeyError`` is raised while
                reading the definition or processing the feed.
        """
        try:
            self.rssRequests(
                rssUrl=targetDict['rsslink'],
                rssFormat=targetDict,
                rssName=targetDict['rssName'],
            )
        except KeyError as e:
            logger.error("rssConfig.yaml 缺少字段: %s", e)
            print("请检查rssConfig.yaml中必有字段是否存在,程序退出")
            raise SystemExit(1)

    # Process every feed definition
    def formatRssTarget(self, configFormatDict):
        """Process each feed definition found in the feed-list config.

        Args:
            configFormatDict: Mapping whose values are feed definitions. An
                empty or missing mapping is logged and skipped.
        """
        if not configFormatDict:
            logger.warning("rssConfig.yaml 中没有可用的RSS配置")
            return
        for targetDict in configFormatDict.values():
            self.formatRssTargetDict(targetDict)

    # Read a config file
    def readConfigYaml(self, yamlUrl):
        """Load a YAML file and return its parsed content.

        Args:
            yamlUrl: Path of the YAML file.

        Returns:
            Whatever ``yaml.safe_load`` produces for the file (``None`` for
            an empty file).
        """
        # Read as UTF-8 explicitly so non-ASCII config values do not depend
        # on the platform's default encoding.
        with open(yamlUrl, encoding="utf-8") as fp:
            configFormatDict = yaml.safe_load(fp)
        logger.info("读取配置文件: %s", yamlUrl)
        return configFormatDict

    # Main entry point
    def main(self, rssYamlPath):
        """Read the feed-list YAML and process every feed in it.

        Args:
            rssYamlPath: Path of the feed-list YAML file.
        """
        configFormatDict = self.readConfigYaml(rssYamlPath)
        self.formatRssTarget(configFormatDict=configFormatDict)
if __name__ == "__main__":
    # Script entry point: load config.yaml from the current working
    # directory, build the spider and process the feed list it points to.
    # This is the top-level boundary, so every failure is logged, reported
    # to the user and turned into a non-zero exit status.
    try:
        configYamlPath = os.path.join(os.getcwd(), "config.yaml")
        rs = rssSpider(configYamlPath)
        rssYamlPath = rs.rssPath
        rs.main(rssYamlPath)
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt")
        print("外部关闭,程序退出")
        # 130 is the conventional status for termination by Ctrl-C.
        raise SystemExit(130)
    except FileNotFoundError as e:
        logger.error(e)
        print("文件不存在,程序退出")
        raise SystemExit(1)
    except Exception as e:
        # SystemExit raised deeper in the program is not an Exception
        # subclass, so deliberate exits pass through untouched.
        logger.exception(e)
        print("未知异常,程序退出")
        raise SystemExit(1)