Skip to content

Commit

Permalink
better multiprocessor obj
Browse files Browse the repository at this point in the history
  • Loading branch information
Alireza Sadri committed May 3, 2021
1 parent 09da288 commit fb5daac
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
45 changes: 24 additions & 21 deletions RobustGaussianFittingLibrary/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ class multiprocessor:
def __init__(self,
targetFunction,
inputs,
outputIsNumpy = False,
max_cpu = None,
showProgress = False):
"""
Expand All @@ -358,55 +357,60 @@ def __init__(self,
the targetFunction, such as by syntax:
targetFunction(inputs[0])
Indeed we actually do this syntax before anything.
outputIsNumpy: If outputIsNumpy is True, then a numpy array will
be allocated and filled in with outputs of the queue. This
only works if the output of the targetFunction is numpy array.
notice that it cannot be a number, if it isa number you
must set the return of the targetFunction to np.array([output])
default: False
max_cpu: max number of allowed cpu
default: None
showProgress: using textProgBar, it shows the progress of
multiprocessing of your task.
default: False
"""
try:
targetFunction(inputs[0])
funcOutput = targetFunction(inputs[0])
except:
print('Running the following syntax raised an exception:')
print('targetFunction(inputs[0])')
print('I cannot call your function with input[0]')
print('You need to make inputs mutable and indexable by axis 0')
exit()

if(type(inputs).__module__ == np.__name__):
self.n_pts = inputs.shape[0]
if(showProgress):
print('Input is a numpy ndArray')
else:
try:
self.n_pts = len(inputs)
if(showProgress):
print('Input is not a numpy ndArray')
except:
print('I can not get the length of the input list')
print('we are supporting lists, tuples and numpy ndarrays')
exit()

if(type(funcOutput).__module__ == np.__name__):
self.outputIsNumpy = True
if(showProgress):
print('output is a numpy ndArray')
self.targetFunction = targetFunction
self.inputs = inputs
self.outputIsNumpy = outputIsNumpy
self.showProgress = showProgress
self.max_cpu = max_cpu

def _multiprocFunc(self, aQ, _procID, inputArgs):
result = self.targetFunction(inputArgs)
aQ.put(list([_procID, result]))

def __call__(self):
#for example allData is size N by D
try:
N = len(self.inputs)
except:
N = self.inputs.shape[0]

def start(self):
myCPUCount = cpu_count()-1
if(self.max_cpu is not None):
myCPUCount = np.minimum(self.max_cpu, myCPUCount)
aQ = Queue()
numProc = N
numProc = self.n_pts
procID = 0
numProcessed = 0
numBusyCores = 0
if(not self.outputIsNumpy):
allResults = []
Q_procID = np.zeros(N, dtype='uint')
Q_procID = np.zeros(self.n_pts, dtype='uint')
while(numProcessed<numProc):
if (not aQ.empty()):
aQElement = aQ.get()
Expand All @@ -416,10 +420,9 @@ def __call__(self):
else:
if(numProcessed == 0):
allResults = np.zeros(
shape = ((N,) + aQElement[1].shape),
shape = ((self.n_pts,) + aQElement[1].shape),
dtype = aQElement[1].dtype)
else:
allResults[aQElement[0]] = aQElement[1]
allResults[aQElement[0]] = aQElement[1]
numProcessed += 1
numBusyCores -= 1
if(self.showProgress):
Expand Down
31 changes: 21 additions & 10 deletions RobustGaussianFittingLibrary/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,27 +1244,39 @@ def test_getTriangularVertices():

def multiprocessor_targetFunc(anInput):
data, op_type = anInput
if(op_type=='mean'):
if(op_type=='median'):
to_return = np.median(data)
elif(op_type=='mean'):
to_return = data.mean()
if(op_type=='max'):
elif(op_type=='max'):
to_return = data.max()
return(np.array([to_return]))

def test_multiprocessor():
N = 10000
Data = np.random.randn(N,100)
Param = 'max'

################### Data #################
N = 1000
Data = (10+100*np.random.randn(N,100)).astype('int')
Param = 'median'

################### mp #################
inputs = []
for cnt in range(N):
inputs.append((Data[cnt], Param))
someMul = RobustGaussianFittingLibrary.misc.multiprocessor(
multiprocessor_targetFunc, inputs,
outputIsNumpy = True, showProgress = True)
means = np.squeeze(someMul())
print(np.array([ [means], [Data.max(1)]]).T)
multiprocessor_targetFunc, inputs, showProgress=True).start()
means = np.squeeze(someMul)
#######################################

directMethod = np.median(Data, axis = 1)

print(np.array([ means, directMethod]).T)
print('difference of results: ', (directMethod - someMul).sum())

if __name__ == '__main__':
print('PID ->' + str(os.getpid()))
test_multiprocessor()
exit()
test_getTriangularVertices()
test_fitBackgroundCylindrically()
test_fitValue2Skewed_sweep_over_N()
Expand Down Expand Up @@ -1292,7 +1304,6 @@ def test_multiprocessor():
test_fitBackgroundRadiallyTensor_multiproc()
test_PDF2Uniform()
test_RobustAlgebraicLineFittingPy()
test_multiprocessor()
visOrderStat()
print('This was robust fitting')
exit()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
------------------------------------------------------
"""
from distutils.core import setup, Extension
_version = 'v0.1.32'
_version = 'v0.1.33'
setup(
name = 'RobustGaussianFittingLibrary', # How you named your package folder (MyLib)
packages = ['RobustGaussianFittingLibrary'], # Chose the same as "name"
Expand Down

0 comments on commit fb5daac

Please sign in to comment.