diff --git a/spockbot/plugins/__init__.py b/spockbot/plugins/__init__.py index 0b50ee5..6ef58df 100644 --- a/spockbot/plugins/__init__.py +++ b/spockbot/plugins/__init__.py @@ -1,4 +1,5 @@ -from spockbot.plugins.core import auth, event, net, taskmanager, ticker, timers +from spockbot.plugins.core import auth, event, net, select, \ + taskmanager, ticker, timers from spockbot.plugins.helpers import auxiliary, channels, chat, clientinfo, \ craft, entities, interact, inventory, movement, \ pathfinding, physics, start, world @@ -7,6 +8,7 @@ ('auth', auth.AuthPlugin), ('event', event.EventPlugin), ('net', net.NetPlugin), + ('select', select.SelectPlugin), ('taskmanager', taskmanager.TaskManager), ('ticker', ticker.TickerPlugin), ('timers', timers.TimersPlugin), diff --git a/spockbot/plugins/core/net.py b/spockbot/plugins/core/net.py index e26074f..17ee30f 100644 --- a/spockbot/plugins/core/net.py +++ b/spockbot/plugins/core/net.py @@ -5,7 +5,6 @@ """ import logging -import select import socket import time @@ -36,44 +35,11 @@ def decrypt(self, data): return self.decryptifier.update(data) -class SelectSocket(socket.socket): - """ - Provides an asynchronous socket with a poll method built on - top of select.select for cross-platform compatiability - """ - def __init__(self, timer): - super(SelectSocket, self).__init__(socket.AF_INET, socket.SOCK_STREAM) - self.sending = False - self.timer = timer - - def poll(self): - flags = [] - if self.sending: - self.sending = False - slist = [(self,), (self,), (self,)] - else: - slist = [(self,), (), (self,)] - timeout = self.timer.get_timeout() - if timeout >= 0: - slist.append(timeout) - try: - rlist, wlist, xlist = select.select(*slist) - except select.error as e: - logger.error("SELECTSOCKET: Socket Error: %s", str(e)) - rlist, wlist, xlist = [], [], [] - if rlist: - flags.append('SOCKET_RECV') - if wlist: - flags.append('SOCKET_SEND') - if xlist: - flags.append('SOCKET_ERR') - return flags - - class NetCore(object): - def __init__(self, sock, event): + def __init__(self, sock, event, select): self.sock = sock self.event = event + self.select = select self.host = None self.port = None self.connected = False @@ -84,21 +50,24 @@ def __init__(self, sock, event): self.sbuff = b'' self.rbuff = BoundBuffer() + def reset(self, sock): + self.__init__(sock, self.event, self.select) + def connect(self, host='localhost', port=25565): self.host = host self.port = port try: - logger.debug("NETCORE: Attempting to connect to host: %s port: %s", + logger.debug('NETCORE: Attempting to connect to host: %s port: %s', host, port) # Set the connect to be a blocking operation self.sock.setblocking(True) - self.sock.connect((self.host, self.port)) + self.sock.connect((host, port)) self.sock.setblocking(False) self.connected = True - self.event.emit('net_connect', (self.host, self.port)) - logger.debug("NETCORE: Connected to host: %s port: %s", host, port) + self.event.emit('net_connect', (host, port)) + logger.debug('NETCORE: Connected to host: %s port: %s', host, port) except socket.error as error: - logger.error("NETCORE: Error on Connect") + logger.error('NETCORE: Error on Connect') self.event.emit('SOCKET_ERR', error) def set_proto_state(self, state): @@ -115,7 +84,7 @@ def push(self, packet): self.sbuff += (self.cipher.encrypt(data) if self.encrypted else data) self.event.emit(packet.ident, packet) self.event.emit(packet.str_ident, packet) - self.sock.sending = True + self.select.schedule_sending(self.sock) def push_packet(self, ident, data): self.push(mcpacket.Packet(ident, data)) @@ -152,21 +121,19 @@ def disable_crypto(self): self.cipher = None self.encrypted = False - def reset(self, sock): - self.__init__(sock, self.event) - @pl_announce('Net') class NetPlugin(PluginBase): - requires = ('Event', 'Timers') + requires = ('Event', 'Select', 'Timers') defaults = { 'bufsize': 4096, 'sock_quit': True, } events = { 'event_tick': 'tick', - 'SOCKET_RECV': 'handle_recv', - 'SOCKET_SEND': 'handle_send', + 'select_recv': 'handle_recv', + 'select_send': 'handle_send', + 'select_err': 'handle_err', 'SOCKET_ERR': 'handle_err', 'SOCKET_HUP': 'handle_hup', 'PLAY`` and ``select__``, where +```` is one of ``recv, send, err``. + +The event payload is always the fileno of the corresponding socket. +(The event plugin deep-copies the payload, but sockets are not serializable) + +Note that the event loop is stopped during selecting. This is good in that +the loop does not consume 100% CPU, but it means you have to register +at least a slow timer if you do stuff on ``event_tick`` and +expect it to be emitted frequently. +""" + +import logging +import select + +from spockbot.plugins.base import PluginBase, pl_announce + +logger = logging.getLogger('spockbot') + + +@pl_announce('Select') +class SelectPlugin(PluginBase): + requires = ('Event', 'Timers') + + def __init__(self, ploader, settings): + super(SelectPlugin, self).__init__(ploader, settings) + self.sockets = set() + self.sending = set() + ploader.provides('Select', self) + + def register_socket(self, sock): + """``poll()``ing will emit events when this socket is ready.""" + self.sockets.add(sock) + + def unregister_socket(self, sock): + self.sockets.remove(sock) + + def schedule_sending(self, sock): + """Emit one event the next time this socket is ready to send.""" + self.sending.add(sock) + + def poll(self): + timeout = self.timers.get_timeout() + if timeout < 0: + timeout = 5 # do not hang + + select_args = [ + tuple(self.sockets), + tuple(self.sending), + tuple(self.sockets), + timeout, + ] + self.sending.clear() + + try: + ready_lists = select.select(*select_args) + except select.error as e: + logger.error('SELECTSOCKET: Socket Error: "%s" %s', str(e), e.args) + return + + for ready_socks, kind in zip(ready_lists, ('recv', 'send', 'err')): + for sock in ready_socks: + self.event.emit('select_%s' % kind, sock.fileno()) + self.event.emit('select_%s_%s' % (kind, sock.fileno()), + sock.fileno())