-
Notifications
You must be signed in to change notification settings - Fork 87
/
process_my_home.py
76 lines (55 loc) · 1.96 KB
/
process_my_home.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from collections import Counter
from datetime import datetime
# Sample data (one record per line, fields separated by a single space)
# 10折 1200 (0/1) 自己
# 10折 1200 VDMzZFF1T0hKdTRjaEJRMkV0N2xiZz09 (0/3) 舞***影(15***33)
def extract_discount(share_str: str) -> int:
    """Return the discount number held in the first space-separated field.

    The last character of that field (the ``折`` suffix in the sample rows,
    e.g. ``10折``) is dropped and the remainder is parsed as an integer.

    Raises:
        ValueError: if the remainder of the field is not a valid integer.
    """
    discount_field, _, _ = share_str.partition(" ")
    return int(discount_field[:-1])
def extract_price(share_str: str) -> int:
    """Return the second space-separated field parsed as an integer price.

    Raises:
        IndexError: if the record has fewer than two fields.
        ValueError: if the second field is not a valid integer.
    """
    fields = share_str.split(" ")
    price_field = fields[1]
    return int(price_field)
def extract_suin(share_str: str) -> str:
    """Return the third space-separated field of the record, unmodified.

    For rows shaped like the second sample row this is the suin token.

    Raises:
        IndexError: if the record has fewer than three fields.
    """
    # Only the first three fields matter, so stop splitting after them.
    fields = share_str.split(" ", 3)
    return fields[2]
def extract_remaining_times(share_str: str) -> int:
    """Return the integer before the slash in the fourth field.

    The fourth space-separated field looks like ``(0/3)``; its first and last
    characters (the parentheses) are stripped and the part before ``/`` is
    parsed as an integer, giving ``0`` for ``(0/3)``.

    NOTE(review): the name says "remaining times", but the exact meaning of
    the two numbers is not visible here - confirm against the data source.

    Raises:
        IndexError: if the record has fewer than four fields.
        ValueError: if the part before the slash is not a valid integer.
    """
    counter_field = share_str.split(" ")[3]
    # Drop the surrounding parentheses, then keep what precedes the slash.
    before_slash, _, _ = counter_field[1:-1].partition("/")
    return int(before_slash)
# Clean the raw records and de-duplicate them, keeping one record per suin.
suin_to_share_str: dict[str, str] = {}
with open(".cached/my_home.csv", encoding="utf-8") as f:
    # Consume the first line of the file so it is not treated as a record
    # (presumably a header row - confirm against the file producer).
    next(f, None)
    for raw_line in f:
        record = raw_line.strip()
        # Skip blank lines and the row that ends with "自己".
        if not record or record.endswith("自己"):
            continue
        key = extract_suin(record)
        previous = suin_to_share_str.get(key)
        # Store the record when the suin is new, or when its counter is
        # strictly larger than the stored one; otherwise the previously
        # recorded entry is the newer data and is kept.
        if previous is None or extract_remaining_times(record) > extract_remaining_times(previous):
            suin_to_share_str[key] = record
# Sort the de-duplicated records by price, ascending (stable for equal prices).
share_str_list = sorted(suin_to_share_str.values(), key=extract_price)
# Count how many records there are for each discount value.
discount_to_count: Counter = Counter(
    extract_discount(record) for record in reversed(share_str_list)
)
# Export the statistics followed by the sorted records.
with open(".cached/my_home_processed.csv", "w", encoding="utf-8") as f:
    # Statistics section: timestamp, total record count, then one line per
    # discount in ascending discount order.
    f.write(f"{datetime.now()}\n")
    f.write(f"总计: {len(share_str_list)}\n")
    f.writelines(
        f"{discount:2d} 折: {count}\n"
        for discount, count in sorted(discount_to_count.items())
    )
    # Separator between the statistics and the actual data.
    f.write("-----\n")
    # Data section: every kept record on its own line, in sorted order.
    f.writelines(f"{share_str}\n" for share_str in share_str_list)