Skip to content

Python 3.x compat, cleanup #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def check_pep8(srcdir):
print(">>> Running pep8...")
clean = True
pep8.process_options([''])
pep8.options.repeat=True
for pyfile in findpy(srcdir):
if pep8.Checker(pyfile).check_all() != 0:
clean = False
Expand All @@ -39,12 +38,12 @@ def check_pep8(srcdir):
def main():
src = os.path.join(os.path.dirname(sys.argv[0]), 'stun')
if not check_pyflakes(src):
print
print('')
err = "ERROR: pyflakes failed on some source files\n"
err += "ERROR: please fix the errors and re-run this script"
print(err)
elif not check_pep8(src):
print
print('')
err = "ERROR: pep8 failed on some source files\n"
err += "ERROR: please fix the errors and re-run this script"
print(err)
Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name='pystun',
version="0.0.4",
version="0.0.5",
packages=find_packages(),
scripts=['bin/pystun'],
zip_safe=False,
Expand All @@ -23,5 +23,7 @@
"Topic :: Internet",
"Topic :: System :: Networking :: Firewalls",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3"
],
)
143 changes: 65 additions & 78 deletions stun/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#coding=utf-8
# coding=utf-8

import logging
import random
import socket
import binascii
import logging
import struct

__version__ = "0.0.4"
__version__ = "0.0.5"

log = logging.getLogger("pystun")

Expand All @@ -22,30 +22,33 @@ def enable_logging():
'stun.voipbuster.com',
)

#stun attributes
MappedAddress = '0001'
ResponseAddress = '0002'
ChangeRequest = '0003'
SourceAddress = '0004'
ChangedAddress = '0005'
Username = '0006'
Password = '0007'
MessageIntegrity = '0008'
ErrorCode = '0009'
UnknownAttribute = '000A'
ReflectedFrom = '000B'
XorOnly = '0021'
XorMappedAddress = '8020'
ServerName = '8022'
SecondaryAddress = '8050' # Non standard extention

#types for a stun message
BindRequestMsg = '0001'
BindResponseMsg = '0101'
BindErrorResponseMsg = '0111'
SharedSecretRequestMsg = '0002'
SharedSecretResponseMsg = '0102'
SharedSecretErrorResponseMsg = '0112'
# TODO: Enum would be perfect here, instead of all this
# manual mapping between numbers and names.

# stun attributes
MappedAddress = 0x0001
ResponseAddress = 0x0002
ChangeRequest = 0x0003
SourceAddress = 0x0004
ChangedAddress = 0x0005
Username = 0x0006
Password = 0x0007
MessageIntegrity = 0x0008
ErrorCode = 0x0009
UnknownAttribute = 0x000A
ReflectedFrom = 0x000B
XorOnly = 0x0021
XorMappedAddress = 0x8020
ServerName = 0x8022
SecondaryAddress = 0x8050 # Non standard extention

# types for a stun message
BindRequestMsg = 0x0001
BindResponseMsg = 0x0101
BindErrorResponseMsg = 0x0111
SharedSecretRequestMsg = 0x0002
SharedSecretResponseMsg = 0x0102
SharedSecretErrorResponseMsg = 0x0112

