forked from chinjun/clash-config-preprocessor
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathv1.py
193 lines (151 loc) · 6.67 KB
/
v1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import requests
import utils
import yaml
import re
from collections import OrderedDict
# Surge rule types that the Surge rule-set loaders accept, mapped to the rule
# type emitted in the generated Clash rule. Every entry currently maps to
# itself; lines whose type is not listed here are dropped by those loaders.
supported_rules: dict = {
    rule_type: rule_type
    for rule_type in (
        'IP-CIDR',
        'IP-CIDR6',
        'DOMAIN',
        'DOMAIN-KEYWORD',
        'DOMAIN-SUFFIX',
        'GEOIP',
    )
}
def handle_v1(data: OrderedDict) -> OrderedDict:
    """Build a Clash configuration from a version-1 preprocessor document.

    Sections read from ``data``:

    * ``preprocessor`` - must contain ``version: 1``.
    * ``clash-general`` - copied verbatim into the result.
    * ``proxy-sources`` - each entry is expanded with ``load_proxies``.
    * ``proxy-group-dispatch`` - each entry becomes one proxy group whose
      members are chosen by its regex filters and flat lists.
    * ``rule-sets`` - optional (may be absent or null); named rule lists
      loaded from Clash or Surge sources.
    * ``rule`` - final rule list; an entry starting with ``RULE-SET`` is
      replaced by the rules of the set named in its second field.

    Returns:
        An ``OrderedDict`` holding the general settings followed by the
        ``Proxy``, ``Proxy Group`` and ``Rule`` keys.

    Raises:
        utils.ParseException: the ``preprocessor`` section is missing or its
            version is not 1.
        KeyError: a required section is missing, or a ``RULE-SET`` entry
            names a rule set that was not loaded.
    """
    # .get() so that a missing section is reported the same way as a null one.
    preprocessor = data.get("preprocessor")
    if preprocessor is None or preprocessor.get("version") != 1:
        raise utils.ParseException("Version != 1")

    result: OrderedDict = OrderedDict()
    result.update(data["clash-general"])

    proxies: list = []
    for source in data["proxy-sources"]:
        proxies += load_proxies(source)

    proxy_groups: list = [
        _build_proxy_group(group, proxies)
        for group in data["proxy-group-dispatch"]
    ]

    # "rule-sets" is optional: tolerate both an absent key and a null value.
    rule_sets: dict = _load_rule_sets(data.get("rule-sets"))

    rules: list = []
    for rule in data["rule"]:
        rule_text = str(rule)
        if rule_text.startswith("RULE-SET"):
            # "RULE-SET,<name>" expands to every rule of the named set.
            rules.extend(rule_sets[rule_text.split(",")[1]])
        else:
            rules.append(rule)

    result["Proxy"] = proxies
    result["Proxy Group"] = proxy_groups
    result["Rule"] = rules
    return result


def _build_proxy_group(item: OrderedDict, proxies: list) -> OrderedDict:
    """Turn one ``proxy-group-dispatch`` entry into a Clash proxy group."""
    filters = item["proxies-filters"]
    black_regex = re.compile(filters["black-regex"])
    white_regex = re.compile(filters["white-regex"])

    # Names listed explicitly go first / last and are never duplicated by
    # the regex-selected names in between.
    leading = item.get("flat-proxies") or []
    trailing = item.get("back-flat-proxies") or []
    pinned = set(leading) | set(trailing)

    selected = []
    for proxy in proxies:
        name: str = proxy["name"]
        if name in pinned:
            continue
        if white_regex.fullmatch(name) and not black_regex.fullmatch(name):
            selected.append(name)

    group = item.copy()
    # Preprocessor-only keys must not leak into the generated group.
    for key in ("proxies-filters", "flat-proxies", "back-flat-proxies"):
        group.pop(key, None)
    group["proxies"] = [*leading, *selected, *trailing]
    return group


