Skip to content

Commit

Permalink
✨ Support for unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
AirportR committed Mar 30, 2024
1 parent f692592 commit df2fdab
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 365 deletions.
217 changes: 2 additions & 215 deletions botmodule/init_bot.py
Original file line number Diff line number Diff line change
@@ -1,223 +1,10 @@
import asyncio
import os
import sys
from pathlib import Path
from subprocess import check_output

from loguru import logger
from utils.cleaner import config, unzip_targz, unzip
from utils import HOME_DIR
from utils.collector import get_latest_tag, Download, DownloadError
from utils.cleaner import config
from utils.init import check_init, Init

admin = config.getAdmin() # 管理员


def check_args():
import argparse
help_text_socks5 = "设置socks5代理,bot代理使用的这个\n格式--> host:端口:用户名:密码\t用户名和密码可省略"
help_text_http = "设置HTTP代理,拉取订阅用的。\n格式--> 用户名:密码@host:端口\t@符号前面的用户名和密码如果不设置可省略"
help_f = "强制覆盖原先的mybot.session文件,重新生成"
parser = argparse.ArgumentParser(description="FullTClash命令行快速启动,其中api_id,api_hash,bot_token要么不填,要么全部填完")
parser.add_argument("-r", action='store_true', help=help_f)
parser.add_argument("-ah", "--api-hash", required=False, type=str, help="自定义api-hash")
parser.add_argument("-ai", "--api-id", required=False, type=int, help="自定义api-id")
parser.add_argument("-b", "--bot-token", required=False, type=str, help="自定义bot-token")
parser.add_argument("-ps5", "--proxy-socks5", required=False, type=str, help=help_text_socks5)
parser.add_argument("-http", "--proxy-http", required=False, type=str, help=help_text_http)
parser.add_argument("-su", "--admin", required=False, help="设置bot的管理员,多个管理员以 , 分隔")
parser.add_argument("-d", "--path", required=False, type=str, help="设置代理客户端路径")

args = parser.parse_args()
if args.r:
if os.path.exists("./my_bot.session"):
logger.info("检测到启用session文件覆写选项")
try:
os.remove("my_bot.session")
logger.info("已移除my_bot.session")
except Exception as e2:
logger.error(f"session文件移除失败:{e2}")
if args.path:
config.yaml['clash'] = config.config.get('clash', {}).setdefault('path', str(args.d))
if args.admin:
adminlist = str(args.admin).split(",")
logger.info(f"即将添加管理员:{adminlist}")
config.add_admin(adminlist)
if args.proxy_http:
config.yaml['proxy'] = str(args.proxy_http)
logger.info("从命令行参数中设置HTTP代理")
if args.api_hash and args.api_id and args.bot_token:
tempconf = {
"api_hash": args.api_hash,
"api_id": args.api_id,
"bot_token": args.bot_token,
"proxy": args.proxy_socks5,
}
bot_conf = config.getBotconfig()
bot_conf.update(tempconf)
config.yaml['bot'] = bot_conf
logger.info("从命令行参数中设置api_hash,api_id,bot_token")
if args.proxy_socks5:
logger.info("从命令行参数中设置bot的socks5代理")
i = config.reload()
if i:
logger.info("已覆写配置文件。")
else:
logger.warning("覆写配置失败!")


class Init:
repo_owner = "AirportR"
repo_name = "fulltclash"
ftcore_owner = repo_owner
ftcore_name = "FullTCore"

@staticmethod
def init_user():
config.add_user(admin) # 管理员同时也是用户
config.reload()

@staticmethod
def init_emoji():
emoji_source = config.config.get('emoji', {}).get('emoji-source', 'TwemojiLocalSource')
if config.config.get('emoji', {}).get('enable', True) and emoji_source == 'TwemojiLocalSource':
from utils.emoji_custom import TwemojiLocalSource
if not os.path.isdir('./resources/emoji/twemoji'):
twemoji = TwemojiLocalSource()
logger.info("检测到未安装emoji资源包,正在初始化本地emoji...")
asyncio.get_event_loop().run_until_complete(twemoji.download_emoji(proxy=config.get_proxy()))
if twemoji.init_emoji(twemoji.savepath):
logger.info("初始化emoji成功")
else:
logger.warning("初始化emoji失败")

