diff --git a/MicroWebSrv2/libs/XAsyncSockets.py b/MicroWebSrv2/libs/XAsyncSockets.py index 13858e5..80db0f4 100644 --- a/MicroWebSrv2/libs/XAsyncSockets.py +++ b/MicroWebSrv2/libs/XAsyncSockets.py @@ -4,7 +4,7 @@ """ -from _thread import allocate_lock, start_new_thread +from _thread import allocate_lock, start_new_thread, stack_size from time import sleep from select import select import socket @@ -29,25 +29,21 @@ class XAsyncSocketsPool : _CHECK_SEC_INTERVAL = 1.0 def __init__(self) : - self._processing = False - self._threadsCount = 0 + self._processing = None + self._microWorkers = None self._opLock = allocate_lock() self._asyncSockets = { } self._readList = [ ] self._writeList = [ ] self._handlingList = [ ] - - # ------------------------------------------------------------------------ - - def _incThreadsCount(self) : - with self._opLock : - self._threadsCount += 1 - - # ------------------------------------------------------------------------ - - def _decThreadsCount(self) : - with self._opLock : - self._threadsCount -= 1 + self._udpSockEvt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + for i in range(30) : + self._udpSockEvtAddr = ('127.0.0.1', 54321+i) + try : + self._udpSockEvt.bind(self._udpSockEvtAddr) + break + except : + pass # ------------------------------------------------------------------------ @@ -79,7 +75,7 @@ def _removeSocket(self, socket) : def _socketListAdd(self, socket, socketsList) : with self._opLock : - if socket.fileno() in self._asyncSockets and socket not in socketsList : + if socket not in socketsList : socketsList.append(socket) return True return False @@ -88,16 +84,40 @@ def _socketListAdd(self, socket, socketsList) : def _socketListRemove(self, socket, socketsList) : with self._opLock : - if socket.fileno() in self._asyncSockets and socket in socketsList : + if socket in socketsList : socketsList.remove(socket) return True return False # ------------------------------------------------------------------------ + def _sendUDPSockEvent(self) : + self._udpSockEvt.sendto(b'\xFF', self._udpSockEvtAddr) + + # ------------------------------------------------------------------------ + def _processWaitEvents(self) : - self._incThreadsCount() - timeSec = perf_counter() + + def jobExceptionalCondition(asyncSocket) : + asyncSocket.OnExceptionalCondition() + self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList) + + def jobReadyForWriting(asyncSocket) : + asyncSocket.OnReadyForWriting() + self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList) + + def jobReadyForReading(asyncSocket) : + s = asyncSocket.GetSocketObj() + asyncSocket.OnReadyForReading() + self._socketListRemove(asyncSocket.GetSocketObj(), self._handlingList) + + self._processing = True + + self._socketListAdd(self._udpSockEvt, self._readList) + + timeSec = perf_counter() + udpSockEvtBuf = bytearray(32) + while self._processing : try : try : @@ -105,33 +125,40 @@ def _processWaitEvents(self) : self._writeList, self._readList, XAsyncSocketsPool._CHECK_SEC_INTERVAL ) - except KeyboardInterrupt as ex : - raise ex - except Exception as ex : + except KeyboardInterrupt : + break + except : continue if not self._processing : break for socketsList in ex, wr, rd : for sock in socketsList : - with self._opLock : - asyncSocket = self._asyncSockets.get(sock.fileno(), None) - if asyncSocket : - if self._socketListAdd(sock, self._handlingList) : - if socketsList is ex : - asyncSocket.OnExceptionalCondition() - self._socketListRemove(sock, self._readList) - self._socketListRemove(sock, self._writeList) - sock.shutdown(socket.SHUT_RDWR) - elif socketsList is wr : - self._socketListRemove(sock, self._writeList) - asyncSocket.OnReadyForWriting() - else : - asyncSocket.OnReadyForReading() - self._socketListRemove(sock, self._handlingList) + if sock == self._udpSockEvt : + self._udpSockEvt.recv_into(udpSockEvtBuf) else : - self._socketListRemove(sock, self._readList) - self._socketListRemove(sock, self._writeList) - sock.shutdown(socket.SHUT_RDWR) + asyncSocket = self._asyncSockets.get(sock.fileno()) + if asyncSocket : + if self._socketListAdd(sock, self._handlingList) : + if socketsList is rd : + if self._microWorkers : + self._microWorkers.AddJob(jobReadyForReading, asyncSocket) + else : + jobReadyForReading(asyncSocket) + elif socketsList is wr : + self._socketListRemove(sock, self._writeList) + if self._microWorkers : + self._microWorkers.AddJob(jobReadyForWriting, asyncSocket) + else : + jobReadyForWriting(asyncSocket) + else : + if self._microWorkers : + self._microWorkers.AddJob(jobExceptionalCondition, asyncSocket) + else : + jobExceptionalCondition(asyncSocket) + else : + self._socketListRemove(sock, self._readList) + self._socketListRemove(sock, self._writeList) + sock.close() sec = perf_counter() if sec > timeSec + XAsyncSocketsPool._CHECK_SEC_INTERVAL : timeSec = sec @@ -139,11 +166,22 @@ def _processWaitEvents(self) : if asyncSocket.ExpireTimeSec and \ timeSec > asyncSocket.ExpireTimeSec : asyncSocket._close(XClosedReason.Timeout) - except KeyboardInterrupt : - self._processing = False except : pass - self._decThreadsCount() + + if self._microWorkers : + self._microWorkers.StopAll() + self._microWorkers = None + for asyncSocket in list(self._asyncSockets.values()) : + try : + asyncSocket.Close() + except : + pass + + self._readList.clear() + self._writeList.clear() + + self._processing = None # ------------------------------------------------------------------------ @@ -181,7 +219,8 @@ def NotifyNextReadyForReading(self, asyncSocket, notify) : except : raise XAsyncSocketsPoolException('NotifyNextReadyForReading : "asyncSocket" is incorrect.') if notify : - self._socketListAdd(socket, self._readList) + if self._socketListAdd(socket, self._readList) : + self._sendUDPSockEvent() else : self._socketListRemove(socket, self._readList) @@ -193,26 +232,29 @@ def NotifyNextReadyForWriting(self, asyncSocket, notify) : except : raise XAsyncSocketsPoolException('NotifyNextReadyForWriting : "asyncSocket" is incorrect.') if notify : - self._socketListAdd(socket, self._writeList) + if self._socketListAdd(socket, self._writeList) : + self._sendUDPSockEvent() else : self._socketListRemove(socket, self._writeList) # ------------------------------------------------------------------------ def AsyncWaitEvents(self, threadsCount=0) : - if self._processing or self._threadsCount : + if self.WaitEventsProcessing : return - self._processing = True + self._processing = False if threadsCount > 0 : try : - for i in range(threadsCount) : - start_new_thread(self._processWaitEvents, ()) - if i < threadsCount-1 : - sleep(XAsyncSocketsPool._CHECK_SEC_INTERVAL / threadsCount) - while self._processing and self._threadsCount < threadsCount : + if threadsCount > 1 : + self._microWorkers = MicroWorkers(workersCount=threadsCount-1) + start_new_thread(self._processWaitEvents, ()) + while self._processing != True : sleep(0.010) except : - self._processing = False + if self._microWorkers : + self._microWorkers.StopAll() + self._microWorkers = None + self._processing = None raise XAsyncSocketsPoolException('AsyncWaitEvents : Fatal error to create new threads...') else : self._processWaitEvents() @@ -220,15 +262,18 @@ def AsyncWaitEvents(self, threadsCount=0) : # ------------------------------------------------------------------------ def StopWaitEvents(self) : + if not self.WaitEventsProcessing : + return self._processing = False - while self._threadsCount : - sleep(0.001) + self._sendUDPSockEvent() + while self.WaitEventsProcessing : + sleep(0.010) # ------------------------------------------------------------------------ @property def WaitEventsProcessing(self) : - return (self._threadsCount > 0) + return (self._processing is not None) # ============================================================================ # ===( XClosedReason )======================================================== @@ -1101,6 +1146,101 @@ def Clear(self) : def Empty(self) : return (self._first is None) +# ============================================================================ +# ===( MicroWorkers )========================================================= +# ============================================================================ + +class MicroWorkersException(Exception) : + pass + +class MicroWorkers : + + def __init__(self, workersCount, workersStackSize=None) : + self._workersCount = 0 + self._criticalLock = allocate_lock() + self._workersLock = allocate_lock() + self._jobsPrcCount = 0 + self._jobs = [ ] + self._processing = True + originalStackSize = None + if not isinstance(workersCount, int) or workersCount <= 0 : + raise MicroWorkersException('"workersCount" must be an integer greater than zero.') + if workersStackSize is not None : + if not isinstance(workersStackSize, int) or workersStackSize <= 0 : + raise MicroWorkersException('"workersStackSize" must be an integer greater than zero or None.') + try : + originalStackSize = stack_size(workersStackSize) + except : + raise MicroWorkersException('"workersStackSize" of %s cannot be used.' % workersStackSize) + try : + for _ in range(workersCount) : + start_new_thread(self._workerThreadFunc, (None, )) + while self._workersCount < workersCount : + sleep(0.010) + except Exception as ex : + self.StopAll() + raise MicroWorkersException('Error to create workers : %s' % ex) + if originalStackSize is not None : + stack_size(originalStackSize) + + def _workerThreadFunc(self, arg) : + with self._criticalLock : + self._workersCount += 1 + while self._processing : + jobFunc = None + self._workersLock.acquire() + if self._processing : + try : + jobFunc, jobArg = self._jobs.pop(0) + except : + self._workersLock.acquire() + self._workersLock.release() + if jobFunc : + with self._criticalLock : + self._jobsPrcCount += 1 + try : + jobFunc(jobArg) + except : + pass + with self._criticalLock : + self._jobsPrcCount -= 1 + with self._criticalLock : + self._workersCount -= 1 + + def AddJob(self, function, arg=None) : + if function : + self._jobs.append( (function, arg) ) + try : + self._workersLock.release() + except : + pass + + def StopAll(self) : + self._processing = False + self._jobs.clear() + try : + self._workersLock.release() + except : + pass + while self._workersCount : + sleep(0.010) + + @property + def Count(self) : + return self._workersCount + + @property + def JobsInQueue(self) : + return len(self._jobs) + + @property + def JobsInProcess(self) : + return self._jobsPrcCount + + @property + def IsWorking(self) : + return (len(self._jobs) > 0 or self._jobsPrcCount > 0) + # ============================================================================ # ============================================================================ # ============================================================================