-
Notifications
You must be signed in to change notification settings - Fork 2
/
Myportscan.py
176 lines (160 loc) · 6.25 KB
/
Myportscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import argparse
import ipaddress
import socket
import threading
import time
from queue import Empty, Queue
# import argparse
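'''
Port scanner, fully wrapped in a class: work queue + probing.
Repeated testing exposed one problem:
when probing over a socket, a target that does not respond is treated as
having the port closed. However, some ports are open yet never send any
data back, so without a timeout s.recv(1024) blocks forever and the
program hangs and cannot be interrupted.
Closed ports are not printed by default.
'''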
class Myportscan:
    """TCP connect scanner that drains ``(ip, port)`` pairs from a shared queue.

    Typical use: build the instance, call ``queue_put()`` once to fill the
    queue, then run ``port_alive_scanner`` or
    ``port_alive_and_banner_scanner`` in one or more threads. Only open
    ports are reported; closed ports are skipped silently.
    """

    def __init__(self, ips=None, ports=None, queue=None, timeout=2):
        """Store the scan parameters.

        Args:
            ips: Target specification string: a single host, a last-octet
                range (``10.10.10.10-20``), a CIDR network
                (``10.10.10.10/24``), or several of these separated by
                commas or whitespace.
            ports: Port specification string such as ``80,90-100``.
            queue: Queue object that receives and hands out the
                ``(ip, port)`` work items.
            timeout: Socket timeout in seconds for connect and recv.
        """
        self.ips = ips
        self.ports = ports
        self.queue = queue
        self.timeout = timeout
        # Kept for backward compatibility: this changes the process-wide
        # default timeout for every socket created afterwards.
        socket.setdefaulttimeout(timeout)

    def get_port(self):
        """Expand ``self.ports`` into a flat list of integer port numbers.

        Accepts comma-separated single ports and inclusive ``start-end``
        ranges, e.g. ``"80-100, 3380-3390"``. Blank segments are ignored
        and an empty or missing specification yields ``[]``.

        Raises:
            ValueError: If a segment is not numeric.
        """
        port_list = []
        if not self.ports:
            return port_list
        for segment in self.ports.split(','):
            segment = segment.strip()
            if not segment:
                continue
            if '-' in segment:
                start, end = segment.split('-', 1)
                port_list.extend(range(int(start), int(end) + 1))
            else:
                # Always store ints so single ports and ranges agree in type.
                port_list.append(int(segment))
        return port_list

    def get_ip(self):
        """Expand ``self.ips`` into a flat list of host strings.

        Targets may be separated by commas or whitespace; each one is
        expanded independently. An empty or missing specification
        yields ``[]``.

        Raises:
            ValueError: If a CIDR target cannot be parsed.
        """
        if not self.ips:
            return []
        ip_list = []
        for target in self.ips.replace(',', ' ').split():
            ip_list.extend(self._expand_target(target))
        return ip_list

    @staticmethod
    def _expand_target(target):
        """Expand one target (CIDR, last-octet range, or host) to a list."""
        if '/' in target:
            # strict=False lets callers write a host address with a mask
            # (192.168.1.1/24) instead of the exact network address.
            network = ipaddress.ip_network(target, strict=False)
            return [str(host) for host in network.hosts()]
        head, dash, last = target.rpartition('-')
        if dash and last.isdecimal():
            try:
                ipaddress.IPv4Address(head)
            except ValueError:
                # Not "a.b.c.d-N": a hostname that merely contains '-'.
                return [target]
            prefix, _, first = head.rpartition('.')
            # An octet cannot exceed 255, so clamp the range end.
            stop = min(int(last), 255)
            return ['{0}.{1}'.format(prefix, octet)
                    for octet in range(int(first), stop + 1)]
        return [target]

    def queue_put(self):
        """Put every ``(ip, port)`` combination onto ``self.queue``."""
        port_list = self.get_port()
        for ip in self.get_ip():
            for port in port_list:
                self.queue.put((ip, port))

    def get_a_port_isalive(self, ip, port):
        """Return True if a TCP connection to ``ip:port`` succeeds.

        Any connection failure (refused, timed out, unresolvable host,
        port that is non-numeric or out of range) is reported as False.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.settimeout(self.timeout)
            try:
                server.connect((ip, int(port)))
            except (OSError, OverflowError, ValueError):
                return False
            return True

    def get_a_port_banner(self, ip, port):
        """Connect to an open port and return the first data it sends.

        Returns:
            Up to 1024 bytes of the service banner decoded as text
            (undecodable bytes are replaced), or ``None`` when the
            connection fails or nothing arrives before the timeout.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            # Without a timeout recv() would block forever on services
            # that accept the connection but never speak first.
            server.settimeout(self.timeout)
            try:
                server.connect((ip, int(port)))
                return server.recv(1024).decode(errors='replace')
            except (OSError, OverflowError, ValueError):
                return None

    def _scan_queue(self, with_banner):
        """Drain the queue, printing and collecting a line per open port."""
        results = []
        while True:
            try:
                # Non-blocking get: checking empty() and then calling a
                # blocking get() can hang when another thread takes the
                # last item in between.
                ip, port = self.queue.get_nowait()
            except Empty:
                return results
            if not self.get_a_port_isalive(ip, port):
                continue
            if with_banner:
                port_banner = self.get_a_port_banner(ip, port)
                scan_result = '[+] {0}:{1} open {2}'.format(
                    ip, port, port_banner)
            else:
                scan_result = '[+] {0}:{1} open'.format(ip, port)
            print(scan_result)
            results.append(scan_result)

    def port_alive_scanner(self):
        """Worker: probe queued targets until the queue is empty.

        Prints one line per open port and returns the list of those lines
        for the ports this worker handled.
        """
        return self._scan_queue(with_banner=False)

    def port_alive_and_banner_scanner(self):
        """Worker: probe queued targets and grab a banner from open ports.

        Prints one line per open port (with its banner, or ``None``) and
        returns the list of those lines for the ports this worker handled.
        """
        return self._scan_queue(with_banner=True)
def _str_to_bool(value):
    """Parse a command-line boolean such as ``true``/``false``/``1``/``0``.

    ``type=bool`` cannot be used with argparse because ``bool("False")`` is
    True; this converter keeps the legacy ``-s True`` / ``-a False`` forms
    working with their intended meaning.
    """
    lowered = value.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('0', 'false', 'f', 'no', 'n', 'off', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {0!r}'.format(value))


def build_parser():
    """Create the command-line parser for the port scanner."""
    title = '''
    python3 %(prog)s -H 10.10.10.10 -p 80,81
    python3 %(prog)s -H 10.10.10.10 -p 80-100,3380-3390
    python3 %(prog)s -H 10.10.10.10-20 -p 1-65535
    python3 %(prog)s -H 10.10.10.10/24 -p 80,90-100 -a
    '''
    parser = argparse.ArgumentParser(
        usage=title,
        description="Multithread Portscan,the default threads is 50.")
    parser.add_argument("-H", type=str, dest="hosts", required=True,
                        help="Hosts to scan")
    parser.add_argument("-p", type=str, dest="ports", required=True,
                        help="ports to scan")
    # nargs='?' + const=True lets the flags be used bare (-a) while still
    # accepting the older explicit form (-a True).
    parser.add_argument("-s", "--simple", nargs='?', const=True, default=True,
                        type=_str_to_bool, dest="simple_scan",
                        help="alive scan")
    parser.add_argument("-a", "-all", "--all", nargs='?', const=True,
                        default=False, type=_str_to_bool, dest="all_scan",
                        help="banner scan")
    parser.add_argument("-t", "--thread", default=50, type=int,
                        dest="threads", help="threads")
    parser.add_argument("--timeout", default=2, type=float, dest="timeout",
                        help="timeout")
    return parser


def main(argv=None):
    """Parse arguments, fill the work queue and run the scanner threads.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 1 when ``--timeout`` or ``--thread`` is not
            positive, or when both scan modes are switched off.
    """
    args = build_parser().parse_args(argv)
    if args.timeout <= 0:
        print("--timeout Setting error.")
        raise SystemExit(1)
    if args.threads < 1:
        print("--thread Setting error.")
        raise SystemExit(1)

    ip_port_queue = Queue()
    scanner = Myportscan(args.hosts, args.ports, ip_port_queue, args.timeout)

    # The banner scan is checked first: the simple scan is on by default,
    # so testing it first would make the banner scan unreachable.
    if args.all_scan:
        worker = scanner.port_alive_and_banner_scanner
    elif args.simple_scan:
        worker = scanner.port_alive_scanner
    else:
        print("No scan mode selected.")
        raise SystemExit(1)

    # Load every (ip, port) pair into the queue before any worker starts.
    scanner.queue_put()

    # Never start more threads than there are work items.
    worker_count = min(args.threads, ip_port_queue.qsize())
    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()