-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgzip_connection.py
144 lines (137 loc) · 5.36 KB
/
gzip_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/python
import errno
import select
import socket
import struct
import weakref
import zlib
class GzipSocket(object):
    """Blocking wrapper that exchanges length-prefixed gzip fragments.

    On the wire every integer is a 4-byte unsigned big-endian value
    (struct format ``"!I"``).  A fragment is one such integer giving the
    byte length of the payload, followed by that many bytes of gzip data.
    """

    def __init__(self, socket):
        """Wrap an already-connected socket object.

        The parameter is named ``socket`` (shadowing the module inside this
        method only) to keep the existing call signature.
        """
        self.sock = socket
        self.pollobj = select.poll()
        # Registered with the default event mask, so poll results may also
        # carry POLLOUT/POLLPRI bits; callers must mask for what they need.
        self.pollobj.register(self.sock.fileno())

    def poll(self):
        """Return True if the socket is readable right now (non-blocking)."""
        events = self.pollobj.poll(0)
        # POLLIN is the flag that belongs to select.poll(); EPOLLIN only
        # exists on platforms that provide epoll.
        return bool(events and events[0][1] & select.POLLIN)

    def nextint(self):
        """Read one 4-byte big-endian unsigned integer.

        Returns the integer, or None if the peer closed the connection
        before sending any byte of it.

        Raises:
            Exception: if only part of the 4 bytes arrived.
        """
        packed = self.sock.recv(4, socket.MSG_WAITALL)
        if not packed:
            return None
        if len(packed) != 4:
            raise Exception("Incomplete packetlength received")
        return struct.unpack("!I", packed)[0]

    def nextfragment(self):
        """Read one fragment and return its decompressed payload.

        Returns None if the connection was closed before a length prefix
        arrived.  zlib.error propagates if the payload is not valid gzip
        data (including a truncated payload).
        """
        length = self.nextint()
        if length is None:
            return None
        buf = self.sock.recv(length, socket.MSG_WAITALL)
        # wbits = 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        return zlib.decompress(buf, 16 + zlib.MAX_WBITS)

    def sendint(self, i):
        """Send ``i`` as a 4-byte big-endian unsigned integer."""
        self.sock.sendall(struct.pack("!I", i))

    def sendfragment(self, data):
        """Gzip-compress ``data`` and send it as one length-prefixed fragment."""
        # zlib.compress() cannot select the gzip container, so build a
        # compressor explicitly: level 9, gzip wrapper (16 + MAX_WBITS),
        # default memory level, strategy 0.
        compressor = zlib.compressobj(
            9, zlib.DEFLATED, 16 + zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
        buf = compressor.compress(data) + compressor.flush()
        self.sendint(len(buf))
        self.sock.sendall(buf)

    def getpeername(self):
        """Return the remote address of the wrapped socket."""
        return self.sock.getpeername()
class GzipSocketNonBlocking(GzipSocket):
    """Non-blocking variant of GzipSocket backed by in-memory buffers.

    Incoming bytes accumulate in ``readbuf`` and outgoing bytes in
    ``writebuf``; ``process_buffers`` moves data between those buffers and
    the socket whenever a zero-timeout poll says that is possible.
    """

    def __init__(self, *args, **kwargs):
        """Wrap the socket like GzipSocket and switch it to non-blocking."""
        GzipSocket.__init__(self, *args, **kwargs)
        self.sock.setblocking(0)
        # Byte strings: identical to "" on Python 2, and compatible with
        # what recv()/struct.pack() return on Python 3.
        self.readbuf = b""
        self.writebuf = b""

    def process_buffers(self, getpoll=False):
        """Flush pending output and pull in available input, without blocking.

        Args:
            getpoll: if true, return the poll event mask instead of the
                "did anything happen" flag.

        Returns:
            With ``getpoll`` false: True if any bytes were sent or received.
            With ``getpoll`` true: the poll event mask, with POLLHUP OR-ed
            in when a read returned no data (peer closed).

        Raises:
            socket.error: if poll reported POLLERR or POLLNVAL.
        """
        events = self.pollobj.poll(0)
        result = events[0][1] if events else 0
        acted = False
        if result & select.POLLOUT and self.writebuf:
            sent = self.sock.send(self.writebuf)
            self.writebuf = self.writebuf[sent:]
            acted = True
        if result & select.POLLIN:
            bytesread = self.sock.recv(2**16)
            self.readbuf += bytesread
            if bytesread:
                acted = True
            else:
                # Readable but empty means end of stream; surface it as a
                # hang-up so callers see it in the returned mask.
                result |= select.POLLHUP
        if result & (select.POLLERR | select.POLLNVAL):
            raise socket.error("poll returned POLLERR or POLLNVAL")
        return result if getpoll else acted

    def poll(self):
        """Return True if nextfragment() can make progress right now.

        That is the case when a complete fragment is buffered, or when a
        hang-up was seen (no more data will arrive, so the caller should
        find out).
        """
        pollresult = self.process_buffers(True)
        if pollresult & select.POLLHUP:
            return True
        if len(self.readbuf) < 4:
            return False
        length = struct.unpack("!I", self.readbuf[:4])[0]
        return len(self.readbuf) >= 4 + length

    def nextint(self):
        """Consume and return one 4-byte big-endian integer from the buffer.

        Returns None if the buffer is empty and the peer has hung up.

        Raises:
            socket.error: with errno EWOULDBLOCK if fewer than 4 bytes are
                buffered.
        """
        pollresult = self.process_buffers(True)
        if not self.readbuf and pollresult & select.POLLHUP:
            return None
        if len(self.readbuf) < 4:
            raise socket.error(errno.EWOULDBLOCK, "Not enough bytes read")
        integer = struct.unpack("!I", self.readbuf[:4])[0]
        self.readbuf = self.readbuf[4:]
        return integer

    def nextfragment(self):
        """Consume one complete fragment and return its decompressed payload.

        Returns None if the buffer is empty and the peer has hung up.

        Raises:
            socket.error: with errno EWOULDBLOCK if the length prefix or the
                payload is not fully buffered yet.  Nothing is consumed in
                that case, so the call can simply be retried later.
        """
        pollresult = self.process_buffers(True)
        if not self.readbuf and pollresult & select.POLLHUP:
            return None
        if len(self.readbuf) < 4:
            raise socket.error(errno.EWOULDBLOCK, "Not enough bytes read")
        # Only peek at the length: removing it before the payload is
        # complete would make a retry misread payload bytes as a length.
        length = struct.unpack("!I", self.readbuf[:4])[0]
        end = 4 + length
        if len(self.readbuf) < end:
            raise socket.error(errno.EWOULDBLOCK, "Not enough bytes read")
        payload = self.readbuf[4:end]
        # Drop the whole frame before decompressing so a corrupt payload
        # does not leave the buffer misaligned.
        self.readbuf = self.readbuf[end:]
        # wbits = 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        return zlib.decompress(payload, 16 + zlib.MAX_WBITS)

    def sendint(self, i):
        """Queue ``i`` as a 4-byte big-endian integer and try to flush."""
        self.writebuf += struct.pack("!I", i)
        self.process_buffers()

    def sendfragment(self, data):
        """Gzip-compress ``data``, queue it as one fragment and try to flush."""
        # zlib.compress() cannot select the gzip container, so build a
        # compressor explicitly: level 9, gzip wrapper (16 + MAX_WBITS),
        # default memory level, strategy 0.
        compressor = zlib.compressobj(
            9, zlib.DEFLATED, 16 + zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
        buf = compressor.compress(data) + compressor.flush()
        self.sendint(len(buf))
        self.writebuf += buf
        self.process_buffers()
class GzipServer(object):
    """Listening TCP socket that hands out wrapped client connections.

    Accepted clients are numbered from 1 and tracked in ``clients``, a
    weak-value mapping from connection number to wrapper, so entries vanish
    once nothing else references the wrapper.
    """

    def __init__(self, server="", port=15000, clientclass=GzipSocket):
        """Bind and start listening.

        Args:
            server: address to bind to ("" binds to all interfaces).
            port: TCP port to listen on.
            clientclass: callable used to wrap each accepted socket.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.bind((server, port))
            self.sock.listen(1)
        except Exception:
            # Do not leave a half-initialised listening socket open.
            self.sock.close()
            raise
        self.pollobj = select.poll()
        self.pollobj.register(self.sock.fileno())
        self.clients = weakref.WeakValueDictionary()
        self.next_id = 1
        self.clientclass = clientclass

    def poll(self):
        """Return True if a connection is waiting to be accepted."""
        events = self.pollobj.poll(0)
        # POLLIN is the flag that belongs to select.poll(); EPOLLIN only
        # exists on platforms that provide epoll.
        return bool(events and events[0][1] & select.POLLIN)

    def accept(self):
        """Accept one connection, perform the handshake and return the wrapper.

        The client must first send the integer 0; the server answers with
        the connection number it assigned.

        Raises:
            NotImplementedError: if the client sent anything other than 0.
        """
        base_sock, _ = self.sock.accept()
        try:
            # The handshake integer is read in blocking mode on purpose,
            # before the (possibly non-blocking) client wrapper is created.
            request = GzipSocket(base_sock).nextint()
            if request != 0:
                raise NotImplementedError(
                    "connection recovering not supported")
            wrapped_sock = self.clientclass(base_sock)
            wrapped_sock.connection_num = self.next_id
            self.next_id += 1
            wrapped_sock.sendint(wrapped_sock.connection_num)
        except Exception:
            # A rejected or failed handshake must not leak the connection.
            base_sock.close()
            raise
        self.clients[wrapped_sock.connection_num] = wrapped_sock
        return wrapped_sock
def GzipClient(server="server.wesnoth.org", port=15000):
    """Connect to a server and return a handshaken GzipSocket.

    Sends the integer 0 and stores the integer the server answers with on
    the returned wrapper.

    Args:
        server: host name or address to connect to.
        port: TCP port to connect to.

    Returns:
        A GzipSocket whose ``connection_num`` attribute (also available
        under the older spelling ``connectionnum``) holds the server's
        reply, or None if the server closed the connection without
        answering.
    """
    base_sock = socket.create_connection((server, port))
    try:
        wrapped_sock = GzipSocket(base_sock)
        wrapped_sock.sendint(0)
        connection_num = wrapped_sock.nextint()
    except Exception:
        # Do not leak the connection if the handshake fails.
        base_sock.close()
        raise
    # GzipServer.accept uses ``connection_num``; keep ``connectionnum`` too
    # so existing callers of this function keep working.
    wrapped_sock.connection_num = connection_num
    wrapped_sock.connectionnum = connection_num
    return wrapped_sock
# Names exported by a star-import of this module.  GzipSocket and
# GzipSocketNonBlocking are not listed, so they must be imported by name.
__all__ = ['GzipServer', 'GzipClient']