def _load_rule_sets(rule_sets_dicts) -> dict:
    """Load every ``rule-sets`` entry and return them keyed by name."""
    rule_sets: dict = {}
    for item in rule_sets_dicts or []:
        item_name: str = item["name"]
        item_type: str = item['type']

        if item_type == 'clash':
            item_source: str = item["source"]
            # "or" guards against keys that are present but empty (null).
            item_rule_skip = item.get("rule-skip") or {}
            item_target_skip = item.get("target-skip") or {}
            item_map: dict = {}
            for target_map_element in item.get("target-map") or []:
                # Each element has the form "<original target>,<new target>".
                kv: list = target_map_element.split(",")
                item_map[kv[0]] = kv[1]

            if item_source == "url":
                rule_sets[item_name] = load_clash_url_rule_set(
                    item["url"], item_map, item_rule_skip, item_target_skip)
            elif item_source == "file":
                rule_sets[item_name] = load_clash_file_rule_set(
                    item["path"], item_map, item_rule_skip, item_target_skip)
        elif item_type == 'surge-ruleset':
            item_source = item["source"]
            item_target: str = item["target"]

            if item_source == "url":
                rule_sets[item_name] = load_surge_url_rule_set(
                    item["url"], item_target)
            elif item_source == "file":
                rule_sets[item_name] = load_surge_file_rule_set(
                    item["path"], item_target)
    return rule_sets
def load_proxies(item):
    """Load the proxies described by one ``proxy-sources`` entry.

    ``item["type"]`` selects the source:

    * ``'plain'`` - ``item['data']`` is a single proxy definition; it is
      returned unchanged as a one-element list.
    * ``'url'`` - a YAML document is downloaded from ``item['url']``.
    * anything else - a YAML document is read from ``item['path']``.

    For the two YAML sources, every entry of the document's ``Proxy`` list
    is adjusted in place: ``item['udp']`` is applied when the proxy has no
    ``udp`` key, ``item['prefix']`` / ``item['suffix']`` are added around
    the proxy name, and ``ss`` proxies without a ``plugin`` receive the
    plugin settings from ``item``.

    Returns:
        A list of proxy definitions.

    Raises:
        requests.HTTPError: the download returned an error status.
        KeyError: ``item`` or the loaded document lacks a required key.
    """
    if item["type"] == 'plain':
        return [item['data']]

    # NOTE(security): yaml.Loader can construct arbitrary Python objects, so
    # both the URL and the file must come from a trusted origin. Consider
    # yaml.SafeLoader if the sources never need custom tags.
    if item["type"] == 'url':
        response = requests.get(item['url'])
        # Fail loudly instead of parsing an HTTP error page as YAML.
        response.raise_for_status()
        data_yaml = yaml.load(response.content.decode(), Loader=yaml.Loader)
    else:
        # Explicit UTF-8 so the result does not depend on the platform's
        # default encoding (matches the UTF-8 decode of the URL branch).
        with open(item['path'], "r", encoding="utf-8") as f:
            data_yaml = yaml.load(f, Loader=yaml.Loader)

    # An empty "Proxy:" key parses as None; treat it as an empty list.
    proxy_yaml = data_yaml['Proxy'] or []
    for p in proxy_yaml:
        if 'udp' in item and 'udp' not in p:
            p['udp'] = item['udp']
        if 'prefix' in item:
            p['name'] = item['prefix'] + p['name']
        if 'suffix' in item:
            p['name'] += item['suffix']
        if p['type'] == 'ss' and 'plugin' in item and 'plugin' not in p:
            p['plugin'] = item['plugin']
            # NOTE(review): plugin-opts is copied only together with an
            # injected plugin, so a proxy that ships its own plugin keeps
            # its own options - confirm this is the intended nesting.
            if 'plugin-opts' in item:
                p['plugin-opts'] = item['plugin-opts']
    return proxy_yaml
def load_clash_url_rule_set(url: str, targetMap: dict, skipRule: set, skipTarget: set) -> list:
    """Download a Clash config and return its rules, filtered and re-targeted.

    The last comma-separated field of each rule is treated as its target.

    Args:
        url: location of a YAML document with a top-level ``Rule`` list.
        targetMap: maps an original target to the target to emit instead.
        skipRule: rule types (first field) whose rules are dropped.
        skipTarget: original targets whose rules are dropped.

    Returns:
        The surviving rules as strings, in document order.

    Raises:
        requests.HTTPError: the download returned an error status.
    """
    response = requests.get(url)
    # Fail loudly instead of parsing an HTTP error page as YAML.
    response.raise_for_status()
    # NOTE(security): yaml.Loader can construct arbitrary Python objects; the
    # URL must be trusted. Consider yaml.SafeLoader for plain rule lists.
    data = yaml.load(response.content, Loader=yaml.Loader)

    # Callers may pass None for options that are present but empty in YAML.
    target_map = targetMap or {}
    skipped_types = skipRule or ()
    skipped_targets = skipTarget or ()

    result: list = []
    for rule in data["Rule"] or []:
        fields = str(rule).split(",")
        original_target = fields[-1]
        if fields[0] in skipped_types or original_target in skipped_targets:
            continue
        map_to = target_map.get(original_target)
        if map_to is not None:
            # Rewrite only the trailing target field; a plain str.replace
            # would also alter a matching substring inside the rule value.
            fields[-1] = map_to
        result.append(",".join(fields))
    return result
