diff --git a/check.py b/check.py index de4060d..853f1da 100755 --- a/check.py +++ b/check.py @@ -29,7 +29,6 @@ def check_pep8(srcdir): print(">>> Running pep8...") clean = True pep8.process_options(['']) - pep8.options.repeat=True for pyfile in findpy(srcdir): if pep8.Checker(pyfile).check_all() != 0: clean = False @@ -39,12 +38,12 @@ def check_pep8(srcdir): def main(): src = os.path.join(os.path.dirname(sys.argv[0]), 'stun') if not check_pyflakes(src): - print + print('') err = "ERROR: pyflakes failed on some source files\n" err += "ERROR: please fix the errors and re-run this script" print(err) elif not check_pep8(src): - print + print('') err = "ERROR: pep8 failed on some source files\n" err += "ERROR: please fix the errors and re-run this script" print(err) diff --git a/setup.py b/setup.py index a595e4d..bae87f4 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='pystun', - version="0.0.4", + version="0.0.5", packages=find_packages(), scripts=['bin/pystun'], zip_safe=False, @@ -23,5 +23,7 @@ "Topic :: Internet", "Topic :: System :: Networking :: Firewalls", "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3" ], ) diff --git a/stun/__init__.py b/stun/__init__.py index dc0c0f5..86d61ea 100644 --- a/stun/__init__.py +++ b/stun/__init__.py @@ -1,11 +1,11 @@ -#coding=utf-8 +# coding=utf-8 +import logging import random import socket -import binascii -import logging +import struct -__version__ = "0.0.4" +__version__ = "0.0.5" log = logging.getLogger("pystun") @@ -22,30 +22,33 @@ def enable_logging(): 'stun.voipbuster.com', ) -#stun attributes -MappedAddress = '0001' -ResponseAddress = '0002' -ChangeRequest = '0003' -SourceAddress = '0004' -ChangedAddress = '0005' -Username = '0006' -Password = '0007' -MessageIntegrity = '0008' -ErrorCode = '0009' -UnknownAttribute = '000A' -ReflectedFrom = '000B' -XorOnly = '0021' -XorMappedAddress = '8020' -ServerName = '8022' -SecondaryAddress = '8050' # Non standard extention - -#types for a stun message -BindRequestMsg = '0001' -BindResponseMsg = '0101' -BindErrorResponseMsg = '0111' -SharedSecretRequestMsg = '0002' -SharedSecretResponseMsg = '0102' -SharedSecretErrorResponseMsg = '0112' +# TODO: Enum would be perfect here, instead of all this +# manual mapping between numbers and names. + +# stun attributes +MappedAddress = 0x0001 +ResponseAddress = 0x0002 +ChangeRequest = 0x0003 +SourceAddress = 0x0004 +ChangedAddress = 0x0005 +Username = 0x0006 +Password = 0x0007 +MessageIntegrity = 0x0008 +ErrorCode = 0x0009 +UnknownAttribute = 0x000A +ReflectedFrom = 0x000B +XorOnly = 0x0021 +XorMappedAddress = 0x8020 +ServerName = 0x8022 +SecondaryAddress = 0x8050 # Non standard extention + +# types for a stun message +BindRequestMsg = 0x0001 +BindResponseMsg = 0x0101 +BindErrorResponseMsg = 0x0111 +SharedSecretRequestMsg = 0x0002 +SharedSecretResponseMsg = 0x0102 +SharedSecretErrorResponseMsg = 0x0112 dictAttrToVal = {'MappedAddress': MappedAddress, 'ResponseAddress': ResponseAddress, @@ -86,30 +89,32 @@ def enable_logging(): def _initialize(): - items = dictAttrToVal.items() - for i in xrange(len(items)): - dictValToAttr.update({items[i][1]: items[i][0]}) - items = dictMsgTypeToVal.items() - for i in xrange(len(items)): - dictValToMsgType.update({items[i][1]: items[i][0]}) + global dictValToArray, dictValToMsgType + dictValToArray = dict((v, k) for k, v in dictAttrToVal.items()) + dictValToMsgType = dict((v, k) for k, v in dictMsgTypeToVal.items()) def gen_tran_id(): - a = '' - for i in xrange(32): - a += random.choice('0123456789ABCDEF') - #return binascii.a2b_hex(a) - return a + return bytearray(random.randrange(256) for _ in range(16)) + + +def unpack_port_ip(buf): + port_ipbits = struct.unpack('>H4B', buf) + ip = '.'.join(map(str, port_ipbits[1:])) + return port_ipbits[0], ip + + +def make_change_request(a, b): + return struct.pack('>HHI', ChangeRequest, a, b) -def stun_test(sock, host, port, source_ip, source_port, send_data=""): +def stun_test(sock, host, port, source_ip, source_port, send_data=b""): retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None, 'SourceIP': None, 'SourcePort': None, 'ChangedIP': None, 'ChangedPort': None} - str_len = "%#04d" % (len(send_data) / 2) + str_len = len(send_data) // 2 tranid = gen_tran_id() - str_data = ''.join([BindRequestMsg, str_len, tranid, send_data]) - data = binascii.a2b_hex(str_data) + data = struct.pack('>HH', BindRequestMsg, str_len) + tranid + send_data recvCorr = False while not recvCorr: recieved = False @@ -132,51 +137,34 @@ def stun_test(sock, host, port, source_ip, source_port, send_data=""): else: retVal['Resp'] = False return retVal - msgtype = binascii.b2a_hex(buf[0:2]) + msgtype, = struct.unpack('>H', buf[0:2]) bind_resp_msg = dictValToMsgType[msgtype] == "BindResponseMsg" - tranid_match = tranid.upper() == binascii.b2a_hex(buf[4:20]).upper() + tranid_match = tranid == buf[4:20] if bind_resp_msg and tranid_match: recvCorr = True retVal['Resp'] = True - len_message = int(binascii.b2a_hex(buf[2:4]), 16) + len_message, = struct.unpack('>H', buf[2:4]) len_remain = len_message base = 20 while len_remain: - attr_type = binascii.b2a_hex(buf[base:(base + 2)]) - attr_len = int(binascii.b2a_hex(buf[(base + 2):(base + 4)]), - 16) + attr_type, attr_len = struct.unpack('>HH', buf[base:base+4]) if attr_type == MappedAddress: - port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16) - ip = ".".join([ - str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)), - str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)), - str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)), - str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))]) + port, ip = unpack_port_ip(buf[base+6:base+12]) retVal['ExternalIP'] = ip retVal['ExternalPort'] = port if attr_type == SourceAddress: - port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16) - ip = ".".join([ - str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)), - str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)), - str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)), - str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))]) + port, ip = unpack_port_ip(buf[base+6:base+12]) retVal['SourceIP'] = ip retVal['SourcePort'] = port if attr_type == ChangedAddress: - port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16) - ip = ".".join([ - str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)), - str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)), - str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)), - str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))]) + port, ip = unpack_port_ip(buf[base+6:base+12]) retVal['ChangedIP'] = ip retVal['ChangedPort'] = port - #if attr_type == ServerName: - #serverName = buf[(base+4):(base+4+attr_len)] + # if attr_type == ServerName: + # serverName = buf[(base+4):(base+4+attr_len)] base = base + 4 + attr_len len_remain = len_remain - (4 + attr_len) - #s.close() + # s.close() return retVal @@ -203,7 +191,7 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478): changedIP = ret['ChangedIP'] changedPort = ret['ChangedPort'] if ret['ExternalIP'] == source_ip: - changeRequest = ''.join([ChangeRequest, '0004', "00000006"]) + changeRequest = make_change_request(4, 6) ret = stun_test(s, stun_host, port, source_ip, source_port, changeRequest) if ret['Resp']: @@ -211,7 +199,7 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478): else: typ = SymmetricUDPFirewall else: - changeRequest = ''.join([ChangeRequest, '0004', "00000006"]) + changeRequest = make_change_request(4, 6) log.debug("Do Test2") ret = stun_test(s, stun_host, port, source_ip, source_port, changeRequest) @@ -226,13 +214,12 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478): typ = ChangedAddressError else: if exIP == ret['ExternalIP'] and exPort == ret['ExternalPort']: - changePortRequest = ''.join([ChangeRequest, '0004', - "00000002"]) + changePortRequest = make_change_request(4, 2) log.debug("Do Test3") ret = stun_test(s, changedIP, port, source_ip, source_port, changePortRequest) log.debug("Result: %s" % ret) - if ret['Resp'] == True: + if ret['Resp']: typ = RestricNAT else: typ = RestricPortNAT @@ -257,9 +244,9 @@ def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None, def main(): nat_type, external_ip, external_port = get_ip_info() - print "NAT Type:", nat_type - print "External IP:", external_ip - print "External Port:", external_port + print("NAT Type: {0}".format(nat_type)) + print("External IP: {0}".format(external_ip)) + print("External Port: {0}".format(external_port)) if __name__ == '__main__': main() diff --git a/stun/cli.py b/stun/cli.py index c96bda8..42045b2 100644 --- a/stun/cli.py +++ b/stun/cli.py @@ -1,4 +1,6 @@ -#coding=utf-8 +#!/usr/bin/env python +# coding=utf-8 + import optparse import stun @@ -26,9 +28,9 @@ def main(): stun_host=options.stun_host, stun_port=options.stun_port) nat_type, external_ip, external_port = stun.get_ip_info(**kwargs) - print "NAT Type:", nat_type - print "External IP:", external_ip - print "External Port:", external_port + print("NAT Type: {0}".format(nat_type)) + print("External IP: {0}".format(external_ip)) + print("External Port: {0}".format(external_port)) if __name__ == '__main__': main()