Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCPTransport: switch from select.select() to selectors #81

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ lib64
include
.Python

# Example log
example.log

# Installer logs
pip-log.txt

Expand All @@ -32,9 +35,12 @@ nosetests.xml
*.mo

# Mr Developer
.envrc
.mr.developer.cfg
.project
.pydevproject
.python-version
.venv

_darcs
.boring
Expand Down
4 changes: 2 additions & 2 deletions examples/logsetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def filter(self, logrecord):

logcfg = { 'version': 1,
'formatters': {
'normal': {'format': '%(levelname)-8s %(message)s'},
'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
'normal': {'format': '%(asctime)s %(levelname)-8s %(message)s'},
'actor': {'format': '%(asctime)s %(levelname)-8s %(actorAddress)s => %(message)s'}},
'filters': { 'isActorLog': { '()': actorLogFilter},
'notActorLog': { '()': notActorLogFilter}},
'handlers': { 'h1': {'class': 'logging.FileHandler',
Expand Down
120 changes: 120 additions & 0 deletions examples/socketstress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Measures the time required to send and receive to/from
# a given number of actors with the intention to
# compare efficiency of different I/O multiplexing
# methods.
#
# Run this from the top level as:
# $ python examples/socketstress.py [<number-of-workers>] [<number-of-repetitions>]


import logging
import time
from logsetup import logcfg
from datetime import timedelta
from thespian.actors import *

### messages

class BaseMsg(object): pass


class Ping(BaseMsg): pass


class Pong(BaseMsg): pass


class Run(BaseMsg): pass


class Start(BaseMsg):
def __init__(self, num_workers, num_repetitions):
self.num_workers = num_workers
self.num_repetitions = num_repetitions


class WorkerStart(BaseMsg): pass


class WorkerStarted(BaseMsg): pass

### actors

class Dispatcher(ActorTypeDispatcher):
def __init__(self):
self.num_workers = 0
self.workers = []
self.pong_count = 0
self.worker_started_count = 0
self.sender = None
self.has_completed = False

def receiveMsg_Start(self, message, sender):
self.num_workers = message.num_workers
self.num_repetitions = message.num_repetitions
self.sender = sender
logging.info('receiveMsg_Start(): creating %s workers...', self.num_workers)
for _ in range(self.num_workers):
worker = self.createActor(Worker)
self.workers.append(worker)
self.send(worker, WorkerStart())
logging.info('receiveMsg_Start(): done', self.num_workers)

def receiveMsg_Run(self, message, sender):
logging.info('receiveMsg_Run(): sending pings...')
self.sender = sender
for each in self.workers:
self.send(each, Ping())
logging.info('receiveMsg_Run(): done')

def receiveMsg_Pong(self, message, sender):
self.pong_count += 1
if self.pong_count >= self.num_workers * self.num_repetitions and not self.has_completed:
self.has_completed = True
self.send(self.sender, "done")
if self.num_repetitions > 1:
self.send(sender, Ping())

def receiveMsg_WorkerStarted(self, message, sender):
self.worker_started_count += 1
if self.worker_started_count == self.num_workers:
logging.info('receiveMsg_WorkerStarted(): %s workers started', self.worker_started_count)
self.send(self.sender, "started")


class Worker(ActorTypeDispatcher):
def receiveMsg_Ping(self, message, sender):
self.send(sender, Pong())

def receiveMsg_WorkerStart(self, message, sender):
self.send(sender, WorkerStarted())


def run_example(num_workers, num_repetitions):
try:
num_workers = int(num_workers)
num_repetitions = int(num_repetitions)
except ValueError:
print('usage: socketstress.py [<num-workers>] [<num-repetitions>]')
sys.exit(1)
asys = ActorSystem("multiprocTCPBase", logDefs=logcfg)
try:
print(f'socketstress with {num_workers} worker(s) and {num_repetitions} repetition(s)')
print('creating dispatcher...')
dispatcher = ActorSystem().createActor(Dispatcher)
print('starting workers...')
ActorSystem().ask(dispatcher, Start(num_workers, num_repetitions))
print('run!')
start = time.perf_counter()
ActorSystem().ask(dispatcher, Run())
end = time.perf_counter()
print(f'run completed in {end - start} seconds')
finally:
asys.shutdown()

if __name__ == "__main__":
import sys
run_example(
sys.argv[1] if len(sys.argv) > 1 else "3",
sys.argv[2] if len(sys.argv) > 2 else "1"
)
58 changes: 33 additions & 25 deletions thespian/system/transport/TCPTransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
from thespian.system.messages.multiproc import ChildMayHaveDied
from thespian.system.addressManager import ActorLocalAddress
import socket
import select
import selectors
from datetime import timedelta
try:
import cPickle as pickle
Expand Down Expand Up @@ -1072,6 +1072,13 @@ def _socketFile(sendOrRecv):
def set_watch(self, watchlist):
self._watches = watchlist

@staticmethod
def _check_fd(fd):
sel = selectors.DefaultSelector()
sel.register(fd, selectors.EVENT_READ)
_ = sel.select(0)
sel.close()

def _runWithExpiry(self, incomingHandler):
xmitOnly = incomingHandler == TransmitOnly or \
isinstance(incomingHandler, TransmitOnly)
Expand Down Expand Up @@ -1136,27 +1143,27 @@ def _runWithExpiry(self, incomingHandler):

if not xmitOnly:
wrecv.extend([self.socket.fileno()])
else:
# Windows does not support calling select with three
# empty lists, so as a workaround, supply the main
# listener if everything else is pending delays (or
# completed but unrealized) here, and ensure the main
# listener does not accept any listens below.
if not wrecv and not wsend:
if not hasattr(self, 'dummySock'):
self.dummySock = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
wrecv.extend([self.dummySock.fileno()])

if self._watches:
wrecv.extend(self._watches)

rrecv, rsend, rerr = [], [], []
try:
rrecv, rsend, rerr = select.select(wrecv, wsend,
set(wsend+wrecv), delay)
except (OSError, select.error, ValueError) as ex:
# creates selector on each loop, perhaps this could be optimized
sel = selectors.DefaultSelector()
for each in wrecv:
sel.register(each, selectors.EVENT_READ)
for each in wsend:
sel.register(each, selectors.EVENT_WRITE)
events = sel.select(delay)
for key, event in events:
if event & selectors.EVENT_READ:
rrecv.append(key.fd)
if event & selectors.EVENT_WRITE:
rsend.append(key.fd)
sel.close()
except (OSError, ValueError) as ex:
thesplog('selector exception: %s', ex, level=logging.DEBUG)
errnum = errno.EBADF if isinstance(ex, ValueError) \
else getattr(ex, 'errno', ex.args[0])
if err_select_retry(errnum):
Expand All @@ -1175,25 +1182,26 @@ def _runWithExpiry(self, incomingHandler):
bad = []
for each in self._watches:
try:
_ = select.select([each], [], [], 0)
except Exception:
TCPTransport._check_fd(each)
except Exception as ex:
thesplog('watcher %s is bad: %', each, ex)
bad.append(each)
if not bad:
thesplog('bad internal file descriptor!')
try:
_ = select.select([self.socket.fileno()], [], [], 0)
except Exception:
thesplog('listen %s is bad', self.socket.fileno)
TCPTransport._check_fd(self.socket.fileno())
except Exception as ex:
thesplog('listen %s is bad: %s', self.socket.fileno, ex)
rerr.append(self.socket.fileno)
for each in wrecv:
try:
_ = select.select([each], [], [], 0)
except Exception:
thesplog('wrecv %s is bad', each)
TCPTransport._check_fd(each)
except Exception as ex:
thesplog('wrecv %s is bad: %s', each, ex)
rerr.append(each)
for each in wsend:
try:
select.select([each], [], [], 0)
TCPTransport._check_fd(each)
except Exception:
thesplog('wsend %s is bad', each)
rerr.append(each)
Expand Down
2 changes: 1 addition & 1 deletion thespian/system/transport/errmgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def err_conn_refused(errex):


def err_send_inprogress(err):
return err in [errno.EINPROGRESS, errno.EAGAIN]
return err in [errno.EINPROGRESS, errno.EAGAIN, errno.ENOTCONN]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required to pass thespian/test/test_generators.py test on MacOS. selectors signal socket as ready to send yet [Errno 57] Socket is not connected error is raised when sending in _next_XMIT_2(). After a slight backoff, all works fine. From what I could tell looking at packet dumps, socket.send() was throwing this error after TCP connection was already established. What is special about this test is the use of global names combined with creation of multiple actors in quick succession.

Fun fact: This test was created after report from Rally lead developer in March 2017. Rally no longer uses global names. They were removed from Rally by Thespian lead developer in December 2017. :-)



def err_send_connrefused(errex):
Expand Down