diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 90bfda05f2..46aea7c264 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,3 +35,42 @@ jobs: with: sh_checker_comment: true sh_checker_exclude: "^deprecation/* ^xiaoyahelper/*" + + ruff: + runs-on: ubuntu-latest + steps: + - + name: Checkout + uses: actions/checkout@v4 + + - + name: Ruff + uses: astral-sh/ruff-action@v3 + + pylint: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12"] + steps: + - + name: Checkout + uses: actions/checkout@v4 + + - + name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + pip install typing-extensions + pip install -r aliyuntvtoken_connector/requirements.txt + pip install -r glue_python/requirements.txt + + - name: Analysing the code with pylint + run: | + pylint --rcfile=$(pwd)/.pylintrc --disable=R,C0301 $(git ls-files '*.py') diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000000..7615b8695e --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[FORMAT] +max-line-length=120 diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000000..5e14bf4a1f --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,7 @@ +{ + "recommendations": [ + "ms-python.python", + "ms-python.vscode-pylance", + "charliermarsh.ruff" + ] +} diff --git a/aliyuntvtoken_connector/main.py b/aliyuntvtoken_connector/main.py index 418c53d867..80a70c7cd9 100644 --- a/aliyuntvtoken_connector/main.py +++ b/aliyuntvtoken_connector/main.py @@ -1,21 +1,21 @@ -from flask import Flask, request, Response -from Crypto.Cipher import AES -from Crypto.Util.Padding import unpad +# pylint: disable=C0114 +# pylint: disable=C0116 import json import base64 -import requests -import requests -import json import uuid import hashlib -import base64 import random +import requests +from flask import Flask, request, Response +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad + app = Flask(__name__) headers = { "token": "6733b42e28cdba32", - 'User-Agent': 'Mozilla/5.0 (Linux; U; Android 9; zh-cn; SM-S908E Build/TP1A.220624.014) AppleWebKit/533.1 (KHTML, like Gecko) Mobile Safari/533.1', + 'User-Agent': 'Mozilla/5.0 (Linux; U; Android 9; zh-cn; SM-S908E Build/TP1A.220624.014) AppleWebKit/533.1 (KHTML, like Gecko) Mobile Safari/533.1', # noqa: E501 'Host': 'api.extscreen.com' } @@ -23,7 +23,7 @@ def h(char_array, modifier): unique_chars = list(dict.fromkeys(char_array)) numeric_modifier = int(modifier[7:]) - transformed_string = "".join([chr(abs(ord(c) - (numeric_modifier % 127) - 1) + 33 if abs(ord(c) - (numeric_modifier % 127) - 1) < 33 else + transformed_string = "".join([chr(abs(ord(c) - (numeric_modifier % 127) - 1) + 33 if abs(ord(c) - (numeric_modifier % 127) - 1) < 33 else # noqa: E501 abs(ord(c) - (numeric_modifier % 127) - 1)) for c in unique_chars]) return transformed_string @@ -74,11 +74,11 @@ def oauth_token(): "refresh_token": refresh_token } - timestamp = str(requests.get('http://api.extscreen.com/timestamp').json()['data']['timestamp']) + timestamp = str(requests.get('http://api.extscreen.com/timestamp', timeout=10).json()['data']['timestamp']) unique_id = uuid.uuid4().hex wifimac = str(random.randint(10**11, 10**12 - 1)) - resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}) + resp = requests.post("http://api.extscreen.com/aliyundrive/v3/token", data=req_body, headers={**get_params(timestamp, unique_id, wifimac), **headers}, timeout=10) # noqa: E501 if resp.status_code == 200: resp_data = resp.json() ciphertext = resp_data["data"]["ciphertext"] diff --git a/glue_python/115cookie/115cookie.py b/glue_python/115cookie/115cookie.py index 10b101b82d..14cdd48096 100644 --- a/glue_python/115cookie/115cookie.py +++ b/glue_python/115cookie/115cookie.py @@ -1,48 +1,54 @@ +# pylint: disable=C0103 +# pylint: disable=C0114 #!/usr/local/bin/python3 __author__ = "ChenyangGao " __license__ = "GPLv3 " -from flask import Flask, render_template, jsonify import threading import time import os import base64 import logging import argparse -import qrcode import sys -from PIL import Image from io import BytesIO from enum import Enum from json import loads from urllib.parse import urlencode from urllib.request import urlopen, Request +import qrcode +from flask import Flask, render_template, jsonify +from PIL import Image + -app = Flask(__name__) +flask_app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 AppEnum = Enum("AppEnum", { - "web": 1, - "ios": 6, - "115ios": 8, - "android": 9, - "115android": 11, - "115ipad": 14, - "tv": 15, - "qandroid": 16, - "windows": 19, - "mac": 20, - "linux": 21, - "wechatmini": 22, - "alipaymini": 23, + "web": 1, + "ios": 6, + "115ios": 8, + "android": 9, + "115android": 11, + "115ipad": 14, + "tv": 15, + "qandroid": 16, + "windows": 19, + "mac": 20, + "linux": 21, + "wechatmini": 22, + "alipaymini": 23, }) def get_enum_name(val, cls): + """ + 获取值其对应的名称 + """ if isinstance(val, cls): return val.name try: @@ -130,7 +136,7 @@ def post_qrcode_result(uid, app="web"): """ app = get_enum_name(app, AppEnum) payload = {"app": app, "account": uid} - api = "https://passportapi.115.com/app/1.0/%s/1.0/login/qrcode/" % app + api = f"https://passportapi.115.com/app/1.0/{app}/1.0/login/qrcode/" return loads(urlopen(Request(api, data=urlencode(payload).encode("utf-8"), method="POST")).read()) @@ -143,59 +149,72 @@ def get_qrcode(uid: str, /) -> str: def qrcode_token_url(uid: str, /) -> str: """获取二维码图片的扫码链接 - :return: 扫码链接 + :return: 扫码链接 """ return "http://115.com/scan/dg-" + uid -def poll_qrcode_status(qrcode_token, qrcode_app): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_qrcode_token, qrcode_app): + """ + 循环等待扫码 + """ + global LAST_STATUS while True: time.sleep(1) - resp = get_qrcode_status(qrcode_token) - status = resp["data"].get("status") - if status == 2: - resp = post_qrcode_result(qrcode_token["uid"], qrcode_app) + resp = get_qrcode_status(_qrcode_token) + _status = resp["data"].get("status") + if _status == 2: + resp = post_qrcode_result(_qrcode_token["uid"], qrcode_app) cookie_data = resp['data']['cookie'] - cookie_str = "; ".join("%s=%s" % t for t in cookie_data.items()) + cookie_str = "; ".join(f"{key}={value}" for key, value in cookie_data.items()) if sys.platform.startswith('win32'): - with open('115_cookie.txt', 'w') as f: + with open('115_cookie.txt', 'w', encoding='utf-8') as f: f.write(cookie_str) else: - with open('/data/115_cookie.txt', 'w') as f: + with open('/data/115_cookie.txt', 'w', encoding='utf-8') as f: f.write(cookie_str) logging.info('扫码成功, cookie 已写入文件!') - last_status = 1 - elif status in [-1, -2]: + LAST_STATUS = 1 + elif _status in [-1, -2]: logging.error('扫码失败') - last_status = 2 + LAST_STATUS = 2 -@app.route('/') +@flask_app.route('/') def index(): - qrcode_token = get_qrcode_token()["data"] - uid = qrcode_token["uid"] + """ + 网页扫码首页 + """ + _qrcode_token = get_qrcode_token()["data"] + uid = _qrcode_token["uid"] qrcode_image_io = get_qrcode(uid) qrcode_image = Image.open(qrcode_image_io) buffered = BytesIO() qrcode_image.save(buffered, format="PNG") qrcode_image_b64_str = base64.b64encode(buffered.getvalue()).decode('utf-8') - threading.Thread(target=poll_qrcode_status, args=(qrcode_token, args.qrcode_app)).start() + threading.Thread(target=poll_qrcode_status, args=(_qrcode_token, flask_app.config['QRCODE_APP'])).start() return render_template('index.html', qrcode_image_b64_str=qrcode_image_b64_str) -@app.route('/status') +@flask_app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) -@app.route('/shutdown_server', methods=['GET']) +@flask_app.route('/shutdown_server', methods=['GET']) def shutdown(): + """ + 退出进程 + """ os._exit(0) @@ -205,7 +224,8 @@ def shutdown(): parser.add_argument('--qrcode_app', type=str, default='alipaymini', help='扫码绑定设备') args = parser.parse_args() if args.qrcode_mode == 'web': - app.run(host='0.0.0.0', port=34256) + flask_app.config['QRCODE_APP'] = args.qrcode_app + flask_app.run(host='0.0.0.0', port=34256) elif args.qrcode_mode == 'shell': qrcode_token = get_qrcode_token()["data"] threading.Thread(target=poll_qrcode_status, args=(qrcode_token, args.qrcode_app,)).start() @@ -214,7 +234,7 @@ def shutdown(): qr.make(fit=True) logging.info('请打开 115网盘 扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1 and last_status != 2: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) os._exit(0) else: diff --git a/glue_python/aliyunopentoken/aliyunopentoken.py b/glue_python/aliyunopentoken/aliyunopentoken.py index 82f7af0174..6e0c75034c 100644 --- a/glue_python/aliyunopentoken/aliyunopentoken.py +++ b/glue_python/aliyunopentoken/aliyunopentoken.py @@ -1,24 +1,26 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 -import json -import requests import time import logging import os import threading import sys -import qrcode import argparse +import json + +import qrcode +import requests from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyunopentoken/qrcode.png' + QRCODE_DIR= '/aliyunopentoken/qrcode.png' headers = { @@ -34,7 +36,7 @@ "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" # noqa: E501 } headers_2 = { "Accept": "*/*", @@ -51,34 +53,39 @@ "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" # noqa: E501 } +# pylint: disable=W0603 def poll_qrcode_status(data, log_print): - global last_status + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.get(f"https://api.xhofe.top/proxy/https://open.aliyundrive.com/oauth/qrcode/{data}/status", headers=headers) - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['status'] == "LoginSuccess": - authCode = re_data['authCode'] - data_2 = {"code": authCode,"grant_type": "authorization_code" ,"client_id": "" ,"client_secret": ""} - re = requests.post('https://api.xhofe.top/alist/ali_open/code', json=data_2, headers=headers_2) - if re.status_code == 200: - re_data = json.loads(re.text) - refresh_token = re_data['refresh_token'] + url = f"https://api.xhofe.top/proxy/https://open.aliyundrive.com/oauth/qrcode/{data}/status" + _re = requests.get(url, headers=headers, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['status'] == "LoginSuccess": + auth_code = _re_data['authCode'] + data_2 = {"code": auth_code, "grant_type": "authorization_code", "client_id": "", "client_secret": ""} + _re = requests.post('https://api.xhofe.top/alist/ali_open/code', json=data_2, headers=headers_2, timeout=10) # noqa: E501 + if _re.status_code == 200: + _re_data = json.loads(_re.text) + refresh_token = _re_data['refresh_token'] if sys.platform.startswith('win32'): - with open('myopentoken.txt', 'w') as f: + with open('myopentoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/myopentoken.txt', 'w') as f: + with open('/data/myopentoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, opentoken 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break else: - if json.loads(re.text)['code'] == 'Too Many Requests': + if json.loads(_re.text)['code'] == 'Too Many Requests': logging.warning("Too Many Requests 请一小时后重试!") break else: @@ -93,19 +100,28 @@ def poll_qrcode_status(data, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -113,31 +129,35 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Open Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re_count = 0 + RE_COUNT = 0 while True: - re = requests.get('https://api.xhofe.top/alist/ali_open/qr', headers=headers) + re = requests.get('https://api.xhofe.top/alist/ali_open/qr', headers=headers, timeout=10) if re.status_code == 200: re_data = json.loads(re.content) sid = re_data['sid'] - qrCodeUrl = f"https://www.aliyundrive.com/o/oauth/authorize?sid={sid}" + # pylint: disable=C0103 + qr_code_url = f"https://www.aliyundrive.com/o/oauth/authorize?sid={sid}" qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=5, border=4) - qr.add_data(qrCodeUrl) + qr.add_data(qr_code_url) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break else: @@ -145,8 +165,8 @@ def shutdown(): logging.warning("Too Many Requests 请一小时后重试!") os._exit(0) time.sleep(1) - re_count += 1 - if re_count == 3: + RE_COUNT += 1 + if RE_COUNT == 3: logging.error('二维码生成失败,退出进程') os._exit(1) if args.qrcode_mode == 'web': @@ -156,13 +176,13 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(sid, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyunopentoken/aliyunopentoken_nn.ci.py b/glue_python/aliyunopentoken/aliyunopentoken_nn.ci.py index b6d042d260..d562a278fd 100644 --- a/glue_python/aliyunopentoken/aliyunopentoken_nn.ci.py +++ b/glue_python/aliyunopentoken/aliyunopentoken_nn.ci.py @@ -1,50 +1,57 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 -import json -import requests import time import logging import os import threading import sys -import qrcode import argparse +import json + +import qrcode +import requests from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyunopentoken/qrcode.png' + QRCODE_DIR= '/aliyunopentoken/qrcode.png' +# pylint: disable=W0603 def poll_qrcode_status(data, log_print): - global last_status + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.get(f"https://api-cf.nn.ci/proxy/https://open.aliyundrive.com/oauth/qrcode/{data}/status") - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['status'] == "LoginSuccess": - authCode = re_data['authCode'] - data_2 = {"code": authCode,"grant_type": "authorization_code" ,"client_id": "" ,"client_secret": ""} - re = requests.post('https://api-cf.nn.ci/alist/ali_open/code', json=data_2) - if re.status_code == 200: - re_data = json.loads(re.text) - refresh_token = re_data['refresh_token'] + url = f"https://api-cf.nn.ci/proxy/https://open.aliyundrive.com/oauth/qrcode/{data}/status" + _re = requests.get(url, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['status'] == "LoginSuccess": + auth_code = _re_data['authCode'] + data_2 = {"code": auth_code, "grant_type": "authorization_code", "client_id": "", "client_secret": ""} + _re = requests.post('https://api-cf.nn.ci/alist/ali_open/code', json=data_2, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + refresh_token = _re_data['refresh_token'] if sys.platform.startswith('win32'): - with open('myopentoken.txt', 'w') as f: + with open('myopentoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/myopentoken.txt', 'w') as f: + with open('/data/myopentoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, opentoken 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break else: - if json.loads(re.text)['code'] == 'Too Many Requests': + if json.loads(_re.text)['code'] == 'Too Many Requests': logging.warning("Too Many Requests 请一小时后重试!") break else: @@ -59,19 +66,28 @@ def poll_qrcode_status(data, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -79,31 +95,35 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Open Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re_count = 0 + RE_COUNT = 0 while True: - re = requests.get('https://api-cf.nn.ci/alist/ali_open/qr') + re = requests.get('https://api-cf.nn.ci/alist/ali_open/qr', timeout=10) if re.status_code == 200: re_data = json.loads(re.content) sid = re_data['sid'] - qrCodeUrl = f"https://www.aliyundrive.com/o/oauth/authorize?sid={sid}" + # pylint: disable=C0103 + qr_code_url = f"https://www.aliyundrive.com/o/oauth/authorize?sid={sid}" qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=5, border=4) - qr.add_data(qrCodeUrl) + qr.add_data(qr_code_url) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break else: @@ -111,8 +131,8 @@ def shutdown(): logging.warning("Too Many Requests 请一小时后重试!") os._exit(0) time.sleep(1) - re_count += 1 - if re_count == 3: + RE_COUNT += 1 + if RE_COUNT == 3: logging.error('二维码生成失败,退出进程') os._exit(1) if args.qrcode_mode == 'web': @@ -122,13 +142,13 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(sid, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntoken/aliyuntoken.py b/glue_python/aliyuntoken/aliyuntoken.py index e1bb80e2ac..2815688e37 100644 --- a/glue_python/aliyuntoken/aliyuntoken.py +++ b/glue_python/aliyuntoken/aliyuntoken.py @@ -1,25 +1,27 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 import json import base64 -import requests import time import logging import os import threading import sys -import qrcode import argparse + +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' headers = { @@ -35,28 +37,36 @@ "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36" # noqa: E501 } -def poll_qrcode_status(data, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.post('https://api.xhofe.top/alist/ali/ck', json=data, headers=headers) - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': - h = re_data['content']['data']['bizExt'] + _re = requests.post('https://api.xhofe.top/alist/ali/ck', json=_data, headers=headers, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': + h = _re_data['content']['data']['bizExt'] c = json.loads(base64.b64decode(h).decode('gbk')) refresh_token = c['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 + break + elif _re_data['content']['data']['qrCodeStatus'] == 'EXPIRED': + logging.error('二维码无效或已过期!') + LAST_STATUS = 2 break else: if log_print: @@ -66,19 +76,28 @@ def poll_qrcode_status(data, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -86,23 +105,27 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re_count = 0 + RE_COUNT = 0 while True: - re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers) + re = requests.get('https://api.xhofe.top/alist/ali/qr', headers=headers, timeout=10) if re.status_code == 200: re_data = json.loads(re.content) + # pylint: disable=C0103 t = str(re_data['content']['data']['t']) codeContent = re_data['content']['data']['codeContent'] ck = re_data['content']['data']['ck'] @@ -111,13 +134,13 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) - re_count += 1 - if re_count == 3: + RE_COUNT += 1 + if RE_COUNT == 3: logging.error('二维码生成失败,退出进程') os._exit(1) if args.qrcode_mode == 'web': @@ -127,13 +150,15 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(data, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: + os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntoken/aliyuntoken_nn.ci.py b/glue_python/aliyuntoken/aliyuntoken_nn.ci.py index d2dcaf262d..98401250cb 100644 --- a/glue_python/aliyuntoken/aliyuntoken_nn.ci.py +++ b/glue_python/aliyuntoken/aliyuntoken_nn.ci.py @@ -1,45 +1,55 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 import json import base64 -import requests import time import logging import os import threading import sys -import qrcode import argparse + +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' -def poll_qrcode_status(data, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while True: - re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=data) - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': - h = re_data['content']['data']['bizExt'] + _re = requests.post('https://api-cf.nn.ci/alist/ali/ck', json=_data, timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['content']['data']['qrCodeStatus'] == 'CONFIRMED': + h = _re_data['content']['data']['bizExt'] c = json.loads(base64.b64decode(h).decode('gbk')) refresh_token = c['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 + break + elif _re_data['content']['data']['qrCodeStatus'] == 'EXPIRED': + logging.error('二维码无效或已过期!') + LAST_STATUS = 2 break else: if log_print: @@ -49,19 +59,28 @@ def poll_qrcode_status(data, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -69,23 +88,27 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re_count = 0 + RE_COUNT = 0 while True: - re = requests.get('https://api-cf.nn.ci/alist/ali/qr') + re = requests.get('https://api-cf.nn.ci/alist/ali/qr', timeout=10) if re.status_code == 200: re_data = json.loads(re.content) + # pylint: disable=C0103 t = str(re_data['content']['data']['t']) codeContent = re_data['content']['data']['codeContent'] ck = re_data['content']['data']['ck'] @@ -94,13 +117,13 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) - re_count += 1 - if re_count == 3: + RE_COUNT += 1 + if RE_COUNT == 3: logging.error('二维码生成失败,退出进程') os._exit(1) if args.qrcode_mode == 'web': @@ -110,13 +133,15 @@ def shutdown(): threading.Thread(target=poll_qrcode_status, args=(data, False)).start() logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: + os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntoken/aliyuntoken_vercel.py b/glue_python/aliyuntoken/aliyuntoken_vercel.py index d0c7c61195..d41f0a1468 100644 --- a/glue_python/aliyuntoken/aliyuntoken_vercel.py +++ b/glue_python/aliyuntoken/aliyuntoken_vercel.py @@ -1,48 +1,54 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 import json -import requests import time import logging import os import threading import sys import argparse + +import requests import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/aliyuntoken/qrcode.png' + QRCODE_DIR= '/aliyuntoken/qrcode.png' -def poll_qrcode_status(data, log_print): - global last_status - ck = str(data['ck']) - t = str(data['t']) +# pylint: disable=W0603 +def poll_qrcode_status(_data, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS + _ck = str(_data['ck']) + _t = str(_data['t']) while True: - re = requests.get(f'https://aliyuntoken.vercel.app/api/state-query?ck={ck}&t={t}') - if re.status_code == 200: - re_data = json.loads(re.text) - if re_data['data']['qrCodeStatus'] == 'CONFIRMED': - refresh_token = re_data['data']['bizExt']['pds_login_result']['refreshToken'] + _re = requests.get(f'https://aliyuntoken.vercel.app/api/state-query?ck={_ck}&t={_t}', timeout=10) + if _re.status_code == 200: + _re_data = json.loads(_re.text) + if _re_data['data']['qrCodeStatus'] == 'CONFIRMED': + refresh_token = _re_data['data']['bizExt']['pds_login_result']['refreshToken'] if sys.platform.startswith('win32'): - with open('mytoken.txt', 'w') as f: + with open('mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) else: - with open('/data/mytoken.txt', 'w') as f: + with open('/data/mytoken.txt', 'w', encoding='utf-8') as f: f.write(refresh_token) logging.info('扫码成功, refresh_token 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break - elif re_data['data']['qrCodeStatus'] == 'EXPIRED': + elif _re_data['data']['qrCodeStatus'] == 'EXPIRED': logging.error('二维码无效或已过期!') - last_status = 2 + LAST_STATUS = 2 break else: if log_print: @@ -52,19 +58,28 @@ def poll_qrcode_status(data, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -72,23 +87,27 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='AliyunPan Refresh Token') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re_count = 0 + RE_COUNT = 0 while True: - re = requests.get('https://aliyuntoken.vercel.app/api/generate') + re = requests.get('https://aliyuntoken.vercel.app/api/generate', timeout=10) if re.status_code == 200: re_data = json.loads(re.content) + # pylint: disable=C0103 t = str(re_data['t']) codeContent = re_data['codeContent'] ck = re_data['ck'] @@ -97,13 +116,13 @@ def shutdown(): qr.add_data(codeContent) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) - if os.path.isfile(qrcode_dir): + img.save(QRCODE_DIR) + if os.path.isfile(QRCODE_DIR): logging.info('二维码生成完成!') break time.sleep(1) - re_count += 1 - if re_count == 3: + RE_COUNT += 1 + if RE_COUNT == 3: logging.error('二维码生成失败,退出进程') os._exit(1) if args.qrcode_mode == 'web': @@ -112,15 +131,15 @@ def shutdown(): elif args.qrcode_mode == 'shell': threading.Thread(target=poll_qrcode_status, args=(data, False)).start() qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1 and last_status != 2: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) - if last_status == 2: + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) diff --git a/glue_python/aliyuntvtoken/alitoken2.py b/glue_python/aliyuntvtoken/alitoken2.py index 95f03c462d..5c9690c53f 100644 --- a/glue_python/aliyuntvtoken/alitoken2.py +++ b/glue_python/aliyuntvtoken/alitoken2.py @@ -1,31 +1,34 @@ +# pylint: disable=C0114 +# pylint: disable=C0116 +# pylint: disable=C0103 #!/usr/local/bin/python3 -from flask import Flask, jsonify, render_template, request -from Crypto.Cipher import AES -from Crypto.Util.Padding import unpad -import requests import time import os import logging import json import uuid import hashlib -import time import base64 import random import argparse -import qrcode import sys +import requests +import qrcode +from flask import Flask, jsonify, render_template, request +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad + logging.basicConfig(level=logging.INFO) app = Flask(__name__) -timestamp = str(requests.get('http://api.extscreen.com/timestamp').json()['data']['timestamp']) +timestamp = str(requests.get('http://api.extscreen.com/timestamp', timeout=10).json()['data']['timestamp']) unique_id = uuid.uuid4().hex wifimac = str(random.randint(10**11, 10**12 - 1)) headers = { "token": "6733b42e28cdba32", - 'User-Agent': 'Mozilla/5.0 (Linux; U; Android 9; zh-cn; SM-S908E Build/TP1A.220624.014) AppleWebKit/533.1 (KHTML, like Gecko) Mobile Safari/533.1', + 'User-Agent': 'Mozilla/5.0 (Linux; U; Android 9; zh-cn; SM-S908E Build/TP1A.220624.014) AppleWebKit/533.1 (KHTML, like Gecko) Mobile Safari/533.1', # noqa: E501 'Host': 'api.extscreen.com' } @@ -33,14 +36,14 @@ def h(char_array, modifier): unique_chars = list(dict.fromkeys(char_array)) numeric_modifier = int(modifier[7:]) - transformed_string = "".join([chr(abs(ord(c) - (numeric_modifier % 127) - 1) + 33 if abs(ord(c) - (numeric_modifier % 127) - 1) < 33 else + transformed_string = "".join([chr(abs(ord(c) - (numeric_modifier % 127) - 1) + 33 if abs(ord(c) - (numeric_modifier % 127) - 1) < 33 else # noqa: E501 abs(ord(c) - (numeric_modifier % 127) - 1)) for c in unique_chars]) return transformed_string -def decrypt(ciphertext, iv, t, unique_id, wifimac): +def decrypt(ciphertext, iv, t, _unique_id, _wifimac): try: - key = generate_key(t, unique_id, wifimac) + key = generate_key(t, _unique_id, _wifimac) cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv = bytes.fromhex(iv)) decrypted = unpad(cipher.decrypt(base64.b64decode(ciphertext)), AES.block_size) dec = decrypted.decode('utf-8') @@ -50,23 +53,23 @@ def decrypt(ciphertext, iv, t, unique_id, wifimac): raise error -def get_params(t, unique_id, wifimac): +def get_params(t, _unique_id, _wifimac): params = { 'akv': '2.8.1496', 'apv': '1.3.8', 'b': 'samsung', - 'd': unique_id, + 'd': _unique_id, 'm': 'SM-S908E', 'mac': '', 'n': 'SM-S908E', 't': t, - 'wifiMac': wifimac, + 'wifiMac': _wifimac, } return params -def generate_key(t, unique_id, wifimac): - params = get_params(t, unique_id, wifimac) +def generate_key(t, _unique_id, _wifimac): + params = get_params(t, _unique_id, _wifimac) sorted_keys = sorted(params.keys()) concatenated_params = "".join([params[key] for key in sorted_keys if key != "t"]) hashed_key = h(list(concatenated_params), t) @@ -78,27 +81,27 @@ def get_qrcode_url(): 'scopes': ','.join(["user:base", "file:all:read", "file:all:write"]), "width": 500, "height": 500, - }, headers={**get_params(timestamp, unique_id, wifimac), **headers}).json()['data'] - qr_link = "https://www.aliyundrive.com/o/oauth/authorize?sid=" + data['sid'] - return {'qr_link': qr_link, 'sid': data['sid']} + }, headers={**get_params(timestamp, unique_id, wifimac), **headers}, timeout=10).json()['data'] + _qr_link = "https://www.aliyundrive.com/o/oauth/authorize?sid=" + data['sid'] + return {'qr_link': _qr_link, 'sid': data['sid']} -def check_qrcode_status(sid): +def check_qrcode_status(_sid): status = 'NotLoggedIn' - auth_code = None + _auth_code = None while status != 'LoginSuccess': time.sleep(3) - result = requests.get(f'https://openapi.alipan.com/oauth/qrcode/{sid}/status').json() + result = requests.get(f'https://openapi.alipan.com/oauth/qrcode/{_sid}/status', timeout=10).json() status = result['status'] if status == 'LoginSuccess': - auth_code = result['authCode'] - return {'auth_code': auth_code} + _auth_code = result['authCode'] + return {'auth_code': _auth_code} def get_token(code): token_data = requests.post('http://api.extscreen.com/aliyundrive/v3/token', data={ 'code': code, - }, headers={**get_params(timestamp, unique_id, wifimac), **headers}).json()['data'] + }, headers={**get_params(timestamp, unique_id, wifimac), **headers}, timeout=10).json()['data'] ciphertext = token_data['ciphertext'] iv = token_data['iv'] token_data = decrypt(ciphertext, iv, timestamp, unique_id, wifimac) @@ -108,10 +111,10 @@ def get_token(code): file_path = "" else: file_path = "/data/" - with open(f"{file_path}myopentoken.txt", "w") as file: + with open(f"{file_path}myopentoken.txt", "w", encoding='utf-8') as file: file.write(refresh_token) logging.info('myopentoken.txt 文件更新成功!') - with open(f"{file_path}open_tv_token_url.txt", "w") as file: + with open(f"{file_path}open_tv_token_url.txt", "w", encoding='utf-8') as file: file.write("https://alipan-tv-token.pages.dev/refresh") logging.info('open_tv_token_url.txt 文件更新成功!') @@ -133,8 +136,8 @@ def check_qrcode(sid): @app.route('/get_tokens', methods=['POST']) def get_tokens(): - auth_code = request.json.get('auth_code') - get_token(auth_code) + _auth_code = request.json.get('auth_code') + get_token(_auth_code) return jsonify({'status': 'completed'}) @@ -152,13 +155,13 @@ def shutdown(): elif args.qrcode_mode == 'shell': date = get_qrcode_url() qr_link = date['qr_link'] - sid = date['sid'] + _sid = date['sid'] qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=5, border=4) qr.add_data(qr_link) qr.make(fit=True) logging.info('请打开阿里云盘扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - auth_code = check_qrcode_status(sid)['auth_code'] + auth_code = check_qrcode_status(_sid)['auth_code'] get_token(auth_code) else: logging.error('未知的扫码模式') diff --git a/glue_python/get_folder_id/get_folder_id.py b/glue_python/get_folder_id/get_folder_id.py index 6dc56bf408..901426f0b1 100644 --- a/glue_python/get_folder_id/get_folder_id.py +++ b/glue_python/get_folder_id/get_folder_id.py @@ -1,37 +1,48 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 import os import json import logging -import requests import argparse + +import requests from aligo import Aligo def write_to_file(file_path, data): - with open(file_path, 'w') as file: + """ + 数据写入文件 + """ + with open(file_path, 'w', encoding='utf-8') as file: file.write(data) def get_refresh_token(path): - with open(f'{path}/mytoken.txt', 'r') as file: + """ + 获取 refresh_token + """ + with open(f'{path}/mytoken.txt', encoding='utf-8') as file: return file.readline().strip() def update_refresh_token(path): + """ + 更新 refresh_token + """ file_path = os.path.join(os.path.expanduser("~"), ".aligo", "aligo.json") try: - with open(file_path, 'r') as file: + with open(file_path, encoding='utf-8') as file: data = json.load(file) aligo_refresh_token = data.get("refresh_token") if not aligo_refresh_token: logging.error("读取 Refresh Token 失败") return False - except Exception as e: - logging.error(f"读取 Refresh Token 发生错误:{e}") + except Exception as e: # pylint: disable=W0718 + logging.error("读取 Refresh Token 发生错误:%s", e) return False headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36", # noqa: E501 "Content-Type": "application/json", "Referer": "https://www.aliyundrive.com/" } @@ -40,12 +51,12 @@ def update_refresh_token(path): "grant_type": "refresh_token" } try: - response = requests.post("https://auth.aliyundrive.com/v2/account/token", headers=headers, json=data) + response = requests.post("https://auth.aliyundrive.com/v2/account/token", headers=headers, json=data, timeout=10) # noqa: E501 response.raise_for_status() - refresh_token = response.json().get("refresh_token") - if refresh_token: - with open(f"{path}/mytoken.txt", "w") as file: - file.write(refresh_token) + _refresh_token = response.json().get("refresh_token") + if _refresh_token: + with open(f"{path}/mytoken.txt", "w", encoding='utf-8') as file: + file.write(_refresh_token) logging.info("刷新 Refresh Token 成功") return True else: @@ -57,23 +68,26 @@ def update_refresh_token(path): def get_folder_id(client, drive_mode): + """ + 获取 folder id + """ if drive_mode == 'r': v2_user = client.v2_user_get() resource_drive_id = v2_user.resource_drive_id client.default_drive_id = resource_drive_id file_list = client.get_file_list() - folder_id = '' + _folder_id = '' for folder in file_list: if folder.name == '小雅转存文件夹': - folder_id = folder.file_id + _folder_id = folder.file_id break - if not folder_id: + if not _folder_id: try: - folder_id = client.create_folder(name='小雅转存文件夹').file_id - except Exception as e: - logging.error(f"创建 小雅转存文件夹 失败:{e}") + _folder_id = client.create_folder(name='小雅转存文件夹').file_id + except Exception as e: # pylint: disable=W0718 + logging.error("创建 小雅转存文件夹 失败:%s", e) return None - return folder_id + return _folder_id if __name__ == '__main__': @@ -85,11 +99,11 @@ def get_folder_id(client, drive_mode): refresh_token = get_refresh_token(args.data_path) try: ali = Aligo(refresh_token=refresh_token) - except Exception as e: - logging.error(f"登入阿里云盘失败:{e}") + except Exception as e: # pylint: disable=W0718 + logging.error("登入阿里云盘失败:%s", e) folder_id = get_folder_id(ali, args.drive_mode) if folder_id is not None: - logging.info(f'阿里云盘转存目录 folder id: {folder_id}') + logging.info('阿里云盘转存目录 folder id: %s', folder_id) write_to_file(f'{args.data_path}/temp_transfer_folder_id.txt', folder_id) write_to_file(f'{args.data_path}/folder_type.txt', args.drive_mode) else: diff --git a/glue_python/quark_cookie/quark_cookie.py b/glue_python/quark_cookie/quark_cookie.py index b82e266939..e20cc48f06 100644 --- a/glue_python/quark_cookie/quark_cookie.py +++ b/glue_python/quark_cookie/quark_cookie.py @@ -1,59 +1,68 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 import json -import requests import time import logging import os import threading import sys -import qrcode import argparse import uuid + +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 stop_event = threading.Event() if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/quark_cookie/qrcode.png' + QRCODE_DIR = '/quark_cookie/qrcode.png' def cookiejar_to_string(cookiejar): + """ + 转换 Cookie 格式 + """ cookie_string = "" for cookie in cookiejar: cookie_string += cookie.name + "=" + cookie.value + "; " return cookie_string.strip('; ') -def poll_qrcode_status(stop, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(_token, stop, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while not stop.is_set(): - re = requests.get(f'https://uop.quark.cn/cas/ajax/getServiceTicketByQrcodeToken?client_id=532&v=1.2&token={token}&request_id={uuid.uuid4()}') - if re.status_code == 200: - re_data = json.loads(re.text) + _re = requests.get(f'https://uop.quark.cn/cas/ajax/getServiceTicketByQrcodeToken?client_id=532&v=1.2&token={_token}&request_id={uuid.uuid4()}', timeout=10) # noqa: E501 + if _re.status_code == 200: + re_data = json.loads(_re.text) if re_data['status'] == 2000000: logging.info('扫码成功!') service_ticket = re_data['data']['members']['service_ticket'] - re = requests.get(f'https://pan.quark.cn/account/info?st={service_ticket}&lw=scan') - if re.status_code == 200: - quark_cookie = cookiejar_to_string(re.cookies) + _re = requests.get(f'https://pan.quark.cn/account/info?st={service_ticket}&lw=scan', timeout=10) + if _re.status_code == 200: + quark_cookie = cookiejar_to_string(_re.cookies) if sys.platform.startswith('win32'): - with open('quark_cookie.txt', 'w') as f: + with open('quark_cookie.txt', 'w', encoding='utf-8') as f: f.write(quark_cookie) else: - with open('/data/quark_cookie.txt', 'w') as f: + with open('/data/quark_cookie.txt', 'w', encoding='utf-8') as f: f.write(quark_cookie) logging.info('扫码成功,夸克 Cookie 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break elif re_data['status'] == 50004002: logging.error('二维码无效或已过期!') - last_status = 2 + LAST_STATUS = 2 break elif re_data['status'] == 50004001: if log_print: @@ -63,19 +72,28 @@ def poll_qrcode_status(stop, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -83,55 +101,58 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='Quark Cookie') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() logging.info('二维码生成中...') - re = requests.get(f'https://uop.quark.cn/cas/ajax/getTokenForQrcodeLogin?client_id=532&v=1.2&request_id={uuid.uuid4()}') + re = requests.get(f'https://uop.quark.cn/cas/ajax/getTokenForQrcodeLogin?client_id=532&v=1.2&request_id={uuid.uuid4()}', timeout=10) # noqa: E501 if re.status_code == 200: token = json.loads(re.text)['data']['members']['token'] qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=5, border=4) qr.add_data(f'https://su.quark.cn/4_eMHBJ?token={token}&client_id=532&ssb=weblogin&uc_param_str=&uc_biz_str=S%3Acustom%7COPT%3ASAREA%400%7COPT%3AIMMERSIVE%401%7COPT%3ABACK_BTN_STYLE%400') qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) + img.save(QRCODE_DIR) logging.info('二维码生成完成!') else: logging.error('二维码生成失败,退出进程') os._exit(1) try: if args.qrcode_mode == 'web': - wait_status = threading.Thread(target=poll_qrcode_status, args=(stop_event,True,)) + wait_status = threading.Thread(target=poll_qrcode_status, args=(token,stop_event,True,)) wait_status.start() app.run(host='0.0.0.0', port=34256) elif args.qrcode_mode == 'shell': - wait_status = threading.Thread(target=poll_qrcode_status, args=(stop_event,False,)) + wait_status = threading.Thread(target=poll_qrcode_status, args=(token,stop_event,False,)) wait_status.start() logging.info('请打开 夸克 APP 扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1 and last_status != 2: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) - if last_status == 2: + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) except KeyboardInterrupt: - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) stop_event.set() wait_status.join() os._exit(0) diff --git a/glue_python/uc_cookie/uc_cookie.py b/glue_python/uc_cookie/uc_cookie.py index db3ef8d23c..f89b212a28 100644 --- a/glue_python/uc_cookie/uc_cookie.py +++ b/glue_python/uc_cookie/uc_cookie.py @@ -1,66 +1,75 @@ +# pylint: disable=C0114 #!/usr/local/bin/python3 -import json -import requests import time import logging import os import threading import sys -import qrcode import argparse import random + +import json +import requests +import qrcode from flask import Flask, send_file, render_template, jsonify app = Flask(__name__) logging.basicConfig(level=logging.INFO) -last_status = 0 +LAST_STATUS = 0 stop_event = threading.Event() if sys.platform.startswith('win32'): - qrcode_dir = 'qrcode.png' + QRCODE_DIR = 'qrcode.png' else: - qrcode_dir= '/uc_cookie/qrcode.png' + QRCODE_DIR= '/uc_cookie/qrcode.png' def cookiejar_to_string(cookiejar): + """ + 转换 Cookie 格式 + """ cookie_string = "" for cookie in cookiejar: cookie_string += cookie.name + "=" + cookie.value + "; " return cookie_string.strip('; ') -def poll_qrcode_status(stop, token, log_print): - global last_status +# pylint: disable=W0603 +def poll_qrcode_status(stop, _token, log_print): + """ + 循环等待扫码 + """ + global LAST_STATUS while not stop.is_set(): __t = int(time.time() * 1000) - data = { + _data = { 'client_id': 381, 'v': 1.2, 'request_id': __t, - 'token': token + 'token': _token } - re = requests.post(f'https://api.open.uc.cn/cas/ajax/getServiceTicketByQrcodeToken?__dt={random.randint(100, 999)}&__t={__t}', data=data) - if re.status_code == 200: - re_data = json.loads(re.content) + _re = requests.post(f'https://api.open.uc.cn/cas/ajax/getServiceTicketByQrcodeToken?__dt={random.randint(100, 999)}&__t={__t}', data=_data, timeout=10) # noqa: E501 + if _re.status_code == 200: + re_data = json.loads(_re.content) if re_data['status'] == 2000000: logging.info('扫码成功!') service_ticket = re_data['data']['members']['service_ticket'] - re = requests.get(f'https://drive.uc.cn/account/info?st={service_ticket}') - if re.status_code == 200: - uc_cookie = cookiejar_to_string(re.cookies) + _re = requests.get(f'https://drive.uc.cn/account/info?st={service_ticket}', timeout=10) + if _re.status_code == 200: + uc_cookie = cookiejar_to_string(_re.cookies) if sys.platform.startswith('win32'): - with open('uc_cookie.txt', 'w') as f: + with open('uc_cookie.txt', 'w', encoding='utf-8') as f: f.write(uc_cookie) else: - with open('/data/uc_cookie.txt', 'w') as f: + with open('/data/uc_cookie.txt', 'w', encoding='utf-8') as f: f.write(uc_cookie) logging.info('扫码成功,UC Cookie 已写入文件!') - last_status = 1 + LAST_STATUS = 1 break elif re_data['status'] == 50004002: logging.error('二维码无效或已过期!') - last_status = 2 + LAST_STATUS = 2 break elif re_data['status'] == 50004001: if log_print: @@ -70,19 +79,28 @@ def poll_qrcode_status(stop, token, log_print): @app.route("/") def index(): + """ + 网页扫码首页 + """ return render_template('index.html') @app.route('/image') def serve_image(): - return send_file(qrcode_dir, mimetype='image/png') + """ + 获取二维码图片 + """ + return send_file(QRCODE_DIR, mimetype='image/png') @app.route('/status') def status(): - if last_status == 1: + """ + 扫码状态获取 + """ + if LAST_STATUS == 1: return jsonify({'status': 'success'}) - elif last_status == 2: + elif LAST_STATUS == 2: return jsonify({'status': 'failure'}) else: return jsonify({'status': 'unknown'}) @@ -90,14 +108,17 @@ def status(): @app.route('/shutdown_server', methods=['GET']) def shutdown(): - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + """ + 退出进程 + """ + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(0) if __name__ == '__main__': - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) parser = argparse.ArgumentParser(description='UC Cookie') parser.add_argument('--qrcode_mode', type=str, required=True, help='扫码模式') args = parser.parse_args() @@ -108,14 +129,14 @@ def shutdown(): 'v': 1.2, 'request_id': __t } - re = requests.post(f'https://api.open.uc.cn/cas/ajax/getTokenForQrcodeLogin?__dt={random.randint(100, 999)}&__t={__t}', data=data) + re = requests.post(f'https://api.open.uc.cn/cas/ajax/getTokenForQrcodeLogin?__dt={random.randint(100, 999)}&__t={__t}', data=data, timeout=10) # noqa: E501 if re.status_code == 200: token = json.loads(re.content)['data']['members']['token'] qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=5, border=4) qr.add_data(f'https://su.uc.cn/1_n0ZCv?uc_param_str=dsdnfrpfbivesscpgimibtbmnijblauputogpintnwktprchmt&token={token}&client_id=381&uc_biz_str=S%3Acustom%7CC%3Atitlebar_fix') qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") - img.save(qrcode_dir) + img.save(QRCODE_DIR) logging.info('二维码生成完成!') else: logging.error('二维码生成失败,退出进程') @@ -130,21 +151,21 @@ def shutdown(): wait_status.start() logging.info('请打开 UC浏览器 APP 扫描此二维码!') qr.print_ascii(invert=True, tty=sys.stdout.isatty()) - while last_status != 1 and last_status != 2: + while LAST_STATUS != 1 and LAST_STATUS != 2: time.sleep(1) - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) - if last_status == 2: + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) + if LAST_STATUS == 2: os._exit(1) os._exit(0) else: logging.error('未知的扫码模式') - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) os._exit(1) except KeyboardInterrupt: - if os.path.isfile(qrcode_dir): - os.remove(qrcode_dir) + if os.path.isfile(QRCODE_DIR): + os.remove(QRCODE_DIR) stop_event.set() wait_status.join() os._exit(0) diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000000..5f7b95110a --- /dev/null +++ b/ruff.toml @@ -0,0 +1,38 @@ +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".ipynb_checkpoints", + ".mypy_cache", + ".nox", + ".pants.d", + ".pyenv", + ".pytest_cache", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + ".vscode", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "site-packages", + "venv", +] +line-length = 120 +target-version = "py312" + +[lint] +select = [ + "F", + "E", + "W", + "UP", +]