-
Notifications
You must be signed in to change notification settings - Fork 40
/
akamTester.py
114 lines (99 loc) · 3.72 KB
/
akamTester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2019/7/19 16:52
# @Author : Miyouzi
# @File : akamTester.py
# @Software: PyCharm
#from pythonping import ping
from icmplib import ping
from ColorPrinter import color_print
from GlobalDNS import GlobalDNS
import sys, os, argparse
import concurrent.futures
working_dir = os.path.dirname(os.path.realpath(__file__))
# working_dir = os.path.dirname(sys.executable) # 使用 pyinstaller 编译时,打开此项
ip_list_path = os.path.join(working_dir, 'ip_list.txt')
version = 5.0
def ping_test(ip):
result = ping(ip, count=5, privileged=False)
delay = result.avg_rtt
msg = ip + '\t平均延迟: ' + str(delay) + ' ms'
if delay<100:
color_print(msg, status=2)
else:
color_print(msg)
return delay
version_msg = '当前akamTester版本: ' + str(version)
color_print(version_msg, 2)
host = 'upos-hz-mirrorakam.akamaized.net'
# 支持命令行, 允许用户通过参数指定测试域名
if len(sys.argv) > 1:
parser = argparse.ArgumentParser()
parser.add_argument('--user_host', '-u', type=str, help='指定测试域名', default=host, required=True)
arg = parser.parse_args()
if arg.user_host:
host = arg.user_host
#添加一个以host为名称的低延迟ip列表地址,以供后续保存
low_delay_ip_list_path = os.path.join(working_dir, host+'.txt')
try:
akam = GlobalDNS(host)
color_print('第一次解析:')
ip_list = akam.get_ip_list()
print()
color_print('第二次解析:')
akam.renew()
ip_list = ip_list | akam.get_ip_list()
print()
color_print('第三次解析:')
akam.renew()
ip_list = ip_list | akam.get_ip_list()
except BaseException as e:
color_print('进行全球解析时遇到未知错误: '+str(e), status=1)
if os.path.exists(ip_list_path):
color_print('将读取本地保存的ip列表', status=1)
with open(ip_list_path, 'r', encoding='utf-8') as f:
ip_list = f.read().splitlines()
else:
color_print('没有本地保存的ip列表!程序终止!', status=1)
print()
input('按回车退出')
sys.exit(0)
else:
# 保存解析结果
with open(ip_list_path, 'w', encoding='utf-8') as f:
for ip in ip_list:
f.write(str(ip))
f.write('\n')
print()
color_print('共取得 '+str(len(ip_list))+' 个 IP, 开始测试延迟')
print()
ip_info = []
good_ips = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(ping_test, ip) for ip in ip_list]
delays = [f.result() for f in futures]
for delay, ip in zip(delays, ip_list):
ip_info.append({'ip': ip, 'delay': delay})
if delay < 100:
good_ips.append({'ip': ip, 'delay': delay})
print()
if len(good_ips) > 0:
color_print('基于当前网络环境, 以下为延迟低于100ms的IP', status=2)
good_ips.sort(key=lambda x:x['delay'])
#保存所有低延迟ip到txt,并在后面添加上该host,以供快速复制到hosts修改
with open(low_delay_ip_list_path, 'w', encoding='utf-8') as f:
for ip in good_ips:
color_print(ip['ip'] + '\t平均延迟: ' + str(ip['delay']) + ' ms', status=2)
f.write(ip['ip']+' '+host)
f.write('\n')
else:
ip_info.sort(key=lambda x:x['delay'])
num = len(ip_info) # 要显示的节点数
if num > 3: # 如果解析的节点数超过 3 个, 那么显示 3 个就行
num = 3
color_print('本次测试未能找到延迟低于100ms的IP! 以下为延迟最低的 ' + str(num) + ' 个节点', status=1)
for i in range(0,num):
color_print(ip_info[i]['ip'] + '\t平均延迟: ' + str(ip_info[i]['delay']) + ' ms')
print()
input('按回车退出')
sys.exit(0)