def load_clash_file_rule_set(path: str, targetMap: dict, skipRule: set, skipTarget: set) -> list:
    """Read a Clash config file and return its rules, filtered and re-targeted.

    The last comma-separated field of each rule is treated as its target.

    Args:
        path: YAML file with a top-level ``Rule`` list.
        targetMap: maps an original target to the target to emit instead.
        skipRule: rule types (first field) whose rules are dropped.
        skipTarget: original targets whose rules are dropped.

    Returns:
        The surviving rules as strings, in file order.
    """
    # Explicit UTF-8 so parsing does not depend on the platform's default
    # encoding.
    with open(path, "r", encoding="utf-8") as f:
        # NOTE(security): yaml.Loader can construct arbitrary Python objects;
        # only load files from a trusted origin.
        data = yaml.load(f, Loader=yaml.Loader)

    # Callers may pass None for options that are present but empty in YAML.
    target_map = targetMap or {}
    skipped_types = skipRule or ()
    skipped_targets = skipTarget or ()

    result: list = []
    for rule in data["Rule"] or []:
        fields = str(rule).split(",")
        original_target = fields[-1]
        if fields[0] in skipped_types or original_target in skipped_targets:
            continue
        map_to = target_map.get(original_target)
        if map_to is not None:
            # Rewrite only the trailing target field; a plain str.replace
            # would also alter a matching substring inside the rule value.
            fields[-1] = map_to
        # Always emit strings, whether or not the target was remapped.
        result.append(",".join(fields))
    return result
def load_surge_url_rule_set(url: str, target: str):
    """Download a Surge rule set and convert it to Clash rules for ``target``.

    Each line is split on commas. Lines whose first field is a key of
    ``supported_rules`` become ``<type>,<value>,<target>``; all other lines
    (comments, blanks, unsupported rule types) are skipped.

    Args:
        url: location of the Surge rule-set text.
        target: target appended to every generated rule.

    Returns:
        The generated rules as a list of strings, in document order.

    Raises:
        requests.HTTPError: the download returned an error status.
    """
    response = requests.get(url)
    # Without this an HTTP error page would silently yield an empty rule set.
    response.raise_for_status()

    result: list = []
    for raw_rule in response.text.splitlines():
        fields = [field.strip() for field in raw_rule.split(',')]
        # A supported type with no value cannot form a rule; skip it rather
        # than fail on the missing field.
        if len(fields) < 2 or fields[0] not in supported_rules:
            continue
        result.append(f"{supported_rules[fields[0]]},{fields[1]},{target}")
    return result
def load_surge_file_rule_set(path: str, target: str):
    """Read a Surge rule-set file and convert it to Clash rules for ``target``.

    Each line is split on commas. Lines whose first field is a key of
    ``supported_rules`` become ``<type>,<value>,<target>``; all other lines
    (comments, blanks, unsupported rule types) are skipped.

    Args:
        path: Surge rule-set text file.
        target: target appended to every generated rule.

    Returns:
        The generated rules as a list of strings, in file order.
    """
    # Explicit UTF-8 so parsing does not depend on the platform's default
    # encoding.
    with open(path, "r", encoding="utf-8") as f:
        # Split into lines: iterating the string returned by read() directly
        # would walk it character by character and never match a rule.
        lines = f.read().splitlines()

    result: list = []
    for raw_rule in lines:
        fields = [field.strip() for field in raw_rule.split(',')]
        # A supported type with no value cannot form a rule; skip it rather
        # than fail on the missing field.
        if len(fields) < 2 or fields[0] not in supported_rules:
            continue
        result.append(f"{supported_rules[fields[0]]},{fields[1]},{target}")
    return result