Skip to content

Commit

Permalink
新增 Telegram 通知,将预约成功的结果发送到频道
Browse files Browse the repository at this point in the history
  • Loading branch information
sakurasep committed Mar 11, 2024
1 parent eade71f commit a299792
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 87 deletions.
81 changes: 81 additions & 0 deletions py/main/get_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from datetime import timedelta, datetime
import sys
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from datetime import datetime
import getpass
from get_bearer_token import get_bearer_token
import base64

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("开始打印日志")
Expand Down Expand Up @@ -128,6 +134,81 @@ def get_segment(build_id, nowday):
sys.exit()


# 根据当前系统时间获取 key
def get_key():
# 获取当前日期,并转换为字符串
current_date = datetime.now().strftime("%Y%m%d")

# 生成回文
palindrome = current_date[::-1]

# 使用当前日期和回文作为密钥
key = current_date + palindrome

# print("当前日期:", current_date)
# print("回文:", palindrome)
# print("密钥:", key)

return key


# 读取授权码
def get_auth_token():
# token = "test"
# return token
try:
# 从命令行中获取用户名和密码
username = input("请输入用户名(学号): \n")
password = getpass.getpass('请输入密码: \n')
name, token = get_bearer_token(username, password)
logger.info(f"你好,{name}同学")
new_token = "bearer" + str(token)
# logger.info(new_token)
return new_token
except Exception as e:
logger.error(f"获取授权码时发生异常: {str(e)}")
sys.exit()


# 加密函数
def encrypt(text):
# 自动获取 key
key = get_key()
# 目前获取到的加密密钥
iv = "ZZWBKJ_ZHIHUAWEI"
key_bytes = key.encode('utf-8')
iv_bytes = iv.encode('utf-8')

cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
ciphertext_bytes = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size))

return base64.b64encode(ciphertext_bytes).decode('utf-8')


# 定义解密函数
def decrypt(ciphertext):
# 自动获取 key
key = get_key()
# 目前获取到的加密密钥
iv = "ZZWBKJ_ZHIHUAWEI"

# 将密钥和初始化向量转换为 bytes 格式
key_bytes = key.encode('utf-8')
iv_bytes = iv.encode('utf-8')

# 将密文进行 base64 解码
ciphertext = base64.b64decode(ciphertext)

# 使用 AES 进行解密
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
decrypted_bytes = cipher.decrypt(ciphertext)

# 去除解密后的填充
decrypted_text = unpad(decrypted_bytes, AES.block_size).decode('utf-8')

return decrypted_text


def get_seat_info(build_id, segment, nowday):
try:
interrupted = False
Expand Down
126 changes: 39 additions & 87 deletions py/main/get_seat.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import base64
import asyncio

import logging
import multiprocessing
import random
import time
from datetime import datetime
import getpass

from telegram import Bot

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

import sys
from get_bearer_token import get_bearer_token
from get_info import get_date, get_seat_info, get_segment, get_build_id

from get_info import get_date, get_seat_info, get_segment, get_build_id, get_auth_token, encrypt

# 配置日志
logging.basicConfig(level=logging.INFO)
Expand All @@ -22,26 +22,10 @@
# 在代码的顶部定义全局变量
global_exclude_ids = set()
seat_result = {}


# 读取授权码
def get_auth_token():
# token = "test"
# return token
try:
# 从命令行中获取用户名和密码
username = input("请输入用户名(学号): \n")
password = getpass.getpass('请输入密码: \n')
name, token = get_bearer_token(username, password)
logger.info(f"你好,{name}同学")
new_token = "bearer" + str(token)
# logger.info(new_token)
return new_token
except Exception as e:
logger.error(f"获取授权码时发生异常: {str(e)}")
sys.exit()


CHANNEL_ID = "Your Channel id"
TELEGRAM_BOT_TOKEN = 'Your telegram token'
TELEGRAM_URL = "https://telegram.sakurasep.workers.dev/bot"
message = ""
MAX_RETRIES = 3 # 最大重试次数
RETRY_DELAY = 5 # 重试间隔时间(秒)

Expand All @@ -66,61 +50,19 @@ def send_post_request_and_save_response(url, data, headers):
sys.exit()


# 根据当前系统时间获取 key
def get_key():
# 获取当前日期,并转换为字符串
current_date = datetime.now().strftime("%Y%m%d")

# 生成回文
palindrome = current_date[::-1]

# 使用当前日期和回文作为密钥
key = current_date + palindrome

# print("当前日期:", current_date)
# print("回文:", palindrome)
# print("密钥:", key)

return key