@staticmethod
def init_dir():
dirs = os.listdir(HOME_DIR)
if "logs" in dirs and "results" in dirs:
return
logger.info("检测到初次使用,正在初始化...")
if not os.path.isdir('logs'):
os.mkdir("logs")
logger.info("创建文件夹: logs 用于保存日志")
if not os.path.isdir('results'):
os.mkdir("results")
logger.info("创建文件夹: results 用于保存测试结果")

@staticmethod
def init_permission():
if sys.platform != "win32":
try:
status = os.system(f"chmod +x {config.get_clash_path()}")
if status != 0:
raise OSError(f"Failed to execute command: chmod +x {config.get_clash_path()}")
except OSError as o:
print(o)

@staticmethod
def init_commit_string():
_latest_version_hash = ""
try:
output = check_output(['git', 'log'], shell=False, encoding="utf-8").strip()
# 解析输出,提取最新提交的哈希值
for line in output.split("\n"):
if "commit" in line:
_latest_version_hash = line.split()[1][:7]
break
except Exception as e:
logger.info(f"可能不是通过git拉取源码,因此version将无法查看提交哈希。{str(e)}")
_latest_version_hash = "Unknown"
return _latest_version_hash

@staticmethod
def init_proxy_client():
"""
自动下载代理客户端FullTCore
"""
if config.get_clash_path() is not None:
return
import platform
loop = asyncio.get_event_loop()
tag = loop.run_until_complete(get_latest_tag(Init.ftcore_owner, Init.ftcore_name))
tag2 = tag[1:] if tag[0] == "v" else tag
arch = platform.machine().lower()
if arch == "x86_64":
arch = "amd64"
elif arch == "aarch64" or arch == "armv8":
arch = "arm64"
elif arch == "x86":
arch = "i386"
elif arch == "arm":
arch = "armv7l"
suffix = ".tar.gz"
if sys.platform.startswith('linux'):
pf = "linux"
elif sys.platform.startswith('darwin'):
pf = "darwin"
elif sys.platform.startswith('win32'):
pf = "windows"
suffix = ".zip"
else:
logger.info("无法找到FullTCore在当前平台的预编译文件,请自行下载。")
return

# https://github.com/AirportR/FullTCore/releases/download/v1.3-meta/FullTCore_1.3-meta_windows_amd64.zip
base_url = f"https://github.com/{Init.ftcore_owner}/{Init.ftcore_name}"

download_url = base_url + f"/releases/download/{tag}/FullTCore_{tag2}_{pf}_{arch}{suffix}"
savename = download_url.split("/")[-1]
logger.info(f"正在自动为您下载最新版本({tag})的FullTCore: {download_url}")
savepath = Path(HOME_DIR).joinpath("bin").absolute()
saved_file = savepath.joinpath(savename)

try:
loop.run_until_complete(Download(download_url, savepath, savename).dowload(proxy=config.get_proxy()))
except DownloadError:
logger.info("无法找到FullTCore在当前平台的预编译文件,请自行下载。")
return
except (OSError, Exception) as e:
logger.info(str(e))
return

if suffix.endswith("zip"):
unzip_result = unzip(saved_file, savepath)
elif suffix.endswith("tar.gz"):
unzip_result = unzip_targz(saved_file, savepath)
else:
unzip_result = False
if unzip_result:
if pf == "windows":
corename = Init.ftcore_name + ".exe"
else:
corename = Init.ftcore_name
proxy_path = str(savepath.joinpath(corename).as_posix())
clash_cfg = config.config.get('clash', {})
clash_cfg = clash_cfg if isinstance(clash_cfg, dict) else {}
clash_cfg['path'] = proxy_path
config.yaml['clash'] = clash_cfg
config.reload()


