-
Notifications
You must be signed in to change notification settings - Fork 2
/
ck_pupu_lottery.py
264 lines (240 loc) · 10.4 KB
/
ck_pupu_lottery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# -*- coding: utf-8 -*-
"""
cron: 30 9,23 * * *
new Env('朴朴抽奖');
微信登录朴朴app
找到请求https://cauth.pupuapi.com/clientauth/user/society/wechat/login?user_society_type=11
在json响应里有refresh_token
lottery_id 手动配置抽奖id 支持字符串数组或单个字符串
find_lottery 是否自动获取抽奖活动, 默认自动(部分藏的很深的抽奖暂时需要手动配置id)
coin_exchange 每个抽奖活动可兑换多少次朴分, 默认0次不兑换
"""
import asyncio
import sys
from traceback import format_exc
from typing import Iterable
from pupu_api import Client as PClient
from pupu_types import *
from utils import aio_randomSleep, check, log
assert sys.version_info >= (3, 9)
class PUPU:
__slots__ = (
"check_item",
"device_id",
"refresh_token",
"exchange_limit",
)
def __init__(self, check_item) -> None:
self.check_item: dict = check_item
async def main(self):
msg: list[str] = []
try:
self.device_id = self.check_item.get("device_id", "")
self.refresh_token = self.check_item.get("refresh_token", "")
if not self.device_id:
raise SystemExit("device_id 配置有误")
if not self.refresh_token:
raise SystemExit("refresh_token 配置有误")
msg += await self.Lottery()
except Exception:
log(f"失败: 请检查接口 {format_exc()}", msg)
return "\n".join(msg)
async def Lottery(self):
msg: list[str] = []
async with PClient(self.device_id, self.refresh_token) as api:
result = await api.InitializeToken(
self.check_item.get("addr_filter"), force_update_receiver=False
)
if isinstance(result, ApiResults.Error):
if api.nickname:
log(f"账号: {api.nickname}", msg)
log(result, msg)
return msg
log(f"账号: {api.nickname}", msg)
# 领取所有可领取的积分
coin_ids = await api.GetCoinList()
if isinstance(coin_ids, ApiResults.Error):
log(coin_ids, msg)
elif not coin_ids:
log("当前无可领取的朴分")
else:
total_coin = 0
for i, id in enumerate(coin_ids):
coin, _ = await asyncio.gather(
api.DrawCoin(id), aio_randomSleep(1, 3)
)
if isinstance(coin, ApiResults.Error):
log(coin, msg)
else:
log(f" [{i+1}/{len(coin_ids)}]成功领取{coin}朴分")
total_coin += coin
log(f"成功领取{total_coin}朴分", msg)
lottery_ids: list[str] = []
id = self.check_item.get("lottery_id")
if id:
if isinstance(id, str):
if id not in lottery_ids:
lottery_ids.append(id)
elif isinstance(id, Iterable):
for i in id:
if i not in lottery_ids:
lottery_ids.append(i)
if self.check_item.get("find_lottery", True):
banner_result, coin_cfg = await asyncio.gather(
api.GetBanner(
BANNER_LINK_TYPE.CUSTOM_LOTTERY,
position_types=[60, 220, 560, 620, 830, 850, 860, 890],
),
api.GetCoinConfig(),
)
if isinstance(banner_result, ApiResults.Error):
log(banner_result, msg)
else:
banner_result.banners.sort(key=lambda b: b.title)
# 把翻翻乐放在第一位
for i, b in enumerate(banner_result.banners):
if "翻翻乐" in b.title:
if i > 0:
del banner_result.banners[i]
banner_result.banners.insert(0, b)
break
for b in banner_result.banners:
if b.link_id not in lottery_ids:
if b.link_id not in lottery_ids:
lottery_ids.append(b.link_id)
log(f" 找到抽奖: {b.title}")
if isinstance(coin_cfg, ApiResults.Error):
log(coin_cfg, msg)
elif coin_cfg not in lottery_ids:
lottery_ids.insert(0, coin_cfg)
else:
log(f" 跳过了自动查找活动")
if len(lottery_ids) > 0:
self.exchange_limit = self.check_item.get("coin_exchange", 0)
log(f"朴分兑换限制数: {self.exchange_limit}次", msg)
for id in lottery_ids:
# 串行抽奖 确保print按顺序执行
msg += await self._Lottery(api, id)
else:
log("无抽奖活动")
exit() # 目前没必要执行后续的操作
return msg
async def _Lottery(self, api: PClient, id: str):
"""抽奖"""
msg: list[str] = []
# 首先获取抽奖详情
info = await api.GetLotteryInfo(id)
if isinstance(info, ApiResults.Error):
log(info, msg)
return msg
# 似乎朴朴压根不关心你点的哪张牌
# elif info.lottery.type != LOTTERY_TYPE.DRAW:
# log(f'[{info.lottery.name}] 不支持: {info.lottery.type}', msg)
# return msg
log(f"正在进行 [{info.lottery.name}]", msg)
# 同时拉取任务列表和抽奖机会兑换列表
task_groups, chance_info = await asyncio.gather(
api.GetTaskGroupsData(info.lottery), api.GetChanceEntrances(info.lottery)
)
if isinstance(task_groups, ApiResults.Error):
log(task_groups, msg)
elif not task_groups.tasks:
log(" 没有配置任务")
else:
# 然后开始做任务
for task in task_groups.tasks:
if task.task_status != TaskStatus.Undone:
continue
if task.page_rule:
co_task = api.PostPageTaskComplete(task)
elif task.target_code == 3001:
co_task = api.ClockInTask(info.lottery, task)
else:
continue
# 每个任务至少间隔2~5秒的时间
_, task_result = await asyncio.gather(aio_randomSleep(2, 5), co_task)
if isinstance(task_result, ApiResults.Error):
log(task_result)
else:
log(f" {task.task_name}: 已完成")
# 接着尝试朴分兑换
exchange_count = 0
while True:
if isinstance(chance_info, ApiResults.Error):
# 拉取失败了
if chance_info.code != ERROR_CODE.BUSY:
log(chance_info, msg)
# 直接尝试抽奖
_, lottery_msg = await self.__Lottery(api, info)
msg += lottery_msg
break
for entrance in chance_info.entrances:
if entrance.type == CHANCE_OBTAIN_TYPE.COIN_EXCHANGE:
# 目前只支持朴分兑换
break
else:
# 没有可用的朴分兑换入口
entrance = None
if entrance:
while (
chance_info.coin_balance >= entrance.target_value
and exchange_count < self.exchange_limit
):
# 朴分足够、兑换次数没超过限制
_, exchange_result = await asyncio.gather(
aio_randomSleep(4, 8), # 间隔4~8秒,确保朴分、抽奖机会数更新
api.CoinExchange(info.lottery, entrance),
)
if isinstance(exchange_result, ApiResults.Error):
log(exchange_result)
break
exchange_count += 1
log(
f" 第{exchange_count}次{entrance.title}: 成功兑换{exchange_result.gain_num}次抽奖机会"
)
# 更新朴分余额
chance_info = await api.GetChanceEntrances(info.lottery)
if isinstance(chance_info, ApiResults.Error):
# 拉取失败了
log(chance_info)
break
else:
if chance_info.coin_balance < entrance.target_value:
log(
f" 当前朴分{chance_info.coin_balance}少于{entrance.target_value}, 放弃兑换"
)
# 开始抽奖
result, lottery_msg = await self.__Lottery(api, info)
msg += lottery_msg
if not result:
# 抽奖失败
break
# 更新朴分余额
chance_info = await api.GetChanceEntrances(info.lottery)
return msg
async def __Lottery(self, api: PClient, info: ApiResults.LotteryInfo):
"""抽奖"""
msg: list[str] = []
chances_info = await api.GetUserLotteryInfo(info.lottery)
if isinstance(chances_info, ApiResults.Error):
log(chances_info, msg)
return (False, msg)
elif chances_info.remain_chances <= 0:
log(" 没有抽奖机会", msg)
return (False, msg)
log(f" 当前有{chances_info.remain_chances}次抽奖机会", msg)
for i in range(chances_info.remain_chances):
# 每次抽奖至少间隔4~8秒的时间
_, lottery_result = await asyncio.gather(
aio_randomSleep(4, 8), api.Lottery(info.lottery)
)
if isinstance(lottery_result, ApiResults.Error):
log(f" 第{i+1}次抽奖: {lottery_result}", msg)
else:
log(f" 第{i+1}次抽奖: {lottery_result.prize.name}", msg)
return (True, msg)
@check(run_script_name="朴朴抽奖", run_script_expression="pupu")
def main(*args, **kwargs):
return asyncio.run(PUPU(check_item=kwargs.get("value")).main())
if __name__ == "__main__":
main()