# 加密函数
def encrypt(text):
# 自动获取 key
key = get_key()
# 目前获取到的加密密钥
iv = "ZZWBKJ_ZHIHUAWEI"
key_bytes = key.encode('utf-8')
iv_bytes = iv.encode('utf-8')
def add_message(text):
global message
message += text

cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
ciphertext_bytes = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size))

return base64.b64encode(ciphertext_bytes).decode('utf-8')


# 定义解密函数
def decrypt(ciphertext):
# 自动获取 key
key = get_key()
# 目前获取到的加密密钥
iv = "ZZWBKJ_ZHIHUAWEI"

# 将密钥和初始化向量转换为 bytes 格式
key_bytes = key.encode('utf-8')
iv_bytes = iv.encode('utf-8')

# 将密文进行 base64 解码
ciphertext = base64.b64decode(ciphertext)

# 使用 AES 进行解密
cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
decrypted_bytes = cipher.decrypt(ciphertext)

# 去除解密后的填充
decrypted_text = unpad(decrypted_bytes, AES.block_size).decode('utf-8')

return decrypted_text
async def send_seat_result_to_channel():
try:
# 使用 API 令牌初始化您的机器人
bot = Bot(token=TELEGRAM_BOT_TOKEN)
logger.info(f"要发送的消息为: {message}\n")
await bot.send_message(chat_id=CHANNEL_ID, text=message)
except Exception as e:
logger.error(f"发送消息到 Telegram 失败: {e}")


# 状态检测函数
Expand All @@ -138,7 +80,10 @@ def check_reservation_status(seat_id, m):
logger.info("重复预约, 请检查选择的时间段或是否已经成功预约")
return True
elif status == "预约成功":
# elif "1" == "1":
logger.info("成功预约")
add_message(f"预约状态为:{status}")
asyncio.run(send_seat_result_to_channel())
return True
elif status == "开放预约时间19:20":
logger.info("未到预约时间,程序将会 10s 查询一次")
Expand All @@ -149,15 +94,19 @@ def check_reservation_status(seat_id, m):
return True
elif status == "该空间当前状态不可预约":
logger.info("此位置已被预约")
if m == "2":
new_seat_id = input("请重新输入想要预约的相同自习室的 id:\n")
return False, new_seat_id
else:
if m == "3":
logger.info(f"{seat_id} 已被预约,加入排除名单")
global_exclude_ids.add(seat_id)
time.sleep(1)
# logger.info(global_exclude_ids)
return False
elif m == "2":
seat_id = input("请重新输入想要预约的相同自习室的 id:\n")
return seat_id
else:
logger.info(f"{seat_id} 已被预约,重新刷新状态")
time.sleep(1)
return False

else:
logger.info("未知状态,程序退出")
Expand Down Expand Up @@ -240,12 +189,15 @@ def prefer_get_select_id(build_id):
def get_and_select_seat_default(auth, build_id, segment, nowday, m):
# 初始化
interrupted = False
global message
try:
while not interrupted:
if m == "1":
# logger.info(build_id)
select_id = prefer_get_select_id(build_id)
logger.info(f"优选的座位是:{select_id}")
message = f"优选的座位是:{select_id}"
time.sleep(1)
else:
# logger.info(f"获取的 segment: {segment}")
data = get_seat_info(build_id, segment, nowday)
Expand All @@ -260,8 +212,8 @@ def get_and_select_seat_default(auth, build_id, segment, nowday, m):
# 获取该字典中 'id' 键对应的值
select_id = random_dict['id']
seat_no = random_dict['no']

logger.info(f"随机选择的座位为: {select_id} 真实位置: {seat_no}")
message = f"随机选择的座位为: {select_id} 真实位置: {seat_no} \n"

if select_id is None:
logger.info("选定座位为空, 程序继续运行")
Expand All @@ -270,7 +222,7 @@ def get_and_select_seat_default(auth, build_id, segment, nowday, m):
else:
post_to_get_seat(select_id, segment, auth)
# logger.info(seat_result)
interrupted = check_reservation_status(select_id, "1")
interrupted = check_reservation_status(select_id, m)

except KeyboardInterrupt:
logger.info(f"接收到中断信号,程序将退出。")
Expand All @@ -286,7 +238,7 @@ def get_and_select_seat_selected(auth, segment, seat_id):
logger.info(f"你选定的座位为: {seat_id}")
post_to_get_seat(seat_id, segment, auth)
# logger.info(seat_result)
interrupted, seat_id = check_reservation_status(seat_id, "2")
interrupted = check_reservation_status(seat_id, "2")

except KeyboardInterrupt:
logger.info(f"接收到中断信号,程序将退出。")
Expand Down

0 comments on commit a299792

Please sign in to comment.