one_scan_man.py
#!/usr/bin/env python3
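"""One-stop recon driver for a single target.

The `enum` subcommand runs TCP, UDP, and OS discovery scans in parallel
processes, then follows up on open ports with banner grabbing, HTTP/HTTPS
probing, nmap scripts, smbmap, dirsearch, and nikto, writing everything
under ./Reports/<output_dir>. The `web_recon` subcommand drives the
web_info_gather module directly, and `info` prints a catalog of useful tools.

Example invocations (target, directory, and interface values are
placeholders; flags come from the argparse setup at the bottom):

    ./one_scan_man.py enum -t 10.10.10.10 -o acme_box -i tun0
    ./one_scan_man.py web_recon -t http://10.10.10.10 -s links params
    ./one_scan_man.py info
"""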
import argparse
import concurrent.futures
import os
import sys
import threading
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import requests
import urllib3

from banner_grabbing import banner_grabbing
from constants import all_tools
from dirsearch_scan import run_dirsearch
from nikto_scanning import run_nikto
from nmap_scanning import run_nmap, get_service_to_port_map
from port_discovery import run_port_discovery
from smbmap_runner import run_smbmap
from ssl_scan import run_ssl_scan
from utils import get_ip_from_domain, is_valid_ipv4
from web_info_gather import web_recon

# Scans probe HTTPS targets with verify=False, so silence the warning spam.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

print_lock = threading.Lock()

# ANSI escape codes used to color every console line.
colors = {
    'red': '\033[91m',
    'green': '\033[92m',
    'yellow': '\033[93m',
    'cyan': '\033[96m',
    'reset': '\033[0m',
}


def synchronized_print(*args, **kwargs):
    with print_lock:
        print(*args, **kwargs)


def display_info():
    divider = "-----------------------------------------------------------------------------------------------------------------"
    print(divider)
    print(f"\n{colors['cyan']}[#] Tools of importance\n{colors['reset']}")
    print(divider)
    for section in all_tools:
        print(f"\n{colors['cyan']} {section} {colors['reset']}")
        for tool in all_tools[section]:
            entry = all_tools[section][tool]
            print(
                f"  {colors['yellow']} {tool} : \n"
                f"{colors['reset']}  {colors['red']}Description:{colors['reset']}"
                f" {colors['green']} {entry['description']} {colors['reset']}\n"
                f"  {colors['red']}Command:{colors['reset']}"
                f" {colors['green']} {entry['command']} {colors['reset']}\n"
            )


def grab_banner_for_port(task):
    ip_address, port, colors, all_websites = task
    banner = banner_grabbing(ip_address, port, colors, all_websites)
    if banner:
        # banner_grabbing may return a one-element tuple or a plain string.
        if isinstance(banner, tuple) and len(banner) == 1:
            banner = banner[0]
        if isinstance(banner, str):
            banner = banner.strip()
        return (f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Banner]"
                f"[{colors['cyan']}http.client/socket/netcat{colors['yellow']}]"
                f"[{colors['cyan']}{port}{colors['yellow']}]{colors['reset']}[{banner}]")
    return (f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Banner]"
            f"[{colors['cyan']}http.client/socket/netcat{colors['yellow']}]"
            f"[{colors['cyan']}{port}{colors['yellow']}]{colors['reset']}"
            f"[{colors['red']}Banner not found{colors['reset']}]")


def grab_banners_concurrently(ip_address, open_ports, colors, all_websites, max_workers=10):
    # Fan the per-port banner grabs out over a thread pool; executor.map
    # yields results in the same order as open_ports.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [(ip_address, port, colors, all_websites) for port in open_ports]
        return list(executor.map(grab_banner_for_port, tasks))

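# A minimal usage sketch (placeholder target and ports, not taken from a real
# scan): grab_banners_concurrently("10.10.10.10", [22, 80, 443], colors, {})
# returns one formatted "[Discovery][Banner]..." line per port, with
# "Banner not found" for ports that did not answer.
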

def is_website_up(ip_address, port, protocol):
    # A HEAD request is enough to tell whether something speaks HTTP(S) here.
    try:
        url = f"{protocol}://{ip_address}:{port}"
        requests.head(url, timeout=5, verify=False)
        return True
    except Exception:
        return False


def check_http(ip_address, port):
    try:
        if is_website_up(ip_address, port, 'http'):
            print(f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Web Discovery][{colors['cyan']}Webserver{colors['yellow']}]{colors['reset']}[HTTP][{port}]")
            return (port, "http")
    except Exception:
        pass
    return None


def check_https(ip_address, port):
    try:
        if is_website_up(ip_address, port, 'https'):
            print(f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Web Discovery][{colors['cyan']}Webserver{colors['yellow']}]{colors['reset']}[HTTPS][{port}]")
            return (port, "ssl")
    except Exception:
        pass
    return None

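# check_http/check_https return (port, "http") or (port, "ssl"); main()
# collects the hits into all_websites, e.g. {80: "http", 443: "ssl"}
# (illustrative values), which later decides which ports get web scans.
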

def scan_services(ip_address, service_to_port_map, output_dir, colors, args):
    for service in ('http', 'ssl'):
        if service in service_to_port_map:
            for port in service_to_port_map[service]:
                scheme = 'http' if service == 'http' else 'https'
                web_recon([f"{scheme}://{ip_address}:{port}"],
                          ['banner,comments,domains,links,files,params'],
                          None, args, "main")
            for port in service_to_port_map[service]:
                synchronized_print(f"\n\n{colors['yellow']}\033[1m\n[--------] Scanning port: {port} [--------]{colors['reset']}\033[0m")
                run_dirsearch(ip_address, port, output_dir, "enum", colors)

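# service_to_port_map is keyed by nmap service name, e.g. (illustrative)
# {'http': [80, 8080], 'ssl': [443], 'smb': [445]}; scan_services only walks
# the 'http' and 'ssl' buckets.
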

def main(scan_type, args):
    """Run one full discovery pass (tcp, udp, or os) against args.target."""
    output_dir = "./Reports/" + str(args.output_dir)
    target = args.target
    interface = args.interface
    scan_type_u = scan_type.upper()
    # Resolve the target here too: each scan runs in its own process, so this
    # function cannot rely on globals set in the __main__ block.
    ip_address = target if is_valid_ipv4(target) else get_ip_from_domain(target)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    try:
        ports_and_protocol = run_port_discovery(ip_address, os.path.join(output_dir, 'masscan.txt'), interface, colors, scan_type)
        open_ports = list(ports_and_protocol)
        protocols = dict(ports_and_protocol)
        if open_ports and scan_type != "os":
            open_ports_str = ', '.join(f"{port}/{protocol}" for port, protocol in protocols.items())
            print(f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][{colors['cyan']}{scan_type_u}{colors['yellow']}]{colors['reset']}[{open_ports_str}]")
            all_websites = {}
            # Probe every open port for HTTP and HTTPS concurrently.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures_http = {executor.submit(check_http, ip_address, port): port for port in open_ports}
                futures_https = {executor.submit(check_https, ip_address, port): port for port in open_ports}
                for future in concurrent.futures.as_completed(futures_http):
                    result = future.result()
                    if result:
                        all_websites[result[0]] = result[1]
                for future in concurrent.futures.as_completed(futures_https):
                    result = future.result()
                    if result:
                        all_websites[result[0]] = result[1]
            service_to_port_map, service_banners = get_service_to_port_map(ip_address, protocols, colors)
            # Banner grabbing for all open ports
            banners = grab_banners_concurrently(ip_address, open_ports, colors, all_websites)
            for banner in banners:
                if banner is not None:
                    print(banner)
            # nmap's own service banners, one line per port
            for port in service_banners:
                if len(service_banners[port]) == 0:
                    service_banners[port] = "Banner not found"
                    print(f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Banner][{colors['cyan']}nmap{colors['yellow']}][{colors['cyan']}{scan_type_u}{colors['yellow']}][{colors['cyan']}{port}{colors['yellow']}]{colors['reset']}[{colors['red']}{service_banners[port]}{colors['reset']}]")
                else:
                    banner = service_banners[port].strip() if isinstance(service_banners[port], str) else service_banners[port]
                    print(f"{colors['yellow']}[{colors['green']}Discovery{colors['yellow']}][Banner][{colors['cyan']}nmap{colors['yellow']}][{colors['cyan']}{scan_type_u}{colors['yellow']}][{colors['cyan']}{port}{colors['yellow']}]{colors['reset']}[{colors['reset']}{banner}]")
            # Reconcile nmap's service labels with what the HTTP/HTTPS probes
            # actually observed: a probed port wins over nmap's guess.
            service_names = list(service_to_port_map.keys())
            for service_name in service_names:
                for port, service in all_websites.items():
                    if port in service_to_port_map[service_name]:
                        if service != service_name:
                            service_to_port_map[service_name].remove(port)
                            service_to_port_map.setdefault(service, []).append(port)
                    else:
                        service_to_port_map.setdefault(service, []).append(port)
            # Drop the duplicates the loop above can introduce.
            service_to_port_map = {service: list(set(ports)) for service, ports in service_to_port_map.items()}
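            # Illustrative before/after: if nmap labeled 8443 as 'http' but the
            # probe saw it answer HTTPS, {'http': [80, 8443]} becomes
            # {'http': [80], 'ssl': [8443]}.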
            # Run smbmap if the service is netbios-ssn or microsoft-ds
            if 'smb' in service_to_port_map:
                smbmap_output = run_smbmap(ip_address, output_dir, colors)
                if smbmap_output:
                    with open(os.path.join(output_dir, "smbmap_output.txt"), 'w') as f:
                        f.write(smbmap_output)
                    print(smbmap_output)
            # Add ffuf for finding vhosts here [Bruteforce]
            run_nmap(ip_address, open_ports, os.path.join(output_dir, 'nmap_scripts'), colors, service_to_port_map)
            if scan_type == "udp":
                return
            scan_services(ip_address, service_to_port_map, output_dir, colors, args)
            # Run Nikto for detected web servers
            for service in ('http', 'ssl'):
                if service in service_to_port_map:
                    for port in service_to_port_map[service]:
                        run_nikto(ip_address, port, os.path.join(output_dir, f'nikto_port_{port}.txt'), colors)
        elif scan_type not in ("os", "udp"):
            print(f"{colors['red']}[Failure][{scan_type_u}][No open ports detected on {target}]{colors['reset']}")
    except Exception as e:
        print(f"{colors['red']}[Failure][{scan_type_u}][{str(e)}]{colors['reset']}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Script for web reconnaissance and enumeration.')
    subparsers = parser.add_subparsers(dest='command')

    enum_parser = subparsers.add_parser('enum', help='Perform enumeration')
    enum_parser.add_argument('--target', '-t', required=True, type=str, help='Target IP address or domain name')
    enum_parser.add_argument('--output_dir', '-o', required=True, type=str, help='Directory to store output')
    enum_parser.add_argument('--interface', '-i', required=True, type=str, help='Interface to use for scanning')

    web_recon_parser = subparsers.add_parser('web_recon', help='Perform web reconnaissance')
    web_recon_parser.add_argument('--scan_type', '-s', required=True, type=str, nargs='+', help='Type of scan to perform: All, files, params, cookies, links, domains, cewl, comments, banner, http-output, dirbust, nmap')
    web_recon_parser.add_argument('--proxy_url', '-p', type=str, help='Proxy URL')
    web_recon_parser.add_argument('--depth', '-d', type=str, help='Recursion depth')
    web_recon_parser.add_argument('--cookies', '-c', type=str, help='Cookies')
    web_recon_parser.add_argument('--target_url', '-t', required=True, type=str, nargs='+', help='Target URL with paths. Example: http://target.com/path1 and http://target.com/path2 become "http://target.com path1 path2"')

    info_parser = subparsers.add_parser('info', help='Display information of important tools')
    args = parser.parse_args()

    if args.command == 'enum':
        target = args.target
        # Accept a literal IPv4 address or resolve a domain name to one.
        ip_address = target if is_valid_ipv4(target) else get_ip_from_domain(target)
        if ip_address is None:
            print(f"{colors['red']}[-] Invalid IP address or domain name: {target}{colors['reset']}")
            sys.exit(1)
        print(f"\n\n\033[1m{colors['yellow']}[--------] Scanning IP:{colors['reset']}\033[0m \033[1m{colors['cyan']}{ip_address}{colors['reset']}\033[0m \033[1m{colors['yellow']}[--------]{colors['reset']}\033[0m\n")
        # TCP, UDP, and OS discovery each run in their own process.
        scans = ['tcp', 'udp', 'os']
        with ProcessPoolExecutor(max_workers=3) as executor:
            for scan in scans:
                executor.submit(main, scan, args)
    elif args.command == 'web_recon':
        scan_types = [x.lower() for x in args.scan_type]
        if args.depth and int(args.depth) > 30:
            print(f"{colors['red']}\n[-] Max allowed depth is 30\n{colors['reset']}")
            parser.print_help()
            sys.exit()
        web_recon(args.target_url, scan_types, args.proxy_url, args, "web")
    elif args.command == 'info':
        display_info()
    else:
        parser.print_help()