Skip to content

Commit

Permalink
Restructuring of the asynchronous I/O core with workers and new event…
Browse files Browse the repository at this point in the history
… optimisation
  • Loading branch information
jczic committed Feb 15, 2024
1 parent 393891f commit 2f1e982
Showing 1 changed file with 196 additions and 56 deletions.
252 changes: 196 additions & 56 deletions MicroWebSrv2/libs/XAsyncSockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""


from _thread import allocate_lock, start_new_thread
from _thread import allocate_lock, start_new_thread, stack_size
from time import sleep
from select import select
import socket
Expand All @@ -29,25 +29,21 @@ class XAsyncSocketsPool :
_CHECK_SEC_INTERVAL = 1.0

def __init__(self) :
self._processing = False
self._threadsCount = 0
self._processing = None
self._microWorkers = None
self._opLock = allocate_lock()
self._asyncSockets = { }
self._readList = [ ]
self._writeList = [ ]
self._handlingList = [ ]

# ------------------------------------------------------------------------

def _incThreadsCount(self) :
with self._opLock :
self._threadsCount += 1

# ------------------------------------------------------------------------

def _decThreadsCount(self) :
with self._opLock :
self._threadsCount -= 1
self._udpSockEvt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for i in range(30) :
self._udpSockEvtAddr = ('127.0.0.1', 54321+i)
try :
self._udpSockEvt.bind(self._udpSockEvtAddr)
break
except :
pass

# ------------------------------------------------------------------------

Expand Down Expand Up @@ -79,7 +75,7 @@ def _removeSocket(self, socket) :

def _socketListAdd(self, socket, socketsList) :
with self._opLock :
if socket.fileno() in self._asyncSockets and socket not in socketsList :
if socket not in socketsList :
socketsList.append(socket)
return True
return False
Expand All @@ -88,62 +84,104 @@ def _socketListAdd(self, socket, socketsList) :

def _socketListRemove(self, socket, socketsList) :
with self._opLock :
if socket.fileno() in self._asyncSockets and socket in socketsList :
if socket in socketsList :
socketsList.remove(socket)
return True
return False

# ------------------------------------------------------------------------

def _sendUDPSockEvent(self) :
self._udpSockEvt.sendto(b'\xFF', self._udpSockEvtAddr)

# ------------------------------------------------------------------------

def _processWaitEvents(self) :
self._incThreadsCount()
timeSec = perf_counter()

def jobExceptionalCondition(asyncSocket) :
asyncSocket.OnExceptionalCondition()
self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList)

def jobReadyForWriting(asyncSocket) :
asyncSocket.OnReadyForWriting()
self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList)

def jobReadyForReading(asyncSocket) :
s = asyncSocket.GetSocketObj()
asyncSocket.OnReadyForReading()
self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList)

self._processing = True

self._socketListAdd(self._udpSockEvt, self._readList)

timeSec = perf_counter()
udpSockEvtBuf = bytearray(32)

while self._processing :
try :
try :
rd, wr, ex = select( self._readList,
self._writeList,
self._readList,
XAsyncSocketsPool._CHECK_SEC_INTERVAL )
except KeyboardInterrupt as ex :
raise ex
except Exception as ex :
except KeyboardInterrupt :
break
except :
continue
if not self._processing :
break
for socketsList in ex, wr, rd :
for sock in socketsList :
with self._opLock :
asyncSocket = self._asyncSockets.get(sock.fileno(), None)
if asyncSocket :
if self._socketListAdd(sock, self._handlingList) :
if socketsList is ex :
asyncSocket.OnExceptionalCondition()
self._socketListRemove(sock, self._readList)
self._socketListRemove(sock, self._writeList)
sock.shutdown(socket.SHUT_RDWR)
elif socketsList is wr :
self._socketListRemove(sock, self._writeList)
asyncSocket.OnReadyForWriting()
else :
asyncSocket.OnReadyForReading()
self._socketListRemove(sock, self._handlingList)
if sock == self._udpSockEvt :
self._udpSockEvt.recv_into(udpSockEvtBuf)
else :
self._socketListRemove(sock, self._readList)
self._socketListRemove(sock, self._writeList)
sock.shutdown(socket.SHUT_RDWR)
asyncSocket = self._asyncSockets.get(sock.fileno())
if asyncSocket :
if self._socketListAdd(sock, self._handlingList) :
if socketsList is rd :
if self._microWorkers :
self._microWorkers.AddJob(jobReadyForReading, asyncSocket)
else :
jobReadyForReading(asyncSocket)
elif socketsList is wr :
self._socketListRemove(sock, self._writeList)
if self._microWorkers :
self._microWorkers.AddJob(jobReadyForWriting, asyncSocket)
else :
jobReadyForWriting(asyncSocket)
else :
if self._microWorkers :
self._microWorkers.AddJob(jobExceptionalCondition, asyncSocket)
else :
jobExceptionalCondition(asyncSocket)
else :
self._socketListRemove(sock, self._readList)
self._socketListRemove(sock, self._writeList)
sock.close()
sec = perf_counter()
if sec > timeSec + XAsyncSocketsPool._CHECK_SEC_INTERVAL :
timeSec = sec
for asyncSocket in list(self._asyncSockets.values()) :
if asyncSocket.ExpireTimeSec and \
timeSec > asyncSocket.ExpireTimeSec :
asyncSocket._close(XClosedReason.Timeout)
except KeyboardInterrupt :
self._processing = False
except :
pass
self._decThreadsCount()