dictAttrToVal = {'MappedAddress': MappedAddress,
'ResponseAddress': ResponseAddress,
Expand Down Expand Up @@ -86,30 +89,32 @@ def enable_logging():


def _initialize():
items = dictAttrToVal.items()
for i in xrange(len(items)):
dictValToAttr.update({items[i][1]: items[i][0]})
items = dictMsgTypeToVal.items()
for i in xrange(len(items)):
dictValToMsgType.update({items[i][1]: items[i][0]})
global dictValToArray, dictValToMsgType
dictValToArray = dict((v, k) for k, v in dictAttrToVal.items())
dictValToMsgType = dict((v, k) for k, v in dictMsgTypeToVal.items())


def gen_tran_id():
a = ''
for i in xrange(32):
a += random.choice('0123456789ABCDEF')
#return binascii.a2b_hex(a)
return a
return bytearray(random.randrange(256) for _ in range(16))


def unpack_port_ip(buf):
port_ipbits = struct.unpack('>H4B', buf)
ip = '.'.join(map(str, port_ipbits[1:]))
return port_ipbits[0], ip


def make_change_request(a, b):
return struct.pack('>HHI', ChangeRequest, a, b)


def stun_test(sock, host, port, source_ip, source_port, send_data=""):
def stun_test(sock, host, port, source_ip, source_port, send_data=b""):
retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None,
'SourceIP': None, 'SourcePort': None, 'ChangedIP': None,
'ChangedPort': None}
str_len = "%#04d" % (len(send_data) / 2)
str_len = len(send_data) // 2
tranid = gen_tran_id()
str_data = ''.join([BindRequestMsg, str_len, tranid, send_data])
data = binascii.a2b_hex(str_data)
data = struct.pack('>HH', BindRequestMsg, str_len) + tranid + send_data
recvCorr = False
while not recvCorr:
recieved = False
Expand All @@ -132,51 +137,34 @@ def stun_test(sock, host, port, source_ip, source_port, send_data=""):
else:
retVal['Resp'] = False
return retVal
msgtype = binascii.b2a_hex(buf[0:2])
msgtype, = struct.unpack('>H', buf[0:2])
bind_resp_msg = dictValToMsgType[msgtype] == "BindResponseMsg"
tranid_match = tranid.upper() == binascii.b2a_hex(buf[4:20]).upper()
tranid_match = tranid == buf[4:20]
if bind_resp_msg and tranid_match:
recvCorr = True
retVal['Resp'] = True
len_message = int(binascii.b2a_hex(buf[2:4]), 16)
len_message, = struct.unpack('>H', buf[2:4])
len_remain = len_message
base = 20
while len_remain:
attr_type = binascii.b2a_hex(buf[base:(base + 2)])
attr_len = int(binascii.b2a_hex(buf[(base + 2):(base + 4)]),
16)
attr_type, attr_len = struct.unpack('>HH', buf[base:base+4])
if attr_type == MappedAddress:
port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
ip = ".".join([
str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))])
port, ip = unpack_port_ip(buf[base+6:base+12])
retVal['ExternalIP'] = ip
retVal['ExternalPort'] = port
if attr_type == SourceAddress:
port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
ip = ".".join([
str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))])
port, ip = unpack_port_ip(buf[base+6:base+12])
retVal['SourceIP'] = ip
retVal['SourcePort'] = port
if attr_type == ChangedAddress:
port = int(binascii.b2a_hex(buf[base + 6:base + 8]), 16)
ip = ".".join([
str(int(binascii.b2a_hex(buf[base + 8:base + 9]), 16)),
str(int(binascii.b2a_hex(buf[base + 9:base + 10]), 16)),
str(int(binascii.b2a_hex(buf[base + 10:base + 11]), 16)),
str(int(binascii.b2a_hex(buf[base + 11:base + 12]), 16))])
port, ip = unpack_port_ip(buf[base+6:base+12])
retVal['ChangedIP'] = ip
retVal['ChangedPort'] = port
#if attr_type == ServerName:
#serverName = buf[(base+4):(base+4+attr_len)]
# if attr_type == ServerName:
# serverName = buf[(base+4):(base+4+attr_len)]
base = base + 4 + attr_len
len_remain = len_remain - (4 + attr_len)
#s.close()
# s.close()
return retVal


Expand All @@ -203,15 +191,15 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478):
changedIP = ret['ChangedIP']
changedPort = ret['ChangedPort']
if ret['ExternalIP'] == source_ip:
changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
changeRequest = make_change_request(4, 6)
ret = stun_test(s, stun_host, port, source_ip, source_port,
changeRequest)
if ret['Resp']:
typ = OpenInternet
else:
typ = SymmetricUDPFirewall
else:
changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
changeRequest = make_change_request(4, 6)
log.debug("Do Test2")
ret = stun_test(s, stun_host, port, source_ip, source_port,
changeRequest)
Expand All @@ -226,13 +214,12 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478):
typ = ChangedAddressError
else:
if exIP == ret['ExternalIP'] and exPort == ret['ExternalPort']:
changePortRequest = ''.join([ChangeRequest, '0004',
"00000002"])
changePortRequest = make_change_request(4, 2)
log.debug("Do Test3")
ret = stun_test(s, changedIP, port, source_ip, source_port,
changePortRequest)
log.debug("Result: %s" % ret)
if ret['Resp'] == True:
if ret['Resp']:
typ = RestricNAT
else:
typ = RestricPortNAT
Expand All @@ -257,9 +244,9 @@ def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None,

def main():
nat_type, external_ip, external_port = get_ip_info()
print "NAT Type:", nat_type
print "External IP:", external_ip
print "External Port:", external_port
print("NAT Type: {0}".format(nat_type))
print("External IP: {0}".format(external_ip))
print("External Port: {0}".format(external_port))

if __name__ == '__main__':
main()
10 changes: 6 additions & 4 deletions stun/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#coding=utf-8
#!/usr/bin/env python
# coding=utf-8

import optparse

import stun
Expand Down Expand Up @@ -26,9 +28,9 @@ def main():
stun_host=options.stun_host,
stun_port=options.stun_port)
nat_type, external_ip, external_port = stun.get_ip_info(**kwargs)
print "NAT Type:", nat_type
print "External IP:", external_ip
print "External Port:", external_port
print("NAT Type: {0}".format(nat_type))
print("External IP: {0}".format(external_ip))
print("External Port: {0}".format(external_port))

if __name__ == '__main__':
main()