def check_version() -> str:
return Init.init_commit_string()


def check_py_version() -> None:
if sys.version_info < (3, 9):
py_url = "https://www.python.org/downloads/"
logger.info(f"您的Python版本为{sys.version}\n至少需要Python3.9才能运行此程序。前往: {py_url}下载新版本。")
sys.exit()


def check_init():
check_args()
check_py_version()
Init.init_emoji()
Init.init_dir()
Init.init_proxy_client()
Init.init_permission()
Init.init_user()


def parse_bot_proxy():
_proxies = None
try:
Expand Down
34 changes: 34 additions & 0 deletions tests/test_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio
import sys
import unittest
from os import getcwd, chdir


def pin_cwd():
"""
固定工作目录为项目根目录
"""
if "tests" in getcwd():
chdir("..")
sys.path.append(getcwd())
print(f"当前工作目录: {getcwd()}")
# print(f"包的搜索路径:{sys.path}")


class TestDownload(unittest.TestCase):
def setUp(self):
pin_cwd()
from utils.collector import Download
durl = "https://github.com/AirportR/FullTCore/releases/download/v1.3-meta/FullTCore_1.3-meta_windows_amd64.zip"
dl = Download(durl, "./tests", durl.split("/")[-1])
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.dl = dl
self.loop = loop

def test_init(self):
self.assertTrue(self.loop.run_until_complete(self.dl.dowload()))


if __name__ == '__main__':
unittest.main()
32 changes: 32 additions & 0 deletions tests/test_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import sys
from os import getcwd, chdir
import unittest


def pin_cwd():
"""
固定工作目录为项目根目录
"""
if "tests" in getcwd():
chdir("..")
sys.path.append(getcwd())
print(f"当前工作目录: {getcwd()}")
# print(f"包的搜索路径:{sys.path}")


class TestInit(unittest.TestCase):
def setUp(self):
pin_cwd()
from utils.init import check_init
self.func = check_init

def test_init(self):
self.assertTrue(self.func())


if __name__ == '__main__':
# pin_cwd()

unittest.main()
# check_init()
print("✅测试通过")
43 changes: 41 additions & 2 deletions utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import random
from os import getcwd
import aiohttp

from utils.cron import *
from typing import Callable, Any, Union, Coroutine
from typing import Callable, Any, Union, Coroutine, Optional

__version__ = "3.6.8" # 项目版本号
HOME_DIR = getcwd()
Expand All @@ -14,10 +15,27 @@
"__version__",
"retry",
"script_demo",
"HOME_DIR"
"HOME_DIR",
"generate_random_string",
"async_runtime"
]


def generate_random_string():
length = random.randint(10, 30)
rand_str = ''
range_start = 48
range_end = 122

for _ in range(length):
random_integer = random.randint(range_start, range_end)
# Validate ascii range
if random_integer <= 57 or random_integer >= 65:
rand_str += chr(random_integer)

return rand_str


def default_breakfunc(ret_val: bool) -> bool:
return True if isinstance(ret_val, bool) and ret_val else False

Expand Down Expand Up @@ -64,3 +82,24 @@ def __init__(self):
await script_func(fakecl, session, *arg, **kwargs)
print(fakecl.info)
await session.close()


def async_runtime(loop: Optional[asyncio.AbstractEventLoop] = None):
"""
临时的异步运行时,适用于只有一两个异步函数的情况
:param: loop: 事件循环
:param: break_func: 触发的回调中止函数,参数为调用函数的返回值,返回值为bool,
"""
if loop is None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

def wrapper(func):
def inner(*args, **kwargs):
result = loop.run_until_complete(func(*args, **kwargs))
return result

return inner

return wrapper

Loading

0 comments on commit df2fdab

Please sign in to comment.