if self._microWorkers :
self._microWorkers.StopAll()
self._microWorkers = None
for asyncSocket in list(self._asyncSockets.values()) :
try :
asyncSocket.Close()
except :
pass

self._readList.clear()
self._writeList.clear()

self._processing = None

# ------------------------------------------------------------------------

Expand Down Expand Up @@ -181,7 +219,8 @@ def NotifyNextReadyForReading(self, asyncSocket, notify) :
except :
raise XAsyncSocketsPoolException('NotifyNextReadyForReading : "asyncSocket" is incorrect.')
if notify :
self._socketListAdd(socket, self._readList)
if self._socketListAdd(socket, self._readList) :
self._sendUDPSockEvent()
else :
self._socketListRemove(socket, self._readList)

Expand All @@ -193,42 +232,48 @@ def NotifyNextReadyForWriting(self, asyncSocket, notify) :
except :
raise XAsyncSocketsPoolException('NotifyNextReadyForWriting : "asyncSocket" is incorrect.')
if notify :
self._socketListAdd(socket, self._writeList)
if self._socketListAdd(socket, self._writeList) :
self._sendUDPSockEvent()
else :
self._socketListRemove(socket, self._writeList)

# ------------------------------------------------------------------------

def AsyncWaitEvents(self, threadsCount=0) :
if self._processing or self._threadsCount :
if self.WaitEventsProcessing :
return
self._processing = True
self._processing = False
if threadsCount > 0 :
try :
for i in range(threadsCount) :
start_new_thread(self._processWaitEvents, ())
if i < threadsCount-1 :
sleep(XAsyncSocketsPool._CHECK_SEC_INTERVAL / threadsCount)
while self._processing and self._threadsCount < threadsCount :
if threadsCount > 1 :
self._microWorkers = MicroWorkers(workersCount=threadsCount-1)
start_new_thread(self._processWaitEvents, ())
while self._processing != True :
sleep(0.010)
except :
self._processing = False
if self._microWorkers :
self._microWorkers.StopAll()
self._microWorkers = None
self._processing = None
raise XAsyncSocketsPoolException('AsyncWaitEvents : Fatal error to create new threads...')
else :
self._processWaitEvents()

# ------------------------------------------------------------------------

def StopWaitEvents(self) :
if not self.WaitEventsProcessing :
return
self._processing = False
while self._threadsCount :
sleep(0.001)
self._sendUDPSockEvent()
while self.WaitEventsProcessing :
sleep(0.010)

# ------------------------------------------------------------------------

@property
def WaitEventsProcessing(self) :
return (self._threadsCount > 0)
return (self._processing is not None)

# ============================================================================
# ===( XClosedReason )========================================================
Expand Down Expand Up @@ -1101,6 +1146,101 @@ def Clear(self) :
def Empty(self) :
return (self._first is None)

# ============================================================================
# ===( MicroWorkers )=========================================================
# ============================================================================

class MicroWorkersException(Exception) :
pass

class MicroWorkers :

def __init__(self, workersCount, workersStackSize=None) :
self._workersCount = 0
self._criticalLock = allocate_lock()
self._workersLock = allocate_lock()
self._jobsPrcCount = 0
self._jobs = [ ]
self._processing = True
originalStackSize = None
if not isinstance(workersCount, int) or workersCount <= 0 :
raise MicroWorkersException('"workersCount" must be an integer greater than zero.')
if workersStackSize is not None :
if not isinstance(workersStackSize, int) or workersStackSize <= 0 :
raise MicroWorkersException('"workersStackSize" must be an integer greater than zero or None.')
try :
originalStackSize = stack_size(workersStackSize)
except :
raise MicroWorkersException('"workersStackSize" of %s cannot be used.' % workersStackSize)
try :
for _ in range(workersCount) :
start_new_thread(self._workerThreadFunc, (None, ))
while self._workersCount < workersCount :
sleep(0.010)
except Exception as ex :
self.StopAll()
raise MicroWorkersException('Error to create workers : %s' % ex)
if originalStackSize is not None :
stack_size(originalStackSize)

def _workerThreadFunc(self, arg) :
with self._criticalLock :
self._workersCount += 1
while self._processing :
jobFunc = None
self._workersLock.acquire()
if self._processing :
try :
jobFunc, jobArg = self._jobs.pop(0)
except :
self._workersLock.acquire()
self._workersLock.release()
if jobFunc :
with self._criticalLock :
self._jobsPrcCount += 1
try :
jobFunc(jobArg)
except :
pass
with self._criticalLock :
self._jobsPrcCount -= 1
with self._criticalLock :
self._workersCount -= 1

def AddJob(self, function, arg=None) :
if function :
self._jobs.append( (function, arg) )
try :
self._workersLock.release()
except :
pass

def StopAll(self) :
self._processing = False
self._jobs.clear()
try :
self._workersLock.release()
except :
pass
while self._workersCount :
sleep(0.010)

@property
def Count(self) :
return self._workersCount

@property
def JobsInQueue(self) :
return len(self._jobs)

@property
def JobsInProcess(self) :
return self._jobsPrcCount

@property
def IsWorking(self) :
return (len(self._jobs) > 0 or self._jobsPrcCount > 0)

# ============================================================================
# ============================================================================
# ============================================================================

0 comments on commit 2f1e982

Please sign in to comment.