-
Notifications
You must be signed in to change notification settings - Fork 87
/
alist.py
407 lines (281 loc) · 11.3 KB
/
alist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
from __future__ import annotations
import functools
import os
from datetime import timedelta
from urllib.parse import quote
import requests
from const import downloads_dir
from dao import ConfigInterface, to_raw_type
from download import DOWNLOAD_CONNECT_TIMEOUT, download_file, progress_callback_func_type
from log import color, logger
from server import get_alist_server_addr
from util import KiB, get_now, human_readable_size, reset_cache, with_cache
fn_SERVER_ADDR = get_alist_server_addr
def _make_api(api_name="/") -> str:
return f"{fn_SERVER_ADDR()}{api_name}"
def fn_API_LOGIN():
return _make_api("/api/auth/login")
def fn_API_UPLOAD():
return _make_api("/api/fs/put")
def fn_API_DOWNLOAD():
return _make_api("/api/fs/get")
def fn_API_LIST():
return _make_api("/api/fs/list")
def fn_API_REMOVE():
return _make_api("/api/fs/remove")
NORMAL_TIMEOUT = 8
UPLOAD_TIMEOUT = 60 * 5
alist_session = requests.session()
alist_session.request = functools.partial(alist_session.request, timeout=NORMAL_TIMEOUT) # type: ignore
class CommonResponse(ConfigInterface):
def __init__(self):
self.code = 200
self.message = "success"
self.data = {}
def generate_exception(res: CommonResponse, ctx: str) -> Exception:
return Exception(f"alist {ctx} failed, code={res.code}, message={res.message}")
class LoginRequest(ConfigInterface):
def __init__(self):
self.username = ""
self.password = ""
self.otp_code = ""
class LoginResponse(ConfigInterface):
def __init__(self):
self.token = ""
class ListRequest(ConfigInterface):
def __init__(self):
self.path = ""
self.password = ""
self.refresh = False
self.page = 1
self.per_page = 0
class ListResponse(ConfigInterface):
def __init__(self):
self.provider = "189Cloud"
self.readme = ""
self.write = True
self.total = 3
self.content: list[Content] = []
def fields_to_fill(self) -> list[tuple[str, type[ConfigInterface]]]:
return [
("content", Content),
]
class Content(ConfigInterface):
def __init__(self):
self.name = "自制小工具"
self.size = 0
self.is_dir = True
self.modified = "2022-10-28T10:38:13+08:00"
self.sign = ""
self.thumb = ""
self.type = 1
class DownloadRequest(ConfigInterface):
def __init__(self):
self.path = ""
self.password = ""
class DownloadResponse(ConfigInterface):
def __init__(self):
self.name = "DNF蚊子腿小助手_v20.0.1_by风之凌殇.7z"
self.size = 51111681
self.is_dir = False
self.modified = "2022-10-28T10:38:13+08:00"
self.sign = "h9zSwqaagxTsodawAGF53ICbv19t0_y9FJP2qj2gjhA=:0"
self.thumb = ""
self.type = 0
self.raw_url = ""
self.readme = ""
self.provider = "Aliyundrive"
self.related = None
# 用于构建下载链接的参数,使用时设置
self.remote_file_path = ""
def get_url(self) -> str:
if self.sign != "" and self.remote_file_path != "":
# 尽量使用alist的下载接口做中转,这样服务器日志方便查看下载情况
return f"{fn_SERVER_ADDR()}/d{self.remote_file_path}?sign={self.sign}"
return self.raw_url
class RemoveRequest(ConfigInterface):
def __init__(self):
self.dir = "/"
self.names: list[str] = []
def login(username: str, password: str, otp_code: str = "") -> str:
"""
登录alist,获取上传所需token
"""
return with_cache(
"alist", "login", cache_max_seconds=24 * 60 * 60, cache_miss_func=lambda: _login(username, password, otp_code)
)
def _login(username: str, password: str, otp_code: str = "") -> str:
req = LoginRequest()
req.username = username
req.password = password
req.otp_code = otp_code
raw_res = alist_session.post(fn_API_LOGIN(), json=to_raw_type(req))
res = CommonResponse().auto_update_config(raw_res.json())
if res.code != 200:
raise generate_exception(res, "login")
data = LoginResponse().auto_update_config(res.data)
return data.token
def format_remote_file_path(remote_file_path: str) -> str:
"""
确保远程路径以 / 开头
"""
if not remote_file_path.startswith("/"):
remote_file_path = "/" + remote_file_path
return remote_file_path
def upload(local_file_path: str, remote_file_path: str = "", old_version_name_prefix: str = ""):
if remote_file_path == "":
remote_file_path = os.path.basename(local_file_path)
remote_file_path = format_remote_file_path(remote_file_path)
actual_size = os.stat(local_file_path).st_size
file_size = human_readable_size(actual_size)
logger.info(f"开始上传 {local_file_path} ({file_size}) 到网盘,远程路径为 {remote_file_path}")
start_time = get_now()
with open(local_file_path, "rb") as file_to_upload:
raw_res = alist_session.put(
fn_API_UPLOAD(),
data=file_to_upload,
headers={
"File-Path": quote(remote_file_path),
"As-Task": "false",
"Authorization": login_using_env(),
},
timeout=UPLOAD_TIMEOUT,
)
res = CommonResponse().auto_update_config(raw_res.json())
if res.code != 200:
raise generate_exception(res, "upload")
end_time = get_now()
used_time = end_time - start_time
speed = actual_size / used_time.total_seconds()
human_readable_speed = human_readable_size(speed)
logger.info(color("bold_yellow") + f"上传完成,耗时 {used_time}({human_readable_speed}/s)")
remote_dir = os.path.dirname(remote_file_path)
remote_filename = os.path.basename(remote_file_path)
if old_version_name_prefix != "":
remove_file_startswith_prefix(remote_dir, old_version_name_prefix, [remote_filename])
logger.info("旧版本处理完毕,将开始实际上传流程")
logger.info("上传完毕后强制刷新该目录,确保后续访问可以看到新文件")
get_file_list(remote_dir, refresh=True)
def remove_file_startswith_prefix(remote_dir: str, name_prefix: str, except_filename_list: list[str] | None = None):
logger.info(f"将移除网盘目录 {remote_dir} 中 前缀为 {name_prefix} 的文件")
dir_file_list_info = get_file_list(remote_dir, refresh=True)
for file_info in dir_file_list_info.content:
if file_info.is_dir:
continue
if not file_info.name.startswith(name_prefix):
continue
if except_filename_list is not None and file_info.name in except_filename_list:
# 不包括最新上传的文件,因为alist会自动覆盖相同名字的文件
continue
remove(os.path.join(remote_dir, file_info.name))
def is_file_in_folder(remote_dir: str, filename: str) -> bool:
remote_filepath = os.path.join(remote_dir, filename)
try:
get_download_info(remote_filepath)
return True
except Exception:
return False
def get_download_info(remote_file_path: str) -> DownloadResponse:
remote_file_path = format_remote_file_path(remote_file_path)
req = DownloadRequest()
req.path = remote_file_path
req.password = ""
raw_res = alist_session.post(fn_API_DOWNLOAD(), json=to_raw_type(req))
res = CommonResponse().auto_update_config(raw_res.json())
if res.code != 200:
raise generate_exception(res, "download")
data = DownloadResponse().auto_update_config(res.data)
data.remote_file_path = remote_file_path
return data
def download_from_alist(
remote_file_path: str,
download_dir=downloads_dir,
filename="",
connect_timeout=DOWNLOAD_CONNECT_TIMEOUT,
extra_progress_callback: progress_callback_func_type | None = None,
) -> str:
download_info = get_download_info(remote_file_path)
if filename == "":
filename = download_info.name
guess_speed = 300 * KiB
guess_time = timedelta(seconds=download_info.size / guess_speed)
extra_info = f"文件大小为 {human_readable_size(download_info.size)}(进度条可能不会显示,请耐心等待。若下载速度为 {human_readable_size(guess_speed)}/s 预计耗时 {guess_time})"
return download_file(download_info.get_url(), download_dir, filename, extra_info=extra_info)
def get_file_list(
remote_dir_path: str, password: str = "", page: int = 1, per_page: int = 0, refresh=False
) -> ListResponse:
req = ListRequest()
req.path = remote_dir_path
req.password = password
req.page = page
req.per_page = per_page
req.refresh = refresh
headers = {}
if refresh:
# 刷新需要token
headers = {
"Authorization": login_using_env(),
}
raw_res = alist_session.post(fn_API_LIST(), json=to_raw_type(req), headers=headers)
res = CommonResponse().auto_update_config(raw_res.json())
if res.code != 200:
if res.code == 401 and "expired" in res.message:
reset_cache("alist")
raise generate_exception(res, "list")
data = ListResponse().auto_update_config(res.data)
return data
def remove(remote_file_path: str):
remote_file_path = format_remote_file_path(remote_file_path)
dir = os.path.dirname(remote_file_path)
file_name = os.path.basename(remote_file_path)
logger.info(f"开始删除网盘文件 {remote_file_path}")
req = RemoveRequest()
req.dir = dir
req.names = [
file_name,
]
raw_res = alist_session.post(
fn_API_REMOVE(),
json=to_raw_type(req),
headers={
"Authorization": login_using_env(),
},
)
res = CommonResponse().auto_update_config(raw_res.json())
if res.code != 200:
raise generate_exception(res, "remove")
logger.info(color("bold_yellow") + "删除完成")
def get_username_password_from_env() -> tuple[str, str]:
username = str(os.getenv("ALIST_USERNAME", ""))
password = str(os.getenv("ALIST_PASSWORD", ""))
if username == "" or password == "":
raise Exception("请在环境变量中设置 ALIST_USERNAME 和 ALIST_PASSWORD,否则将无法登录alist")
return username, password
def login_using_env() -> str:
username, password = get_username_password_from_env()
return login(username, password)
def demo_login():
username, password = get_username_password_from_env()
cached_token = login(username, password)
logger.info(f"cached_token = {cached_token}")
uncached_token = _login(username, password)
logger.info(f"uncached_token = {uncached_token}")
def demo_upload():
upload(
"C:/Users/fzls/Downloads/chromedriver_102.exe",
"/文本编辑器、chrome浏览器、autojs、HttpCanary等小工具/chromedriver_102.exe",
)
def demo_download():
filepath = download_from_alist("/文本编辑器、chrome浏览器、autojs、HttpCanary等小工具/chromedriver_102.exe")
logger.info(f"最终下载路径为 {filepath}")
def demo_list():
res = get_file_list("/", refresh=True)
logger.info(res)
def demove_remove():
remove("/文本编辑器、chrome浏览器、autojs、HttpCanary等小工具/chromedriver_102.exe")
if __name__ == "__main__":
# demo_login()
# demo_upload()
demo_download()
# demo_list()
# demove_remove()